3.5 Функции коллективного взаимодействия

Набор операций типа точка-точка является достаточным для программирования любых алгоритмов, однако MPI вряд ли бы завоевал такую популярность, если бы ограничивался только этим набором коммуникационных операций. Одной из наиболее привлекательных сторон MPI является наличие широкого набора коллективных операций, которые берут на себя выполнение наиболее часто встречающихся при программировании действий. Например, часто возникает потребность разослать некоторую переменную или массив из одного процессора всем остальным. Каждый программист может написать такую процедуру с использованием операций Send/Recv, однако гораздо удобнее воспользоваться коллективной операцией MPI_Bcast. Причем гарантировано, что эта операция будет выполняться гораздо эффективнее, поскольку MPI – функция реализована с использованием внутренних возможностей коммуникационной среды.

Главное отличие коллективных операций от операций типа точка-точка состоит в том, что в них всегда участвуют все процессы, связанные с некоторым коммуникатором. Несоблюдение этого правила приводит либо к аварийному завершению задачи, либо к еще более неприятному зависанию задачи.

Набор коллективных операций включает:

Все коммуникационные подпрограммы, за исключением MPI_Bcast, представлены в двух вариантах:

Отличительные особенности коллективных операций:

Функция синхронизации процессов MPI_Barrier блокирует работу вызвавшего ее процесса до тех пор, пока все другие процессы группы также не вызовут эту функцию. Завершение работы этой функции возможно только всеми процессами одновременно (все процессы “преодолевают барьер” одновременно).

int MPI_Barrier(MPI_Comm comm)

где comm – коммуникатор.

Синхронизация с помощью барьеров используется, например, для завершения всеми процессами некоторого этапа решения задачи, результаты которого будут использоваться на следующем этапе. Использование барьера гарантирует, что ни один из процессов не приступит раньше времени к выполнению следующего этапа, пока результат работы предыдущего не будет окончательно сформирован. Неявную синхронизацию процессов выполняет любая коллективная функция.

Широковещательная рассылка данных выполняется с помощью функции MPI_Bcast. Процесс с номером root рассылает сообщение из своего буфера передачи всем процессам области связи коммуникатора comm.

int MPI_Bcast(void* buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm )

где buffer – адрес начала расположения в памяти рассылаемых данных, count – число посылаемых элементов, datatype – тип посылаемых элементов, root – номер процесса-отправителя, comm – коммуникатор.

После завершения подпрограммы каждый процесс в области связи коммуникатора comm, включая и самого отправителя, получит копию сообщения от процесса-отправителя root.

Семейство функций сбора блоков данных от всех процессов группы состоит из четырех подпрограмм:

MPI_Gather, MPI_Allgather, MPI_Gatherv, MPI_Allgatherv.

Каждая из указанных подпрограмм расширяет функциональные возможности предыдущих.

Функция MPI_Gather производит сборку блоков данных, посылаемых всеми процессами группы, в один массив процесса с номером root. Длина блоков предполагается одинаковой. Объединение происходит в порядке увеличения номеров процессов-отправителей. То есть данные, посланные процессом i из своего буфера sendbuf, помещаются в i-ю порцию буфера recvbuf процесса root. Длина массива, в который собираются данные, должна быть достаточной для их размещения.

int MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)

где sendbuf – адрес начала размещения посылаемых данных, sendcount – число посылаемых элементов, sendtype – тип посылаемых элементов, recvbuf – адрес начала буфера приема (используется только в процессе-получателе root), recvcount – число элементов, получаемых от каждого процесса (используется только в процессе-получателе root), recvtype – тип получаемых элементов, root – номер процесса-получателя, comm – коммуникатор.

Тип посылаемых элементов sendtype должен совпадать с типом recvtype получаемых элементов, а число sendcount должно равняться числу recvcount. То есть, recvcount в вызове из процесса root – это число собираемых от каждого процесса элементов, а не общее количество собранных элементов.


PIC


Рис. 3.1: Графическая интерпретация операции Gather


