MPI
MPI and Graphs
最近在做图计算中想用分布式实现一下,找到一些使用MPI接口来实现基础计算的例子。我其实到现在并没有 完全理解MPI这个定义。Pregel GraphX这些分布式图计算的实现方式似乎都是基于消息传递 模型来实现的,但是似乎和MPI协议不同,似乎是更加利于复杂任务的抽象,计算的多样性更强。 现在的理解是,类MP的计算模型应该都是基于MPI这种接口的设计思想来开发的,但MPI是最基本的 分布式消息传递接口架构,所以支持更加贴近硬件。本文记录在学习、使用MPI中的一些 想法和思考,主要是面向图计算。
MPI初步
MPI是一种基于消息传递的统一接口,所以并不是一种软件,可以看作一种统一协议,便于 分布式程序员的工作可以互通。而基于MPI,有一些开源的实现可以让我们在某些语言中 编译MPI这种接口从而处理我们的计算任务,比如OpenMPI MPICH.
MPI的通信
MPI就是一种消息传递的接口,而这其中包含了一些需要理解的概念。首先是communicator通讯器 communicator 定义了一组能够互相发送消息的进程,这些进程各自有一个rank来进行通信的顺序定义。 每一个进程发送的消息包含一个tag,用来定义消息的接受类型。
通信模式有两种,一种是point-to-point也就是一个进程向另一个进程发送消息,这也是最常见的 情况,而有时候某一个进程想要广播一条消息,这时候用点对点其实很麻烦,所以有 collective的集合通讯模式
安装
单机
OpenMPI和MPICH都是很好的实现,使用方式相同, 官网下载稳定版然后编译安装,没什么好说的。
多机分布式安装
TO be explored,我的实验希望能够在32台高内存机器上实际运行。 Here is a tutorial: MPICH with LAN
Hello Wolrd
Here is a small hello world example of MPI Programming:
#include <mpi.h>
#include <stdio.h>
int main(int argc, char** argv) {
// 初始化 MPI 环境
MPI_Init(NULL, NULL);
/* 在hello world这个简单的例子中MPI_Init的两个参数没啥用,但复杂情况下会有用的*/
// 通过调用以下方法来得到所有可以工作的进程数量
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
// 得到当前进程的秩
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
// 得到当前进程的名字
char processor_name[MPI_MAX_PROCESSOR_NAME];
int name_len;
MPI_Get_processor_name(processor_name, &name_len);
// 打印一条带有当前进程名字,秩以及
// 整个 communicator 的大小的 hello world 消息。
printf("Hello world from processor %s, rank %d out of %d processors\n",
processor_name, world_rank, world_size);
// 释放 MPI 的一些资源
MPI_Finalize();
}
NOTES: MPI_COMM_WORLD
是MPI
程序内置变量,表示整个MPI
程序的communicator
情况。
- 编译方法:
EXECS=mpi_hello_world
MPICC?=mpicc
all: ${EXECS}
mpi_hello_world: mpi_hello_world.c
${MPICC} -o mpi_hello_world mpi_hello_world.c
clean:
rm ${EXECS}
以上就是makefile的写法,简单说就是要套一层mpicc来编译c语言;mpic++来编译C++
MPI核心实现单元 接收和发送
几个核心概念:消息形式、缓存、网络通信、发送位置
在MPI进行消息通讯时,一般来说是将一些需要发送的数据打包成一个消息的包,放到缓存之中, 从而别的特定rank来决定收取形式,也就是通过rank来实现收取。 而有时候需要区分消息的形式,也就是特定消息才会被接受,这时候需要利用到tag也就是 标签来进行判断。举个例子,A发送了一些不同类型(也就是带有不同tag)的消息给B, 一方面B无论如何是要区分这些消息类型的,从而便于分类处理,另一方面,如果此时此刻B 只想要处理其中某一种类型的消息,可以先选择性接受其中一种tag,这时候其他类型消息会先被缓存。
Eg.
MPI_Send(
void* data, // 数据缓存
int count, // 数据数量
MPI_Datatype datatype, // 数据类型
int destination, // 数据发送到的rank
int tag, // 数据的tag
MPI_Comm communicatorm // 数据所使用的communicator
);
MPI_Recv(
void* data,
int count,
MPI_Datatype,
int source,
int tag,
MPI_Comm,
MPI_Status* confirm
)
与Send相似,只是多一个status来确认是否消息已经被收到。
下面是一段实际的消息发送和接收的代码
// 得到当前进程的 rank 以及整个 communicator 的大小
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
int world_size;
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
int number;
if (world_rank == 0) {
number = -1;
MPI_Send(&number, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
} else if (world_rank == 1) {
MPI_Recv(&number, 1, MPI_INT, 0, 0, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
printf("Process 1 received number %d from process 0\n",
number);
}
这里一个虽然很显然需要理解的地方:不管rank是多少,我们是将其写在同一个程序里的,即使是 多机来分布式执行,只需要根据rank来控制实际执行的进程编号即可。即我们一开始初始化了 MPI_COMM_WORLD,所有的进程信息在这个变量中都可以获取得到,而world_rank就是我们获取 到的rank,其他也是同理。
另一个很有趣的例子:
int ping_pong_count = 0;
int partner_rank = (world_rank + 1) % 2;
while (ping_pong_count < PING_PONG_LIMIT) {
if (world_rank == ping_pong_count % 2) {
// Increment the ping pong count before you send it
ping_pong_count++;
MPI_Send(&ping_pong_count, 1, MPI_INT, partner_rank, 0, MPI_COMM_WORLD);
printf("%d sent and incremented ping_pong_count %d to %d\n",
world_rank, ping_pong_count,
partner_rank);
} else {
MPI_Recv(&ping_pong_count, 1, MPI_INT, partner_rank, 0,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
printf("%d received ping_pong_count %d from %d\n",
world_rank, ping_pong_count, partner_rank);
}
}
简单的解释:每一轮A发送消息过后,B在下一轮去接,数字是不断在增加的
上面的两个例子其实都是两个进程点对点的收发操作。如果有多个进程?如何设计一个环来进行循环 通信呢?
int token;
if (world_rank != 0) {
MPI_Recv(&token, 1, MPI_INT, world_rank - 1, 0,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
printf("Process %d received token %d from process %d\n",
world_rank, token, world_rank - 1);
} else {
// Set the token's value if you are process 0
token = -1;
}
MPI_Send(&token, 1, MPI_INT, (world_rank + 1) % world_size,
0, MPI_COMM_WORLD);
// Now process 0 can receive from the last process.
if (world_rank == 0) {
MPI_Recv(&token, 1, MPI_INT, world_size - 1, 0,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
printf("Process %d received token %d from process %d\n",
world_rank, token, world_size - 1);
}
上面这段程序实现了一个不会发生死锁的环,基本的逻辑是:1. 生成一个token 2.然后 在下一轮由大一个值的rank接收,从而实现最终的循环。