MPI

MPI and Graphs

最近在做图计算中想用分布式实现一下,找到一些使用MPI接口来实现基础计算的例子。我其实到现在并没有 完全理解MPI这个定义。Pregel GraphX这些分布式图计算的实现方式似乎都是基于消息传递 模型来实现的,但是似乎和MPI协议不同,似乎是更加利于复杂任务的抽象,计算的多样性更强。 现在的理解是,类MP的计算模型应该都是基于MPI这种接口的设计思想来开发的,但MPI是最基本的 分布式消息传递接口架构,所以支持更加贴近硬件。本文记录在学习、使用MPI中的一些 想法和思考,主要是面向图计算。

MPI初步

MPI是一种基于消息传递的统一接口,所以并不是一种软件,可以看作一种统一协议,便于 分布式程序员的工作可以互通。而基于MPI,有一些开源的实现可以让我们在某些语言中 编译MPI这种接口从而处理我们的计算任务,比如OpenMPI MPICH.

MPI的通信

MPI就是一种消息传递的接口,而这其中包含了一些需要理解的概念。首先是communicator通讯器 communicator 定义了一组能够互相发送消息的进程,这些进程各自有一个rank来进行通信的顺序定义。 每一个进程发送的消息包含一个tag,用来定义消息的接受类型。

通信模式有两种,一种是point-to-point也就是一个进程向另一个进程发送消息,这也是最常见的 情况,而有时候某一个进程想要广播一条消息,这时候用点对点其实很麻烦,所以有 collective的集合通讯模式

安装

单机

OpenMPI和MPICH都是很好的实现,使用方式相同, 官网下载稳定版然后编译安装,没什么好说的。

多机分布式安装

TO be explored,我的实验希望能够在32台高内存机器上实际运行。 Here is a tutorial: MPICH with LAN

Hello Wolrd

Here is a small hello world example of MPI Programming:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char** argv) {
    // 初始化 MPI 环境
    MPI_Init(NULL, NULL);
    /* 在hello world这个简单的例子中MPI_Init的两个参数没啥用,但复杂情况下会有用的*/
    // 通过调用以下方法来得到所有可以工作的进程数量
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    // 得到当前进程的秩
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    // 得到当前进程的名字
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int name_len;
    MPI_Get_processor_name(processor_name, &name_len);

    // 打印一条带有当前进程名字,秩以及
    // 整个 communicator 的大小的 hello world 消息。
    printf("Hello world from processor %s, rank %d out of %d processors\n",
           processor_name, world_rank, world_size);

    // 释放 MPI 的一些资源
    MPI_Finalize();
}

NOTES: MPI_COMM_WORLDMPI程序内置变量,表示整个MPI程序的communicator情况。

EXECS=mpi_hello_world
MPICC?=mpicc

all: ${EXECS}

mpi_hello_world: mpi_hello_world.c
    ${MPICC} -o mpi_hello_world mpi_hello_world.c

clean:
    rm ${EXECS}

以上就是makefile的写法,简单说就是要套一层mpicc来编译c语言;mpic++来编译C++

MPI核心实现单元 接收和发送

几个核心概念:消息形式、缓存、网络通信、发送位置

在MPI进行消息通讯时,一般来说是将一些需要发送的数据打包成一个消息的包,放到缓存之中, 从而别的特定rank来决定收取形式,也就是通过rank来实现收取。 而有时候需要区分消息的形式,也就是特定消息才会被接受,这时候需要利用到tag也就是 标签来进行判断。举个例子,A发送了一些不同类型(也就是带有不同tag)的消息给B, 一方面B无论如何是要区分这些消息类型的,从而便于分类处理,另一方面,如果此时此刻B 只想要处理其中某一种类型的消息,可以先选择性接受其中一种tag,这时候其他类型消息会先被缓存。

Eg.

MPI_Send(
    void* data, // 数据缓存
    int count, // 数据数量
    MPI_Datatype datatype, // 数据类型
    int destination, // 数据发送到的rank
    int tag, // 数据的tag
    MPI_Comm communicatorm // 数据所使用的communicator
);
MPI_Recv(
    void* data,
    int count,
    MPI_Datatype,
    int source,
    int tag,
    MPI_Comm,
    MPI_Status* confirm
)

与Send相似,只是多一个status来确认是否消息已经被收到。

下面是一段实际的消息发送和接收的代码

// 得到当前进程的 rank 以及整个 communicator 的大小
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);

int number;
if (world_rank == 0) {
    number = -1;
    MPI_Send(&number, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
} else if (world_rank == 1) {
    MPI_Recv(&number, 1, MPI_INT, 0, 0, MPI_COMM_WORLD,
             MPI_STATUS_IGNORE);
    printf("Process 1 received number %d from process 0\n",
           number);
}

这里一个虽然很显然需要理解的地方:不管rank是多少,我们是将其写在同一个程序里的,即使是 多机来分布式执行,只需要根据rank来控制实际执行的进程编号即可。即我们一开始初始化了 MPI_COMM_WORLD,所有的进程信息在这个变量中都可以获取得到,而world_rank就是我们获取 到的rank,其他也是同理。

另一个很有趣的例子:

int ping_pong_count = 0;
int partner_rank = (world_rank + 1) % 2;
while (ping_pong_count < PING_PONG_LIMIT) {
    if (world_rank == ping_pong_count % 2) {
        // Increment the ping pong count before you send it
        ping_pong_count++;
        MPI_Send(&ping_pong_count, 1, MPI_INT, partner_rank, 0, MPI_COMM_WORLD);
        printf("%d sent and incremented ping_pong_count %d to %d\n",
               world_rank, ping_pong_count,
               partner_rank);
    } else {
        MPI_Recv(&ping_pong_count, 1, MPI_INT, partner_rank, 0,
                 MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("%d received ping_pong_count %d from %d\n",
               world_rank, ping_pong_count, partner_rank);
    }
}

简单的解释:每一轮A发送消息过后,B在下一轮去接,数字是不断在增加的

上面的两个例子其实都是两个进程点对点的收发操作。如果有多个进程?如何设计一个环来进行循环 通信呢?

int token;
if (world_rank != 0) {
    MPI_Recv(&token, 1, MPI_INT, world_rank - 1, 0,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    printf("Process %d received token %d from process %d\n",
           world_rank, token, world_rank - 1);
} else {
    // Set the token's value if you are process 0
    token = -1;
}
MPI_Send(&token, 1, MPI_INT, (world_rank + 1) % world_size,
         0, MPI_COMM_WORLD);

// Now process 0 can receive from the last process.
if (world_rank == 0) {
    MPI_Recv(&token, 1, MPI_INT, world_size - 1, 0,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    printf("Process %d received token %d from process %d\n",
           world_rank, token, world_size - 1);
}

上面这段程序实现了一个不会发生死锁的环,基本的逻辑是:1. 生成一个token 2.然后 在下一轮由大一个值的rank接收,从而实现最终的循环。