GraphX源码学习及原理理解

GraphX 源码解析及原理理解

原理概述

分布式图的处理

GraphX

介绍

Spark的一个图数据处理的系统API。使用RDD(Resilient Distributed Property Graph)的数据机制:顶点和边都有属性的有向多重图。GraphX提供了Pregel的接口。对Graph视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成。一个图的计算在逻辑上等价于一系列RDD的转换过程。因此,Graph最终具备了RDD的3个关键特性:不变性、分布性和容错性。其中最关键的是不变性。逻辑上,所有图的转换和操作都产生了一个新图;物理上,GraphX会有一定程度的不变顶点和边的复用优化,对用户透明。

储存模式

很明显,点分割的储存方式要求某一机器上的点的邻居节点都要有备份,这样需要双倍的储存空间来储存节点,一定程度上增加了储存空间,却减少了不同机器之间的交流成本。

GraphX在进行图分割时,有几种不同的分区(partition)策略,它通过PartitionStrategy专门定义这些策略。在PartitionStrategy中,总共定义了EdgePartition2DEdgePartition1DRandomVertexCut以及 CanonicalRandomVertexCut这四种不同的分区策略。下面分别介绍这几种策略。

  1. RandomVertexCut

    case object RandomVertexCut extends PartitionStrategy {
        override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
          math.abs((src, dst).hashCode()) % numParts
        }
      }
    

    这个方法相对来说比较直接简单,也就是直接使用哈希化的随机策略来进行划分,复杂度很低但是容易在实际的图上为后续工作产生复杂。注意的是,这种划分下,两节点之间相同方向的边会分到同一分区。

  2. CanonicalRandomVertexCut

    case object CanonicalRandomVertexCut extends PartitionStrategy {
        override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
          if (src < dst) {
            math.abs((src, dst).hashCode()) % numParts
          } else {
            math.abs((dst, src).hashCode()) % numParts
          }
        }
      }
    

    可以看出,这种方法和上面方法的区别就在于只取单一方向的边在同一区域。因为src节点和dst(destination)节点之间通过哈希值大小是有一个所谓的方向的,这样无论是什么方向都会分到同一区域中。

  3. EdgePartition1D

    case object EdgePartition1D extends PartitionStrategy {
        override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
          val mixingPrime: VertexId = 1125899906842597L
          (math.abs(src * mixingPrime) % numParts).toInt
        }
      }
    

    这种方法仅仅根据源顶点id来将边分配到不同的分区。有相同源顶点的边会分配到同一分区。

  4. EdgePartition2D

    case object EdgePartition2D extends PartitionStrategy {
        override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
          val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
          val mixingPrime: VertexId = 1125899906842597L
          if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
            // Use old method for perfect squared to ensure we get same results
            val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
            val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
            (col * ceilSqrtNumParts + row) % numParts
          } else {
            // Otherwise use new method
            val cols = ceilSqrtNumParts
            val rows = (numParts + cols - 1) / cols
            val lastColRows = numParts - rows * (cols - 1)
            val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
            val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
            col * rows + row
          }
        }
      }
    

    这种方法相对来说是较为复杂的一个。

    ​ 这种分割方法同时使用到了源顶点id和目的顶点id。它使用稀疏边连接矩阵的2维区分来将边分配到不同的分区,从而保证顶点的备份数不大于2 * sqrt(numParts)的限制。这里numParts表示分区数。 这个方法的实现分两种情况,即分区数能完全开方和不能完全开方两种情况。当分区数能完全开方时,采用下面的方法:

 val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
 val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
 (col * ceilSqrtNumParts + row) % numParts

  当分区数不能完全开方时,采用下面的方法。这个方法的最后一列允许拥有不同的行数。

val cols = ceilSqrtNumParts
val rows = (numParts + cols - 1) / cols
// 最后一列允许不同的行数
val lastColRows = numParts - rows * (cols - 1)
val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
col * rows + row

  下面举个例子来说明该方法。假设我们有一个拥有12个顶点的图,要把它切分到9台机器。我们可以用下面的稀疏矩阵来表示:

          __________________________________
     v0   | P0 *     | P1       | P2    *  |
     v1   |  ****    |  *       |          |
     v2   |  ******* |      **  |  ****    |
     v3   |  *****   |  *  *    |       *  |
          ----------------------------------
     v4   | P3 *     | P4 ***   | P5 **  * |
     v5   |  *  *    |  *       |          |
     v6   |       *  |      **  |  ****    |
     v7   |  * * *   |  *  *    |       *  |
          ----------------------------------
     v8   | P6   *   | P7    *  | P8  *   *|
     v9   |     *    |  *    *  |          |
     v10  |       *  |      **  |  *  *    |
     v11  | * <-E    |  ***     |       ** |
          ----------------------------------

  上面的例子中*表示分配到处理器上的边。E表示连接顶点v11v1的边,它被分配到了处理器P6上。为了获得边所在的处理器,我们将矩阵切分为sqrt(numParts) * sqrt(numParts)块。 注意,上图中与顶点v11相连接的边只出现在第一列的块(P0,P3,P6)或者最后一行的块(P6,P7,P8)中,这保证了V11的副本数不会超过2 *sqrt(numParts)份,在上例中即副本不能超过6份。

  在上面的例子中,P0里面存在很多边,这会造成工作的不均衡。为了提高均衡,我们首先用顶点id乘以一个大的素数,然后再shuffle顶点的位置。乘以一个大的素数本质上不能解决不平衡的问题,只是减少了不平衡的情况发生。

GraphX中的 几个概念

Vertices

GraphX中,vertices对应了名为VertexRDDRDD。这个RDD有顶点id和顶点属性两个成员变量。它的源码如下所示:

abstract class VertexRDD[VD](
    sc: SparkContext,
    deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps) 

VertexRDDRDD[(VertexID, VD)]的一个继承,vertexID就是顶点ID,VD表示顶点所带的属性类别。这表明了VertexRDD具有顶点id和顶点属性。

Edges

  在GraphX中,edges对应着EdgeRDD。这个RDD拥有三个成员变量,分别是源顶点id、目标顶点id以及边属性。它的源码如下所示:

abstract class EdgeRDD[ED](
    sc: SparkContext,
    deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) 

  从源码中我们可以看到,EdgeRDD继承自RDD[Edge[ED]],即类型为Edge[ED]RDDEdge[ED]在后文会讲到。

Triplets

 在GraphX中,triplets对应着EdgeTriplet。它是一个三元组视图,这个视图逻辑上将顶点和边的属性保存为一个RDD[EdgeTriplet[VD, ED]]。可以通过下面的Sql表达式表示这个三元视图的含义:

SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id

  同样,也可以通过下面图解的形式来表示它的含义:

3.1

  EdgeTriplet的源代码如下所示:

class EdgeTriplet[VD, ED] extends Edge[ED] {
  //源顶点属性
  var srcAttr: VD = _ // nullValue[VD]
  //目标顶点属性
  var dstAttr: VD = _ // nullValue[VD]
  protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD, ED] = {
    srcId = other.srcId
    dstId = other.dstId
    attr = other.attr
    this
  }

  EdgeTriplet类继承自Edge类,我们来看看这个父类:

case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
    var srcId: VertexId = 0,
    var dstId: VertexId = 0,
    var attr: ED = null.asInstanceOf[ED])
  extends Serializable

  Edge类中包含源顶点id,目标顶点id以及边的属性。所以从源代码中我们可以知道,triplets既包含了边属性也包含了源顶点的id和属性、目标顶点的id和属性。

To be continued