Пример программы с использованием функции MPI_Gather.

MPI_Comm comm; 
int array[100]; 
int root, *rbuf; 
MPI_Comm_size(comm, &gsize); 
rbuf = (int *) malloc( gsize * 100 * sizeof(int)); 
MPI_Gather(array, 100, MPI_INT, rbuf, 100, MPI_INT, root, comm);

Функция MPI_Allgather выполняется так же, как MPI_Gather, но получателями являются все процессы группы. Данные, посланные процессом i из своего буфера sendbuf, помещаются в i-ю порцию буфера recvbuf каждого процесса. После завершения операции содержимое буферов приема recvbuf у всех процессов одинаково.

int MPI_Allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)

где sendbuf – адрес начала буфера посылки, sendcount – число посылаемых элементов, sendtype – тип посылаемых элементов, recvbuf – адрес начала буфера приема, recvcount – число элементов, получаемых от каждого процесса, recvtype – тип получаемых элементов, comm – коммуникатор.


PIC


Рис. 3.2: Графическая интерпретация операции Allgater. На этой схеме ось Y образуют процессы группы, а ось X блоки данных


Функция MPI_Gatherv позволяет собирать блоки с разным числом элементов от каждого процесса, так как количество элементов, принимаемых от каждого процесса, задается индивидуально с помощью массива recvcounts. Эта функция обеспечивает также большую гибкость при размещении данных в процессе-получателе, благодаря введению в качестве параметра массива смещений displs.

int MPI_Gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* rbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, int root, MPI_Comm comm)

где sendbuf – адрес начала буфера передачи, sendcount – число посылаемых элементов, sendtype – тип посылаемых элементов, rbuf – адрес начала буфера приема, recvcounts – целочисленный массив (размер равен числу процессов в группе), i-й элемент которого определяет число элементов, которое должно быть получено от процесса i, displs – целочисленный массив (размер равен числу процессов в группе), i-ое значение определяет смещение i-го блока данных относительно начала rbuf, recvtype – тип получаемых элементов, root – номер процесса-получателя, comm – коммуникатор.

Сообщения помещаются в буфер приема процесса root в соответствии с номерами посылающих процессов, а именно, данные, посланные процессом i, размещаются в адресном пространстве процесса root, начиная с адреса rbuf + displs[i]. представлена на рис.


PIC


Рис. 3.3: Графическая интерпретация операции Gatherv


Функция MPI_Allgatherv является аналогом функции MPI_Gatherv, но сборка выполняется всеми процессами группы. Поэтому в списке параметров отсутствует параметр root.

int MPI_Allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* rbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm)

где sendbuf – адрес начала буфера передачи, sendcount – число посылаемых элементов, sendtype – тип посылаемых элементов, rbuf – адрес начала буфера приема, recvcounts – целочисленный массив (размер равен числу процессов в группе), содержащий число элементов, которое должно быть получено от каждого процесса, displs – целочисленный массив (размер равен числу процессов в группе), i-ое значение определяет смещение относительно начала rbuf i-го блока данных, recvtype – тип получаемых элементов, comm – коммуникатор.

Семейство функций распределения блоков данных по всем процессам группы состоит из двух подпрограмм: MPI_Scatter и MPI_Scatterv.

Функция MPI_Scatter разбивает сообщение из буфера посылки процесса root на равные части размером sendcount и посылает i-ю часть в буфер приема процесса с номером i (в том числе и самому себе). Процесс root использует оба буфера (посылки и приема), поэтому в вызываемой им подпрограмме все параметры являются существенными. Остальные процессы группы с коммуникатором comm являются только получателями, поэтому для них параметры, специфицирующие буфер посылки, не существенны.

int MPI_Scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)

где sendbuf – адрес начала размещения блоков распределяемых данных (используется только в процессе-отправителе root), sendcount – число элементов, посылаемых каждому процессу, sendtype – тип посылаемых элементов, recvbuf – адрес начала буфера приема, recvcount – число получаемых элементов, recvtype – тип получаемых элементов, root – номер процесса-отправителя, comm – коммуникатор.

