GraphX源码学习及原理理解
GraphX 源码解析及原理理解
原理概述
分布式图的处理
-
当图数据规模太大以至于无法在单一的机器上实现算法运行时就要考虑用分布式算法来实现。比如分布式下的PageRank计算既是一个很经典的例子。
-
图算法的主要问题集中在图数据的分布式储存、图数据集分布式算法的设计以及图数据分布式算法的系统平台架构。
-
本文主要介绍分布式下图数据的平台以及使用,这里以Spark为例。
GraphX
介绍
Spark的一个图数据处理的系统API。使用RDD(Resilient Distributed Property Graph)的数据机制:顶点和边都有属性的有向多重图。GraphX提供了Pregel的接口。对Graph
视图的所有操作,最终都会转换成其关联的Table
视图的RDD
操作来完成。一个图的计算在逻辑上等价于一系列RDD
的转换过程。因此,Graph
最终具备了RDD
的3个关键特性:不变性、分布性和容错性。其中最关键的是不变性。逻辑上,所有图的转换和操作都产生了一个新图;物理上,GraphX
会有一定程度的不变顶点和边的复用优化,对用户透明。
储存模式
- 点分割的储存模式:
很明显,点分割的储存方式要求某一机器上的点的邻居节点都要有备份,这样需要双倍的储存空间来储存节点,一定程度上增加了储存空间,却减少了不同机器之间的交流成本。
GraphX
在进行图分割时,有几种不同的分区(partition
)策略,它通过PartitionStrategy
专门定义这些策略。在PartitionStrategy
中,总共定义了EdgePartition2D
、EdgePartition1D
、RandomVertexCut
以及 CanonicalRandomVertexCut
这四种不同的分区策略。下面分别介绍这几种策略。
-
RandomVertexCut
case object RandomVertexCut extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { math.abs((src, dst).hashCode()) % numParts } }
这个方法相对来说比较直接简单,也就是直接使用哈希化的随机策略来进行划分,复杂度很低但是容易在实际的图上为后续工作产生复杂。注意的是,这种划分下,两节点之间相同方向的边会分到同一分区。
-
CanonicalRandomVertexCut
case object CanonicalRandomVertexCut extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { if (src < dst) { math.abs((src, dst).hashCode()) % numParts } else { math.abs((dst, src).hashCode()) % numParts } } }
可以看出,这种方法和上面方法的区别就在于只取单一方向的边在同一区域。因为src节点和dst(destination)节点之间通过哈希值大小是有一个所谓的方向的,这样无论是什么方向都会分到同一区域中。
-
EdgePartition1D
case object EdgePartition1D extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { val mixingPrime: VertexId = 1125899906842597L (math.abs(src * mixingPrime) % numParts).toInt } }
这种方法仅仅根据源顶点
id
来将边分配到不同的分区。有相同源顶点的边会分配到同一分区。 -
EdgePartition2D
case object EdgePartition2D extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt val mixingPrime: VertexId = 1125899906842597L if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) { // Use old method for perfect squared to ensure we get same results val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt (col * ceilSqrtNumParts + row) % numParts } else { // Otherwise use new method val cols = ceilSqrtNumParts val rows = (numParts + cols - 1) / cols val lastColRows = numParts - rows * (cols - 1) val col = (math.abs(src * mixingPrime) % numParts / rows).toInt val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt col * rows + row } } }
这种方法相对来说是较为复杂的一个。
这种分割方法同时使用到了源顶点
id
和目的顶点id
。它使用稀疏边连接矩阵的2维区分来将边分配到不同的分区,从而保证顶点的备份数不大于2 * sqrt(numParts)
的限制。这里numParts
表示分区数。 这个方法的实现分两种情况,即分区数能完全开方和不能完全开方两种情况。当分区数能完全开方时,采用下面的方法:
val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts
当分区数不能完全开方时,采用下面的方法。这个方法的最后一列允许拥有不同的行数。
val cols = ceilSqrtNumParts
val rows = (numParts + cols - 1) / cols
// 最后一列允许不同的行数
val lastColRows = numParts - rows * (cols - 1)
val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
col * rows + row
下面举个例子来说明该方法。假设我们有一个拥有12个顶点的图,要把它切分到9台机器。我们可以用下面的稀疏矩阵来表示:
__________________________________
v0 | P0 * | P1 | P2 * |
v1 | **** | * | |
v2 | ******* | ** | **** |
v3 | ***** | * * | * |
----------------------------------
v4 | P3 * | P4 *** | P5 ** * |
v5 | * * | * | |
v6 | * | ** | **** |
v7 | * * * | * * | * |
----------------------------------
v8 | P6 * | P7 * | P8 * *|
v9 | * | * * | |
v10 | * | ** | * * |
v11 | * <-E | *** | ** |
----------------------------------
上面的例子中*
表示分配到处理器上的边。E
表示连接顶点v11
和v1
的边,它被分配到了处理器P6
上。为了获得边所在的处理器,我们将矩阵切分为sqrt(numParts) * sqrt(numParts)
块。 注意,上图中与顶点v11
相连接的边只出现在第一列的块(P0,P3,P6)
或者最后一行的块(P6,P7,P8)
中,这保证了V11
的副本数不会超过2 *sqrt(numParts)
份,在上例中即副本不能超过6份。
在上面的例子中,P0
里面存在很多边,这会造成工作的不均衡。为了提高均衡,我们首先用顶点id
乘以一个大的素数,然后再shuffle
顶点的位置。乘以一个大的素数本质上不能解决不平衡的问题,只是减少了不平衡的情况发生。
GraphX中的 几个概念
Vertices
在GraphX
中,vertices
对应了名为VertexRDD
的RDD
。这个RDD
有顶点id
和顶点属性两个成员变量。它的源码如下所示:
abstract class VertexRDD[VD](
sc: SparkContext,
deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps)
VertexRDD
是RDD[(VertexID, VD)]
的一个继承,vertexID
就是顶点ID,VD表示顶点所带的属性类别。这表明了VertexRDD具有顶点id和顶点属性。
Edges
在GraphX
中,edges
对应着EdgeRDD
。这个RDD
拥有三个成员变量,分别是源顶点id
、目标顶点id
以及边属性。它的源码如下所示:
abstract class EdgeRDD[ED](
sc: SparkContext,
deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps)
从源码中我们可以看到,EdgeRDD
继承自RDD[Edge[ED]]
,即类型为Edge[ED]
的RDD
。Edge[ED]
在后文会讲到。
Triplets
在GraphX
中,triplets
对应着EdgeTriplet
。它是一个三元组视图,这个视图逻辑上将顶点和边的属性保存为一个RDD[EdgeTriplet[VD, ED]]
。可以通过下面的Sql
表达式表示这个三元视图的含义:
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
同样,也可以通过下面图解的形式来表示它的含义:
EdgeTriplet
的源代码如下所示:
class EdgeTriplet[VD, ED] extends Edge[ED] {
//源顶点属性
var srcAttr: VD = _ // nullValue[VD]
//目标顶点属性
var dstAttr: VD = _ // nullValue[VD]
protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD, ED] = {
srcId = other.srcId
dstId = other.dstId
attr = other.attr
this
}
EdgeTriplet
类继承自Edge
类,我们来看看这个父类:
case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
var srcId: VertexId = 0,
var dstId: VertexId = 0,
var attr: ED = null.asInstanceOf[ED])
extends Serializable
Edge
类中包含源顶点id
,目标顶点id
以及边的属性。所以从源代码中我们可以知道,triplets
既包含了边属性也包含了源顶点的id
和属性、目标顶点的id
和属性。
To be continued