重复非阻塞通信

介绍

如果一个通信会被重复执行,比如循环结构内的通信调用,MPI 提供了重复非阻塞通信进行优化,以降低不必要的通信开销,下面是非阻塞通信的流程:

在重复通信时,通信的初始化操作并没有启动消息通信,消息的真正通信是由 MPI_START 触发的,消息的完成操作并不释放相应的非阻塞通信对象,只是将其状态置为非活动状态,若下面进行重复通信,再由 MPI_START 将对象置为活动状态,并启动通信。当不需要再进行通信时,必须通过显式的语句MPI_Request_free将非阻塞通信对象释放掉

通信模式

根据通信模式的不同,重复非阻塞通信也有四种不同的形式,即标准模式、缓存模式、同步模式和就绪模式,分别对应的函数为 MPI_Send_initMPI_Bsend_initMPI_Ssend_initMPI_Rsend_init,下面是函数原型:

// 标准模式
int MPI_Send_init(
    void * buf,             // 发送缓冲区起始地址
    int count,              // 发送数据的个数
    MPI_Datatype datatype,  // 发送数据的数据类型
    int dest,               // 目标进程标识
    int tag,                // 消息标识
    MPI_Comm comm,          // 通信域
    MPI_Request * request   // 非阻塞通信对象
);

// 缓存模式
int MPI_Bsend_init(
    void * buf,             // 发送缓冲区的起始地址
    int count,              // 发送数据的个数
    MPI_Datatype datatype,  // 发送数据的数据类型
    int dest,               // 目标进程标识
    int tag,                // 消息标识
    MPI_Comm comm,          // 通信域
    MPI_Request *request    // 非阻塞通信对象
);

// 同步模式
int MPI_Ssend_init(
    void * buf,             // 发送缓冲区的起始地址
    int count,              // 发送数据的个数
    MPI_Datatype datatype,  // 发送数据的数据类型
    int dest,               // 目标进程标识
    int tag,                // 消息标识
    MPI_Comm comm,          // 通信域
    MPI_Request *request    // 非阻塞通信对象
);

// 就绪模式
int MPI_Ssend_init(
    void * buf,             // 发送缓冲区的起始地址
    int count,              // 发送数据的个数
    MPI_Datatype datatype,  // 发送数据的数据类型
    int dest,               // 目标进程标识
    int tag,                // 消息标识
    MPI_Comm comm,          // 通信域
    MPI_Request *request    // 非阻塞通信对象
);

通过 MPI_Recv_init 函数来完成接收操作,下面是函数原型:

int MPI_Recv_init(
    void * buf,             // 接受缓冲区的起始地址
    int count,              // 接受数据的个数
    MPI_Datatype datatype,  // 接受数据的数据类型
    int dest,               // 源进程标识 或者 MPI_ANY_SOURCE
    int tag,                // 消息标识 或者 MPI_ANY_TAG
    MPI_Comm comm,          // 通信域
    MPI_Request *request    // 非阻塞通信对象
);

在前面提到,一个非阻塞通信在创建后会处于非活动状态,需要使用 MPI_Start 函数来激活通信,下面是函数原型

int MPI_Start(
    MPI_Request * request // 费祖通信对象
);

对于多个非阻塞通信,我们还可以使用 MPI_Startall 来同时激活多个非阻塞通信,下面是函数原型

int MPI_Startall(
    int count,              // 开始非阻塞通信对象的个数
    MPI_Request * requests  // 非阻塞通信对象数组
);

示例

下面是使用重复非阻塞通信的一个示例:

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

int main() {
    int rank;
    int value;
    MPI_Request request;
    MPI_Status status;
    MPI_Init(NULL, NULL);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if(rank == 0) {

        MPI_Send_init(&value, 1, MPI_INT, 1, 99, MPI_COMM_WORLD, &request);
        for(int i = 0; i < 10; i++) {
            value = i;
            MPI_Start(&request);
            MPI_Wait(&request, &status);
        }
        MPI_Request_free(&request);
    }

    if(rank == 1) {
        MPI_Recv_init(&value, 1, MPI_INT, 0, 99, MPI_COMM_WORLD, &request);
        for(int i = 0; i < 10; i++) {
            MPI_Start(&request);
            MPI_Wait(&request, &status);
            printf("value is %d\n", value);
        }
        MPI_Request_free(&request);
    }

    MPI_Finalize();
}

实现 Jacobi 迭代

下面是用重复非阻塞通信实现的Jacobi 迭代

void mpi_jacobi_new2() {
    int m = 18;
    int n = 18;
    int a[m][n];
    int b[m][n];
    int i, j, k;
    for(i = 0; i < m; i++) {
        for(j = 0; j < n; j++) {
            a[i][j] =  rand() / (RAND_MAX + 1.0) * 10 * (i + j) ;
        }
    }

    int size, rank;
    MPI_Init(NULL, NULL);
    MPI_Status status[4];
    MPI_Request request[4];
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // 每个进程计算的行数,为了简单这里假设正好可以除尽
    int gap = (m - 2) / size;
    int start = gap * rank + 1;
    int end = gap * (rank + 1);
    int bound;
    int left, right;

    // 使用虚拟进程
    left = rank - 1;
    if(left < 0) {
        left = MPI_PROC_NULL;
    }
    right = rank + 1;
    if(right > size - 1) {
        right = MPI_PROC_NULL;
    }

    MPI_Send_init(&b[start][0], n, MPI_INT, left, 99, MPI_COMM_WORLD, &request[0]);
    MPI_Send_init(&b[end][0], n, MPI_INT, right, 99, MPI_COMM_WORLD, &request[1]);
    MPI_Recv_init(&a[start - 1][0], n, MPI_INT, left, 99, MPI_COMM_WORLD, &request[2]);
    MPI_Recv_init(&a[end+1][0], n, MPI_INT, right, 99, MPI_COMM_WORLD, &request[3]);

    // 迭代10次,计算时忽略了 0,n-1 行 和 0,n-1 列
    for(k = 0; k < 10; k++) {

        // 计算边界的值
        bound = start;
        for(j = 1; j < m -1; j++) {
            b[bound][j] = 0.25 * (a[bound-1][j] + a[bound+1][j] + a[bound][j+1] + a[bound][j-1]);     
        }
        bound = end;
        for(j = 1; j < m -1; j++) {
            b[bound][j] = 0.25 * (a[bound-1][j] + a[bound+1][j] + a[bound][j+1] + a[bound][j-1]);     
        }

        MPI_Startall(4, request);
        // 计算剩余的部分
        for(i = start+1; i < end; i++) {
            for(j = 1; j < m -1; j++) {
                b[i][j] = 0.25 * (a[i-1][j] + a[i+1][j] + a[i][j+1] + a[i][j-1]);     
            }
        }

        for(i = start ; i <= end; i++) {
            for(j = 1; j < n - 1; j++) {
                a[i][j] = b[i][j];
            }
        }
        MPI_Waitall(4, request, status);
    }

    // 这里按照顺序输出结果
    for(k = 0; k< size; k++) {
        MPI_Barrier(MPI_COMM_WORLD);
        if(rank == k) {
            for(i = start; i <= end; i++) {
                for(j = 1; j < n-1; j++) {
                    printf("a[%d][%d] is %-4d ", i, j, a[i][j]);
                }

                printf("\n");
            }
        }
    }

    for(i = 0; i < 4; i++) {
        MPI_Request_free(&request[i]);
    }

    MPI_Finalize();
}

results matching ""

    No results matching ""

    results matching ""

      No results matching ""