К операциям этого типа относятся две представленные в предыдущем разделе коммуникационные процедуры. В коммуникационных операциях типа точка-точка всегда участвуют не более двух процессов: передающий и принимающий. В MPI имеется множество функций, реализующих такой тип обменов. Многообразие объясняется возможностью организации таких обменов множеством способов. Описанные в предыдущем разделе функции реализуют стандартный режим с блокировкой.
Блокирующие функции подразумевают полное окончание операции после выхода из процедуры, т.е. вызывающий процесс блокируется, пока операция не будет завершена. Для функции посылки сообщения это означает, что все пересылаемые данные помещены в буфер (для разных реализаций MPI это может быть либо какой-то промежуточный системный буфер, либо непосредственно буфер получателя). Для функции приема сообщения блокируется выполнение других операций, пока все данные из буфера не будут помещены в адресное пространство принимающего процесса.
Неблокирующие функции подразумевают совмещение операций обмена с другими операциями, поэтому неблокирующие функции передачи и приема по сути дела являются функциями инициализации соответствующих операций. Для опроса завершенности операции (и завершения) вводятся дополнительные функции.
Как для блокирующих, так и неблокирующих операций MPI поддерживает четыре режима выполнения. Эти режимы касаются только функций передачи данных, поэтому для блокирующих и неблокирующих операций имеется по четыре функции посылки сообщения. В таблице 10 перечислены имена базовых коммуникационных функций типа точка-точка, имеющихся в библиотеке MPI.
Таблица 10
Способ связи |
С блокировкой |
Без блокировки |
Стандартная посылка |
MPI_Send |
MPI_Isend |
Синхронная посылка |
MPI_Ssend |
MPI_Issend |
Буферизованная посылка |
MPI_Bsend |
MPI_Ibsend |
Согласованная посылка |
MPI_Rsend |
MPI_Irsend |
Прием информации |
MPI_Recv |
MPI_Irecv |
Из таблицы хорошо виден принцип формирования имен функций. К именам базовых функций Send/Recv добавляются различные префиксы.
Префикс S |
(synchronous) – означает синхронный режим передачи данных. Операция передачи данных заканчивается только тогда, когда заканчивается прием данных. Функция нелокальная. |
Префикс B |
(buffered) – означает буферизованный режим передачи данных. В адресном пространстве передающего процесса с помощью специальной функции создается буфер обмена, который используется в операциях обмена. Операция посылки заканчивается, когда данные помещены в этот буфер. Функция имеет локальный характер. |
Префикс R |
(ready) – согласованный или подготовленный режим передачи данных. Операция передачи данных начинается только тогда, когда принимающий процессор выставил признак готовности приема данных. Функция нелокальная. |
Префикс I |
(immediate) – относится к неблокирующим операциям. |
Все функции передачи и приема сообщений могут использоваться в любой комбинации друг с другом. Функции передачи, находящиеся в одном столбце, имеют совершенно одинаковый синтаксис и отличаются только внутренней реализацией. Поэтому в дальнейшем будем рассматривать только стандартный режим, который в обязательном порядке поддерживают все реализации MPI.
Блокирующие коммуникационные операции
В стандартном режиме выполнение операции обмена включает три этапа:
· Передающая сторона формирует пакет сообщения, в который помимо передаваемой информации упаковываются адрес отправителя (source), адрес получателя (dest), идентификатор сообщения (tag) и коммуникатор (comm). Этот пакет передается отправителем в буфер, и на этом функция посылки сообщения заканчивается.
· Сообщение системными средствами передается адресату.
· Принимающий процессор извлекает сообщение из системного буфера, когда у него появится потребность в этих данных. Содержательная часть сообщения помещается в адресное пространство принимающего процесса (параметр buf), а служебная – в параметр status.
Поскольку операция выполняется в асинхронном режиме, адресная часть принятого сообщения состоит из трех полей:
· коммуникатора (comm), поскольку каждый процесс может одновременно входить в несколько областей связи;
· номера отправителя в этой области связи (source);
· идентификатора сообщения (tag), который используется для взаимной привязки конкретной пары операций посылки и приема сообщений.
Параметр count (количество принимаемых элементов сообщения) в процедуре приема сообщения должен быть не меньше, чем длина принимаемого сообщения. При этом реально будет приниматься столько элементов, сколько находится в буфере. Такая реализация операции чтения связана с тем, что MPI допускает использование расширенных запросов для идентификаторов сообщений (MPI_ANY_TAG - читать сообщение с любым идентификатором) и для адресов отправителя (MPI_ANY_SOURCE - читать сообщение от любого отправителя). Не допускается расширенных запросов для коммуникаторов. Расширенные запросы возможны только в операциях чтения. В этом отражается фундаментальное свойство механизма передачи сообщений - асимметрия операций передачи и приема сообщений, связанная с тем, что инициатива в организации обмена принадлежит передающей стороне.
Таким образом, после чтения сообщения некоторые параметры могут оказаться неизвестными, а именно: число считанных элементов, идентификатор сообщения и адрес отправителя. Эту информацию можно получить с помощью параметра status. Переменные status должны быть явно объявлены в MPI программе. В языке C status - это структура типа MPI_Status с тремя полями MPI_SOURCE, MPI_TAG, MPI_ERROR. В языке FORTRAN status - массив типа INTEGER размера MPI_STATUS_SIZE. Константы MPI_SOURCE, MPI_TAG и MPI_ERROR определяют индексы элементов. Назначение полей переменной status представлено в таблице 11.
Таблица 11
Назначение полей переменной status
Поля status |
C |
FORTRAN |
Процесс-отправитель |
status.MPI_SOURCE |
status(MPI_SOURCE) |
Идентификатора сообщения |
status.MPI_TAG |
status(MPI_TAG) |
Код ошибки |
status.MPI_ERROR |
status(MPI_ERROR) |
Как видно из таблицы 11, количество считанных элементов в переменную status не заносится. Для определения числа фактически полученных элементов сообщения необходимо использовать специальную функцию MPI_Get_count:
FORTRAN:
INTEGER STATUS (MPI_STATUS_SIZE), DATATYPE, COUNT, IERR
MPI_GET_COUNT (STATUS, DATATYPE, COUNT, IERR)
C:
int MPI_Get_count (MPI_Status *status, MPI_Datatype datatype, int *count)
Входные параметры:
Status |
- |
атрибуты принятого сообщения; |
Datatype |
- |
тип элементов принятого сообщения. |
Выходные параметры:
count |
- |
число полученных элементов. |
Подпрограмма MPI_Get_count может быть вызвана либо после чтения сообщения (функциями MPI_Recv, MPI_Irecv), либо после опроса факта поступления сообщения (функциями MPI_Probe, MPI_Iprobe). Операция чтения безвозвратно уничтожает информацию в буфере приема. При этом попытка считать сообщение с параметром count меньше, чем число элементов в буфере, приводит к потере сообщения. Определить параметры полученного сообщения без его чтения можно с помощью функции MPI_Probe.
FORTRAN:
INTEGER SOURCE, TAG, COMM, STATUS(MPI_STATUS_SIZE), IERR
MPI_PROBE (SOURCE, TAG, COMM, STATUS, IERR)
C:
int MPI_Probe (int source, int tag, MPI_Comm comm, MPI_Status *status)
Входные параметры:
source |
- |
номер процесса-отправителя; |
tag |
- |
идентификатор сообщения; |
comm |
- |
коммуникатор. |
Выходные параметры:
status |
- |
атрибуты опрошенного сообщения. |
1 PROGRAM EXAMPLE
2 INCLUDE 'mpif.h'
3 PARAMETER (max=100)
4 INTEGER my_id, np, comm, ierr, status
5 INTEGER next, prev
6 REAL*8 Buf(max)
7 CALL MPI_Init(ierr)
8 comm=MPI_COMM_WORLD
9 CALL MPI_Comm_Size(comm, np, ierr)
10 CALL MPI_Comm_Rank(comm, my_id, ierr)
11 Write(*,*) 'Hello, ',my_id,' processor of ',
12 * np, 'processors'
13 next=my_id+1
14 prev=my_id-1
15 IF (my_id.EQ.0) Then
16 prev=np-1
17 End IF
18 IF (my_id.EQ.np-1) Then
19 next=0
20 End IF
21 IF (my_id.EQ.0) Then
22 Buf(1)=1
23 CALL MPI_Send(Buf,my_id+1,MPI_REAL8,next,1,comm,ierr)
24 CALL MPI_Probe (prev, 1, comm, status, ierr)
25 CALL MPI_Get_Count (status, MPI_REAL8, buf_count,
26 * ierr)
27 CALL MPI_Recv(Buf,buf_count, MPI_REAL8,prev,1,
28 * comm,status,ierr)
29 Else
30 CALL MPI_Probe (prev, 1, comm, status, ierr)
31 CALL MPI_Get_Count (status, MPI_REAL8, buf_count,
32 * ierr)
33 CALL MPI_Recv(Buf, buf_count, MPI_REAL8,prev,1,
34 * comm,status,ierr)
35 Buf(my_id+1)=Buf(prev+1)+1
36 CALL MPI_Send(Buf,my_id+1,MPI_REAL8,next,1,comm,ierr)
37 End IF
38 IF (my_id.EQ.0) Then
39 Write(*,*) my_id,': Buf=',(Buf(j),j=1,np)
40 End IF
41 CALL MPI_Finalize(ierr)
42 STOP
43 END
Рис. 24. Пример использования функций MPI_PROBE и MPI_GET_COUNT
Подпрограмма MPI_Probe выполняется с блокировкой, поэтому завершится она лишь тогда, когда сообщение с подходящим идентификатором и номером процесса-отправителя будет доступно для получения. Атрибуты этого сообщения возвращаются в переменной status. Следующий за MPI_Probe вызов MPI_Recv с теми же атрибутами сообщения (номером процесса-отправителя, идентификатором сообщения и коммуникатором) поместит в буфер приема именно то сообщение, наличие которого было опрошено подпрограммой MPI_Probe.
Рассмотрим пример, в котором принимающий процессор заранее не знает, какого размера будет принимаемый массив данных. На рис. 24 приведен пример с использованием функций MPI_PROBE и MPI_GET_COUNT для последующего использования возвращаемых ими значений в качестве параметров для функции MPI_RECV.
При использовании блокирующего режима передачи сообщений существует потенциальная опасность возникновения тупиковых ситуаций, в которых операции обмена данными блокируют друг друга, т.е. когда процессоры переходят в режим взаимного ожидания. В примере, приведенном выше, для 0 процессора первой стоит функция отправки сообщений, а для остальных процессоров – прием. Передача осуществляется следующим образом: 0 процессор передает 1 процессору, а 1 процессор принимает от 0, затем 1 передает 2 процессору, а 2 принимает от 1, и т.д. Конструкция, когда у всех процессоров первой стоит функция приема сообщений, блокирует выполнение программы и приводит к зависанию, в случае, когда у всех процессоров первой стоит функция передачи сообщения, нормально отработает, но результат будет другой.
В ситуациях, когда требуется выполнить взаимный обмен данными между процессами, безопаснее использовать совмещенную операцию MPI_Sendrecv.
FORTRAN:
<type> SENDBUF(*), RECVBUF(*)
INTEGER SENDCOUNT, SENDTYPE, DEST, SENDTAG
INTEGER RECVCOUNT, RECVTYPE, SOURCE, RECV TAG
INTEGER COMM, STATUS(MPI_STATUS_SIZE), IERR
MPI_SENDRECV(SENDBUF, SENDCOUNT, SENDTYPE, DEST, SENDTAG, RECVBUF, RECVCOUNT, RECVTYPE, SOURCE, RECVTAG, COMM, STATUS, IERR)
С:
int MPI_Sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag, void *recvbuf, int recvcount, MPI_Datatype recvtype, int source, MPI_Datatypeа recvtag, MPI_Comm comm, MPI_Status *status)
Входные параметры:
sendbuf |
- |
адрес начала расположения посылаемого сообщения; |
sendcount |
- |
число посылаемых элементов; |
sendtype |
- |
тип посылаемых элементов; |
dest |
- |
номер процесса-получателя; |
sendtag |
- |
идентификатор посылаемого сообщения; |
recvcount |
- |
максимальное число принимаемых элементов; |
recvtype |
- |
тип элементов принимаемого сообщения; |
source |
- |
номер процесса-отправителя; |
recvtag |
- |
идентификатор принимаемого сообщения; |
comm |
- |
коммуникатор области связи. |
Выходные параметры:
recvbuf |
- |
адрес начала расположения принимаемого сообщения; |
status |
- |
атрибуты принятого сообщения. |
Функция MPI_Sendrecv совмещает выполнение операций передачи и приема. Обе операции используют один и тот же коммуникатор, но идентификаторы сообщений могут различаться. Расположение в адресном пространстве процесса принимаемых и передаваемых данных не должно пересекаться. Пересылаемые данные могут быть различного типа и иметь разную длину. В тех случаях, когда необходим обмен данными одного типа с замещением посылаемых данных на принимаемые, удобнее пользоваться функцией MPI_Sendrecv_replace.
FORTRAN:
<type> BUF(*)
INTEGER COUNT, DATATYPE, DEST, SENDTAG, SOURCE, RECVTAG
INTEGER COMM, STATUS(MPI_STATUS_SIZE), IERR
MPI_SENDRECV_REPLACE(BUF, COUNT, DATATYPE, DEST, SENDTAG, SOURCE, RECVTAG, COMM, STATUS, IERR)
С:
MPI_Sendrecv_replace(void* buf, int count, MPI_Datatype datatype, int dest, int sendtag, int source, int recvtag, MPI_Comm comm, MPI_Status *status)
Входные параметры:
buf |
- |
адрес начала расположения посылаемого и принимаемого сообщения; |
count |
- |
число передаваемых элементов; |
datatype |
- |
тип передаваемых элементов; |
dest |
- |
номер процесса-получателя; |
sendtag |
- |
идентификатор посылаемого сообщения; |
source |
- |
номер процесса-отправителя; |
recvtag |
- |
идентификатор принимаемого сообщения; |
comm |
- |
коммуникатор области связи. |
Выходные параметры:
buf |
- |
адрес начала расположения посылаемого и принимаемого сообщения; |
status |
- |
атрибуты принятого сообщения. |
В данной операции посылаемые данные из массива buf замещаются принимаемыми данными. В качестве адресатов source и dest в операциях пересылки данных можно использовать специальный адрес MPI_PROC_NULL. Коммуникационные операции с таким адресом ничего не делают. Применение этого адреса бывает удобным вместо использования логических конструкций для анализа условий посылать/читать сообщение или нет.
На рис. 25 приведен пример с использованием функций MPI_SENDRECV и MPI_SENDRECV_REPLACE.
1 PROGRAM EXAMPLE
2 INCLUDE 'mpif.h'
3 INTEGER my_id, np, comm,ierr, status
4 INTEGER next, prev
5 REAL*8 SBuf, RBUF
6 CALL MPI_Init(ierr)
7 comm=MPI_COMM_WORLD
8 CALL MPI_Comm_Size(comm, np, ierr)
9 CALL MPI_Comm_Rank(comm, my_id, ierr)
10 Write(*,*) 'Hello, ',my_id,' processor of '
11 * ,np, 'processors'
12 next=my_id+1
13 prev=my_id-1
14 IF (my_id.EQ.0) Then
15 prev=np-1
16 End IF
17 IF (my_id.EQ.np-1) Then
18 next=0
19 End IF
20 SBuf=my_id
21 Write (*,*) my_id,': Send to ',next,' number ',Sbuf
22 CALL MPI_SENDRECV(SBUF, 1, MPI_REAl8, next, 1, rbuf, 1,
23 * MPI_REAL8, my_id, 1, comm, status, ierr)
24 rbuf=rbuf+my_id
25 CALL MPI_SENDRECV_REPLACE(RBUF, 1, MPI_REAL8, prev, 1,
26 * my_id, 1,comm, STATUS, IERR)
27 Write (*,*)my_id,': Recv from ',next,' number ',Rbuf
28 CALL MPI_Finalize(ierr)
29 STOP
30 END
Рис. 25. Пример использования совмещенных коммуникационных операций
Неблокирующие коммуникационные операции
Использование неблокирующих коммуникационных операций более безопасно с точки зрения возникновения тупиковых ситуаций, а также может увеличить скорость работы программы за счет совмещения выполнения вычислительных и коммуникационных операций. Эти задачи решаются разделением коммуникационных операций на две стадии: инициирование операции и проверку завершения операции.
Неблокирующие операции используют специальный скрытый (opaque) объект "запрос обмена" (request) для связи между функциями обмена и функциями опроса их завершения. Для прикладных программ доступ к этому объекту возможен только через вызовы MPI функций. Если операция обмена завершена, подпрограмма проверки снимает "запрос обмена", устанавливая его в значение MPI_REQUEST_NULL. Снять запрос без ожидания завершения операции можно подпрограммой MPI_Request_free.
Функция передачи сообщения без блокировки MPI_Isend.
FORTRAN:
<type> BUF(*)
INTEGER COUNT, DATATYPE, DEST, TAG, COMM, REQUEST, IERR
MPI_ISEND(BUF, COUNT, DATATYPE, DEST, TAG, COMM, REQUEST, IERR)
C:
int MPI_Isend(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
Входные параметры:
buf |
- |
адрес начала расположения передаваемых данных; |
count |
- |
число посылаемых элементов; |
datatype |
- |
тип посылаемых элементов; |
dest |
- |
номер процесса-получателя; |
tag |
- |
идентификатор сообщения; |
comm |
- |
коммуникатор. |
Выходные параметры:
request |
- |
"запрос обмена". |
Возврат из подпрограммы происходит немедленно (immediate), без ожидания окончания передачи данных. Этим объясняется префикс I в именах функций. Поэтому переменную buf повторно использовать нельзя до тех пор, пока не будет погашен "запрос обмена". Это можно сделать с помощью подпрограмм MPI_Wait или MPI_Test, передав им параметр request.
Функция приема сообщения без блокировки MPI_Irecv.
FORTRAN:
<type> BUF(*)
INTEGER COUNT, DATATYPE, SOURCE, TAG, COMM, REQUEST, IERR
MPI_IRECV(BUF, COUNT, DATATYPE, SOURCE, TAG, COMM, REQUEST, IERR)
C:
int MPI_Irecv(void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request)
Входные параметры:
count |
- |
максимальное число принимаемых элементов; |
datatype |
- |
тип элементов принимаемого сообщения; |
source |
- |
номер процесса-отправителя; |
tag |
- |
идентификатор сообщения; |
comm |
- |
коммуникатор. |
Выходные параметры:
buf |
- |
адрес для принимаемых данных; |
request |
- |
"запрос обмена". |
Возврат из подпрограммы происходит немедленно, без ожидания окончания приема данных. Определить момент окончания приема можно с помощью подпрограмм MPI_Wait или MPI_Test с соответствующим параметром request.
Как и в блокирующих операциях, часто возникает необходимость опроса параметров полученного сообщения без его фактического чтения. Это делается с помощью функции MPI_Iprobe.
Неблокирующая функция чтения параметров полученного сообщения MPI_Iprobe.
FORTRAN:
LOGICAL FLAG
INTEGER SOURCE, TAG, COMM, STATUS(MPI_STATUS_SIZE), IERR
MPI_IPROBE (SOURCE, TAG, COMM, FLAG, STATUS, IERR)
C:
int MPI_Iprobe (int source, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
Входные параметры:
source |
- |
номер процесса-отправителя; |
tag |
- |
идентификатор сообщения; |
comm |
- |
коммуникатор. |
Выходные параметры:
flag |
- |
признак завершенности операции; |
status |
- |
атрибуты опрошенного сообщения. |
Если flag=true, то операция завершилась, и в переменной status находятся атрибуты этого сообщения.
Воспользоваться результатом неблокирующей коммуникационной операции или повторно использовать ее параметры можно только после ее полного завершения.
Имеется два типа функций завершения неблокирующих операций:
- операции ожидания завершения семейства WAIT блокируют работу процесса до полного завершения операции;
- операции проверки завершения семейства TEST возвращают значения TRUE или FALSE в зависимости от того, завершилась операция или нет.
Они не блокируют работу процесса и полезны для предварительного определения факта завершения операции.
Функция ожидания завершения неблокирующей операции MPI_Wait.
FORTRAN:
INTEGER REQUEST, STATUS(MPI_STATUS_SIZE), IERR
MPI_WAIT(REQUEST, STATUS, IERR)
C:
int MPI_Wait(MPI_Request *request, MPI_Status *status)
Входные параметры:
request |
- |
запрос связи. |
Выходные параметры:
request |
- |
запрос связи; |
status |
- |
атрибуты сообщения. |
Это нелокальная блокирующая операция. Возврат происходит после завершения операции, связанной с запросом request. В параметре status возвращается информация о законченной операции.
Функция проверки завершения неблокирующей операции MPI_Test.
FORTRAN:
MPI_TEST(REQUEST, FLAG, STATUS, IERR)
LOGICAL FLAG
INTEGER REQUEST, STATUS(MPI_STATUS_SIZE), IERR
C:
int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status)
Входные параметры:
request |
- |
запрос связи. |
Выходные параметры:
request |
- |
запрос связи; |
flag |
- |
признак завершенности проверяемой операции; |
status |
- |
атрибуты сообщения, если операция завершилась. |
Это локальная неблокирующая операция. Если связанная с запросом request операция завершена, возвращается flag = true, а status содержит информацию о завершенной операции. Если проверяемая операция не завершена, возвращается flag = false, а значение status в этом случае не определено.
Функция снятия запроса без ожидания завершения неблокирующей операции MPI_Request_free.
FORTRAN:
INTEGER REQUEST, IERROR
MPI_REQUEST_FREE(REQUEST, IERROR)
C:
int MPI_Request_free(MPI_Request *request)
Входные параметры:
request |
- |
запрос связи. |
Выходные параметры:
request |
- |
запрос связи. |
Параметр request устанавливается в значение MPI_REQUEST_NULL. Связанная с этим запросом операция не прерывается, однако проверить ее завершение с помощью MPI_Wait или MPI_Test уже нельзя. Для прерывания коммуникационной операции следует использовать функцию MPI_Cancel(MPI_Request *request).
В MPI имеется набор подпрограмм для одновременной проверки на завершение нескольких операций. Без подробного обсуждения приведем их перечень (таблица 12).
Кроме того, MPI позволяет для неблокирующих операций формировать целые пакеты запросов на коммуникационные операции MPI_Send_init и MPI_Recv_init, которые запускаются функциями MPI_Start или MPI_Startall. Проверка на завершение выполнения производится обычными средствами с помощью функций семейства WAIT и TEST.
Таблица 12
Функции коллективного завершения неблокирующих операций
Выполняемая проверка |
Функции ожидания (блокирующие) |
Функции проверки (неблокирующие) |
Завершились все операции |
MPI_Waitall |
MPI_Testall |
Завершилась, по крайней мере, одна операция |
MPI_Waitany |
MPI_Testany |
Завершилась одна из списка проверяемых |
MPI_Waitsome |
MPI_Testsome |