重复非阻塞通信
介绍
如果一个通信会被重复执行,比如循环结构内的通信调用,MPI 提供了重复非阻塞通信进行优化,以降低不必要的通信开销,下面是非阻塞通信的流程:
在重复通信时,通信的初始化操作并没有启动消息通信,消息的真正通信是由 MPI_START 触发的,消息的完成操作并不释放相应的非阻塞通信对象,只是将其状态置为非活动状态,若下面进行重复通信,再由 MPI_START 将对象置为活动状态,并启动通信。当不需要再进行通信时,必须通过显式的语句MPI_Request_free
将非阻塞通信对象释放掉。
通信模式
根据通信模式的不同,重复非阻塞通信也有四种不同的形式,即标准模式、缓存模式、同步模式和就绪模式,分别对应的函数为 MPI_Send_init
,MPI_Bsend_init
,MPI_Ssend_init
和MPI_Rsend_init
,下面是函数原型:
// 标准模式
int MPI_Send_init(
void * buf, // 发送缓冲区起始地址
int count, // 发送数据的个数
MPI_Datatype datatype, // 发送数据的数据类型
int dest, // 目标进程标识
int tag, // 消息标识
MPI_Comm comm, // 通信域
MPI_Request * request // 非阻塞通信对象
);
// 缓存模式
int MPI_Bsend_init(
void * buf, // 发送缓冲区的起始地址
int count, // 发送数据的个数
MPI_Datatype datatype, // 发送数据的数据类型
int dest, // 目标进程标识
int tag, // 消息标识
MPI_Comm comm, // 通信域
MPI_Request *request // 非阻塞通信对象
);
// 同步模式
int MPI_Ssend_init(
void * buf, // 发送缓冲区的起始地址
int count, // 发送数据的个数
MPI_Datatype datatype, // 发送数据的数据类型
int dest, // 目标进程标识
int tag, // 消息标识
MPI_Comm comm, // 通信域
MPI_Request *request // 非阻塞通信对象
);
// 就绪模式
int MPI_Ssend_init(
void * buf, // 发送缓冲区的起始地址
int count, // 发送数据的个数
MPI_Datatype datatype, // 发送数据的数据类型
int dest, // 目标进程标识
int tag, // 消息标识
MPI_Comm comm, // 通信域
MPI_Request *request // 非阻塞通信对象
);
通过 MPI_Recv_init
函数来完成接收操作,下面是函数原型:
int MPI_Recv_init(
void * buf, // 接受缓冲区的起始地址
int count, // 接受数据的个数
MPI_Datatype datatype, // 接受数据的数据类型
int dest, // 源进程标识 或者 MPI_ANY_SOURCE
int tag, // 消息标识 或者 MPI_ANY_TAG
MPI_Comm comm, // 通信域
MPI_Request *request // 非阻塞通信对象
);
在前面提到,一个非阻塞通信在创建后会处于非活动状态,需要使用 MPI_Start
函数来激活通信,下面是函数原型
int MPI_Start(
MPI_Request * request // 费祖通信对象
);
对于多个非阻塞通信,我们还可以使用 MPI_Startall
来同时激活多个非阻塞通信,下面是函数原型
int MPI_Startall(
int count, // 开始非阻塞通信对象的个数
MPI_Request * requests // 非阻塞通信对象数组
);
示例
下面是使用重复非阻塞通信的一个示例:
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"
int main() {
int rank;
int value;
MPI_Request request;
MPI_Status status;
MPI_Init(NULL, NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if(rank == 0) {
MPI_Send_init(&value, 1, MPI_INT, 1, 99, MPI_COMM_WORLD, &request);
for(int i = 0; i < 10; i++) {
value = i;
MPI_Start(&request);
MPI_Wait(&request, &status);
}
MPI_Request_free(&request);
}
if(rank == 1) {
MPI_Recv_init(&value, 1, MPI_INT, 0, 99, MPI_COMM_WORLD, &request);
for(int i = 0; i < 10; i++) {
MPI_Start(&request);
MPI_Wait(&request, &status);
printf("value is %d\n", value);
}
MPI_Request_free(&request);
}
MPI_Finalize();
}
实现 Jacobi 迭代
下面是用重复非阻塞通信实现的Jacobi 迭代
void mpi_jacobi_new2() {
int m = 18;
int n = 18;
int a[m][n];
int b[m][n];
int i, j, k;
for(i = 0; i < m; i++) {
for(j = 0; j < n; j++) {
a[i][j] = rand() / (RAND_MAX + 1.0) * 10 * (i + j) ;
}
}
int size, rank;
MPI_Init(NULL, NULL);
MPI_Status status[4];
MPI_Request request[4];
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
// 每个进程计算的行数,为了简单这里假设正好可以除尽
int gap = (m - 2) / size;
int start = gap * rank + 1;
int end = gap * (rank + 1);
int bound;
int left, right;
// 使用虚拟进程
left = rank - 1;
if(left < 0) {
left = MPI_PROC_NULL;
}
right = rank + 1;
if(right > size - 1) {
right = MPI_PROC_NULL;
}
MPI_Send_init(&b[start][0], n, MPI_INT, left, 99, MPI_COMM_WORLD, &request[0]);
MPI_Send_init(&b[end][0], n, MPI_INT, right, 99, MPI_COMM_WORLD, &request[1]);
MPI_Recv_init(&a[start - 1][0], n, MPI_INT, left, 99, MPI_COMM_WORLD, &request[2]);
MPI_Recv_init(&a[end+1][0], n, MPI_INT, right, 99, MPI_COMM_WORLD, &request[3]);
// 迭代10次,计算时忽略了 0,n-1 行 和 0,n-1 列
for(k = 0; k < 10; k++) {
// 计算边界的值
bound = start;
for(j = 1; j < m -1; j++) {
b[bound][j] = 0.25 * (a[bound-1][j] + a[bound+1][j] + a[bound][j+1] + a[bound][j-1]);
}
bound = end;
for(j = 1; j < m -1; j++) {
b[bound][j] = 0.25 * (a[bound-1][j] + a[bound+1][j] + a[bound][j+1] + a[bound][j-1]);
}
MPI_Startall(4, request);
// 计算剩余的部分
for(i = start+1; i < end; i++) {
for(j = 1; j < m -1; j++) {
b[i][j] = 0.25 * (a[i-1][j] + a[i+1][j] + a[i][j+1] + a[i][j-1]);
}
}
for(i = start ; i <= end; i++) {
for(j = 1; j < n - 1; j++) {
a[i][j] = b[i][j];
}
}
MPI_Waitall(4, request, status);
}
// 这里按照顺序输出结果
for(k = 0; k< size; k++) {
MPI_Barrier(MPI_COMM_WORLD);
if(rank == k) {
for(i = start; i <= end; i++) {
for(j = 1; j < n-1; j++) {
printf("a[%d][%d] is %-4d ", i, j, a[i][j]);
}
printf("\n");
}
}
}
for(i = 0; i < 4; i++) {
MPI_Request_free(&request[i]);
}
MPI_Finalize();
}