Тип посылаемых элементов sendtype должен совпадать с типом recvtype получаемых элементов, а число посылаемых элементов sendcount должно равняться числу принимаемых recvcount. Следует отметить, что значение sendcount в вызове из процесса root – это число посылаемых каждому процессу элементов, а не общее их количество. Операция Scatter является обратной по отношению к Gather.


PIC


Рис. 3.4: Графическая интерпретация операции Scatter.


Пример использования функции MPI_Scatter

MPI_Comm comm; 
int rbuf[100], gsize; 
int root, *array; 
MPI_Comm_size(comm, &gsize); 
array = (int *) malloc(gsize * 100 * sizeof(int)); 
MPI_Scatter(array, 100, MPI_INT, rbuf, 100, MPI_INT, root, comm);

Функция MPI_Scatterv является векторным вариантом функции MPI_Scatter, позволяющий посылать каждому процессу различное количество элементов. Начало расположения элементов блока, посылаемого i-му процессу, задается в массиве смещений displs, а число посылаемых элементов – в массиве sendcounts. Эта функция является обратной по отношению к функции MPI_Gatherv.

int MPI_Scatterv(void* sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)

где sendbuf – адрес начала буфера посылки (используется только в процессе-отправителе root), sendcounts – целочисленный массив (размер равен числу процессов в группе), содержащий число элементов, посылаемых каждому процессу, displs – целочисленный массив (размер равен числу процессов в группе), i-ое значение определяет смещение относительно начала sendbuf для данных, посылаемых процессу i, sendtype – тип посылаемых элементов, recvbuf – адрес начала буфера приема, recvcount – число получаемых элементов, recvtype – тип получаемых элементов, root – номер процесса-отправителя, comm – коммуникатор.


PIC


Рис. 3.5: Графическая интерпретация операции Scatterv.


Совмещенные коллективные операции

Функция MPI_Alltoall совмещает в себе операции Scatter и Gather и является по сути дела расширением операции Allgather, когда каждый процесс посылает различные данные разным получателям. Процесс i посылает j-ый блок своего буфера sendbuf процессу j, который помещает его в i-ый блок своего буфера recvbuf. Количество посланных данных должно быть равно количеству полученных данных для каждой пары процессов.

int MPI_Alltoall(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)

где sendbuf – адрес начала буфера посылки, sendcount – число посылаемых элементов, sendtype – тип посылаемых элементов, recvbuf – адрес начала буфера приема, recvcount – число элементов, получаемых от каждого процесса, recvtype – тип получаемых элементов, comm – коммуникатор.


PIC


Рис. 3.6: Графическая интерпретация операции Alltoall.


Функция MPI_Alltoallv реализует векторный вариант операции Alltoall, допускающий передачу и прием блоков различной длины с более гибким размещением передаваемых и принимаемых данных.

Глобальные вычислительные операции над распределенными данными

В параллельном программировании математические операции над блоками данных, распределенных по процессорам, называют глобальными операциями редукции. В общем случае операцией редукции называется операция, аргументом которой является вектор, а результатом – скалярная величина, полученная применением некоторой математической операции ко всем компонентам вектора. В частности, если компоненты вектора расположены в адресных пространствах процессов, выполняющихся на различных процессорах, то в этом случае говорят о глобальной (параллельной) редукции. Например, пусть в адресном пространстве всех процессов некоторой группы процессов имеются копии переменной var (необязательно имеющие одно и то же значение), тогда применение к ней операции вычисления глобальной суммы или, другими словами, операции редукции SUM возвратит одно значение, которое будет содержать сумму всех локальных значений этой переменной. Использование этих операций является одним из основных средств организации распределенных вычислений.

В MPI глобальные операции редукции представлены в нескольких вариантах:

Функция MPI_Reduce выполняется следующим образом. Операция глобальной редукции, указанная параметром op, выполняется над первыми элементами входного буфера, и результат посылается в первый элемент буфера приема процесса root. Затем то же самое делается для вторых элементов буфера и т.д.

int MPI_Reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm)

где sendbuf – адрес начала входного буфера, recvbuf – адрес начала буфера результатов (используется только в процессе-получателе root), count – число элементов во входном буфере, datatype – тип элементов во входном буфере, op – операция, по которой выполняется редукция, root – номер процесса-получателя результата операции, comm – коммуникатор.


PIC


Рис. 3.7: Графическая интерпретация операции Reduce. На данной схеме операция “+” означает любую допустимую операцию редукции.


В качестве операции op можно использовать либо одну из предопределенных операций, либо операцию, сконструированную пользователем. Все предопределенные операции являются ассоциативными и коммутативными. Сконструированная пользователем операция, по крайней мере, должна быть ассоциативной. Порядок редукции определяется номерами процессов в группе. Тип datatype элементов должен быть совместим с операцией op. В таблице представлен перечень предопределенных операций, которые могут быть использованы в функциях редукции MPI.





Название Операция Разрешенные типы



MPI_MAX Максимум С integer, FORTRAN integer,
MPI_MIN Минимум Floating point



MPI_SUM Сумма C integer, FORTRAN integer,
MPI_PROD Произведение Floating point, Complex



MPI_LAND Логическое AND
MPI_LOR Логическое OR C integer, Logical
MPI_LXOR Логическое исключающее OR



MPI_BAND Поразрядное AND
MPI_BOR Поразрядное OR C integer, FORTRAN integer,
MPI_BXOR Поразрядное исключающее OR Byte



MPI_MAXLOC Максимальное значение
и его индекс Специальные типы
MPI_MINLOC Минимальное значение для этих функций
и его индекс




Таблица 3.3: Операции в функциях редукции MPI.

Операции MAXLOC и MINLOC выполняются над специальными парными типами, каждый элемент которых хранит две величины: значения, по которым ищется максимум или минимум, и индекс элемента. В MPI под С имеется 6 таких предопределенных типов:

Функция MPI_Allreduce сохраняет результат редукции в адресном пространстве всех процессов, поэтому в списке параметров функции отсутствует идентификатор корневого процесса root. В остальном, набор параметров такой же, как и в предыдущей функции.

int MPI_Allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)

где sendbuf – адрес начала входного буфера, recvbuf – адрес начала буфера приема, count – число элементов во входном буфере, datatype – тип элементов во входном буфере, op – операция, по которой выполняется редукция, comm – коммуникатор.


PIC


Рис. 3.8: Графическая интерпретация операции Allreduce.


Функция MPI_Reduce_scatter совмещает в себе операции редукции и распределения результата по процессам.

MPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)

где sendbuf – адрес начала входного буфера, recvbuf – адрес начала буфера приема, revcount – массив, в котором задаются размеры блоков, посылаемых процессам, datatype – тип элементов во входном буфере, op – операция, по которой выполняется редукция, comm – коммуникатор.

Функция MPI_Reduce_scatter отличается от MPI_Allreduce тем, что результат операции разрезается на непересекающиеся части по числу процессов в группе, i-ая часть посылается i-ому процессу в его буфер приема. Длины этих частей задает третий параметр, являющийся массивом.


PIC


Рис. 3.9: Графическая интерпретация операции Reduce_scatter.


Функция MPI_Scan выполняет префиксную редукцию. Параметры такие же, как в MPI_Allreduce, но получаемые каждым процессом результаты отличаются друг от друга. Операция пересылает в буфер приема i-го процесса редукцию значений из входных буферов процессов с номерами 0, ... i включительно.

int MPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)

где sendbuf – адрес начала входного буфера, recvbuf – адрес начала буфера приема, count – число элементов во входном буфере, datatype – тип элементов во входном буфере, op – операция, по которой выполняется редукция, comm – коммуникатор.


PIC


Рис. 3.10: Графическая интерпретация операции Scan.