Spark Graphx Pregel(pregel参数详解,pregel调用实现过程的详细解释)
作者:互联网
Spark Graphx Pregel
一.Pregel概述
1.什么是pregel?
Pregel是Google 提出的用于大规模分布式图计算框架。Pregel是个强大的基于图的迭代算法。
2.pregel应用场景
一般pregel可以在图中进行迭代计算,如求最短路径,关键路径,n度关系等。
二.Pregel源码及参数解释
1.源码
def pregel[A: ClassTag](
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
}
2.参数详细解释
(1)initialMsg
初始化消息。这个初始化消息会被用来初始化图中的每个节点的属性,在pregel调用时,会首先在图上使用mapVertices来根据initialMsg的值更新每个几点的值。至于如何更新,则由vprog参数而定,vprog函数就接收了initialMsg消息作为参数来更新对应节点的值。
(2)maxIteration
最大迭代次数
(3)activeDirection
表示边的活跃方向。
-
活跃节点:是指在某一轮迭代中,pregel会以sendMsg和mergeMsg为参数来调用graph的aggregateMessage方法后收到消息的节点
-
活跃消息:是这轮迭代中所有被成功收到的消息
则有的边src节点是活跃节点,有的dst节点是活跃节点,有的边两端节点都是活跃节点。如果activeDirection参数被指定为"EdgeDirection.out",则在下一轮迭代中,只接收消息的出边(src—>dst)才会执行sendMsg函数。也就是说,sendMsg回调函数会过滤掉(dst—>src)的edgeTriplet上下文参数
(4)vprog
节点变换函数。
在初始时,以及每轮迭代后,pregel会根据上一轮使用的msg和这里的vprog函数在图上调用joinVertices方法变化每个收到消息的节点。
(5)sendMsg
消息发送函数。该函数的运行参数是一个代表边的上下文,pregel在调用aggregateMessage是,会将EdgeContext转换成EdgeTriplet对象来使用,用户需要通过Iterator[(VertexID,A)]指定发送哪些消息,发送哪些节点,发送哪些内容;因为在一条边上可以发送多个消息,如sendToDst,sendToSrc,所以这里是个Iterator,每个元素是一个tuple,其中的vertexId便是接收此消息的节点id,只能是该边上的srcId或者dstId,而A就是要发送的内容。
因此,如果要由src发送一条消息A到dst,则有:Iterator((dstId,A)),如果什么消息也不发送,则返回一个空的Iterator:Iterator.empty
(6)mergeMsg
邻居节点收到多条消息时的合并逻辑。
区别与vprog,mergeMsg仅能合并消息内容,但合并后并不会更新到节点中去,而vprog函数可以根据收到的消息(就是mergeMsg产生的结果)更新节点属性
三.Pregel计算顶点5 到 其他各顶点的 最短距离
1.图信息
(1)顶点信息
(1L, ("Alice", 28)),
(2L, ("Bob", 27)),
(3L, ("Charlie", 65)),
(4L, ("David", 42)),
(5L, ("Ed", 55)),
(6L, ("Fran", 50))
(2)边信息
Edge(2L, 1L, 7),
Edge(2L, 4L, 2),
Edge(3L, 2L, 4),
Edge(3L, 6L, 3),
Edge(4L, 1L, 1),
Edge(2L, 5L, 2),
Edge(5L, 3L, 8),
Edge(5L, 6L, 3)
2.Pregel原理分析
- 顶点的两个状态:
- 钝化态:类似于休眠,不做任何处理
- 激活态:可以进行数据的接受和发送
- 顶点能够处于激活状态需要的条件
- 成功收到消息
- 成功发送任何一条消息
(1)调用pregel方法之前
先把图的各个顶点的属性初始化,即顶点5到自己的距离为0,所以设为0,其他顶点都设为正无穷大Double.PositiveInifinity
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jFtfs4qL-1611387816591)(C:\Users\86188\Desktop\截图\0123\1.jpg)]
(2)当调用pregel方法开始
首先,所有顶点都将接收到一条初始消息initialMsg ,使所有顶点都处于激活态(红色标识的节点)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6EUoJAep-1611387816593)(C:\Users\86188\Desktop\截图\0123\2.png)]
(3)第一次迭代开始
所有顶点以EdgeDirection.Out的边方向调用sendMsg方法发送消息给目标顶点,如果 源顶点的属性+边的属性<目标顶点的属性,则发送消息。否则不发送。
5—>3(0+8<Double.Infinity,成功),
5—>6(0+3<Double.Infinity,成功),
3—>2(Double.Infinity+4>Double.Infinity,失败),
3—>6(Double.Infinity+3>Double.Infinity,失败),
2—>1(Double.Infinity+7>Double.Infinity,失败),
2—>4(Double.Infinity+2>Double.Infinity,失败),
2—>5(Double.Infinity+2>Double.Infinity,失败),
4—>1(Double.Infinity+1>Double.Infinity,失败)
sendMsg方法执行完成之后,根据顶点处于激活态的条件,顶点5成功地分别给顶点3和顶点6发送消息,顶点3 和 顶点6 也成功地接受到了消息。
所以此时只有5,3,6三个顶点处于激活状态,其他顶点全部钝化。然后收到消息的顶点3和顶点6都调用vprog方法,将收到的消息与自身属性合并。如图所示,至此第一次迭代结束。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1sRXSNr2-1611387816594)(C:\Users\86188\Desktop\截图\0123\3 (2)].png)
(4)第二次迭代开始
顶点3 给 顶点6 发送消息失败,顶点3 给 顶点2 发送消息成功,此时 顶点3 成功发送消息,顶点2 成功接收消息,所以顶点2 和 顶点3 都成为激活状态,其他顶点都成为钝化状态。然后顶点2 调用vprog方法,将收到的消息 与 自身的属性合并。至此第二次迭代结束
3—>2(8+4<Double.Infinity,成功),
3—>6(8+3>3,失败)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AVCGzoBT-1611387816595)(C:\Users\86188\Desktop\截图\0123\4.png)]
(5)第三次迭代开始
顶点3分别发送消息给顶点2失败 和 顶点6失败,顶点2 分别发消息给 顶点1成功、顶点4成功、顶点5失败 ,所以 顶点2、顶点1、顶点4 成为激活状态,其他顶点为钝化状态。顶点1 和 顶点4分别调用vprog方法,将收到的消息 与 自身的属性合并。至此第三次迭代结束
3—>2(8+4=12,失败),
3—>6(8+3>3,失败)
2—>1(12+7<Double.Infinity,成功)
2—>4(12+2<Double.Infinity,成功)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-33HaS97c-1611387816596)(C:\Users\86188\Desktop\截图\0123\5.png)]
(6)第四次迭代开始
顶点2 分别发送消息给 顶点1失败 和 顶点4失败。顶点4 给 顶点1发送消息成功,顶点1 和 顶点4 进入激活状态,其他顶点进入钝化状态。顶点1 调用vprog方法,将收到的消息 与 自身的属性合并
2—>1(12+7=19,失败)
2—>4(12+2=14,失败)
4—>1(14+1<19,成功)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NOvS64ve-1611387816597)(C:\Users\86188\Desktop\截图\0123\6.1.png)]
(7)第五次迭代开始
顶点4 再给 顶点1发送消息失败,顶点4 和 顶点1 进入钝化状态,此时全图都进入钝化状态。至此结束
4—>1(14+1=15,失败)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fHg45sAJ-1611387816598)(C:\Users\86188\Desktop\截图\0123\7.png)]
结论:由上述分析过程可知,顶点5到其他各顶点距离全部算出,
5—>1 (15)
5—>2 (12)
5—>3 (8)
5—>4 (14)
5—>6 (3)
3.代码实现
package suanfa
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object PregelDemo {
def main(args: Array[String]): Unit = {
//TODO:1.创建SparkContext对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("pregeldemo")
val sc = new SparkContext(conf)
//TODO:2、创建顶点
val vertexArray = Array(
(1L, ("Alice", 28)),
(2L, ("Bob", 27)),
(3L, ("Charlie", 65)),
(4L, ("David", 42)),
(5L, ("Ed", 55)),
(6L, ("Fran", 50))
)
val vertexRDD: RDD[(VertexId, (String, Int))] = sc.makeRDD(vertexArray)
//TODO:3、创建边,边的属性代表 相邻两个顶点之间的距离
val edgeArray = Array(
Edge(2L, 1L, 7),
Edge(2L, 4L, 2),
Edge(3L, 2L, 4),
Edge(3L, 6L, 3),
Edge(4L, 1L, 1),
Edge(2L, 5L, 2),
Edge(5L, 3L, 8),
Edge(5L, 6L, 3)
)
val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)
//TODO:4、创建图(使用apply方式创建)
val graph1 = Graph(vertexRDD, edgeRDD)
/* ************************** 使用pregle算法计算 ,顶点5 到 各个顶点的最短距离 ************************** */
//TODO:5、调用pregel算法
//todo:(1)設置頂點信息
//被计算的图中 起始顶点id
val srcVertexId = 5L
//给每个顶点赋属性值
val initialGraph: Graph[Double, Int] = graph1.mapVertices { case (vid, (name, age)) => if (vid == srcVertexId) 0.0 else Double.PositiveInfinity }
println(" 1.每个顶点的属性值如下")
initialGraph.vertices.collect().foreach(println)
println("---------------开始调用pregel---------------")
//todo:(2)調用pregel
val pregelGraph: Graph[Double, PartitionID] = initialGraph.pregel(
Double.PositiveInfinity,//每个点的初始值,无穷大
Int.MaxValue, //最大迭代次数
EdgeDirection.Out //发送消息的方向
)(//todo:vprog:接受到的消息和自己的消息进行合并
//这个顶点sendMsg发送的顶点信息
// 三个参数 vprog: (VertexId, VD, A) => VD,
//VertexId当前节点的顶点id,VD当前顶点的属性,A接收到的信息
//返回值:当前顶点更新后的属性
(vid: VertexId, vd: Double, distMsg: Double) => {
println(s"----------顶点${vid}调用vprog:接受到的消息和自己的消息进行合并----------------")
//即将接收到的信息和顶点属性进行比较,取最小值进行更新该顶点属性
val minDist = math.min(vd, distMsg)
println(s"顶点${vid},顶点属性${vd},收到消息${distMsg},合并后的属性${minDist}")
minDist
},
//todo: sendMsg:发送消息,如果自己的消息+权重<目的地的消息,则发送
//一个参数 : sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)]
//即表的信息
//返回值:发送成功后的节点id和发送的消息的一个迭代器
(edgeTriplet: EdgeTriplet[Double, PartitionID]) => {
println(s"----------调用${edgeTriplet.srcId}调用sendMsg发送消息给顶点${edgeTriplet.dstId}------------")
if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {
println(s"顶点${edgeTriplet.srcId}给顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")
Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))
} else {
Iterator.empty
}
},
//todo:mergeMsg多条接收消息,mergeMessage,取小合并多条消息
// mergeMsg: (A, A) => A)
(msg1: Double, msg2: Double) => {
println(s"----------${msg1},${msg2}调用mergeMsg:合并多条接收消息------------")
println(msg1,msg2)
math.min(msg1, msg2)
}
)
}
}
输出结果:
//初始化每个节点的属性
1.每个顶点的属性值如下
(1,Infinity)
(2,Infinity)
(3,Infinity)
(4,Infinity)
(5,0.0)
(6,Infinity)
---------------开始调用pregel---------------
//最开始每个节点都会收到初始化的属性Double.PositiveInfinity,会通过调用vprog(接受到的消息和自己的消息进行合并)对节点属性进行合并
----------顶点5调用vprog:接受到的消息和自己的消息进行合并----------------
顶点5,顶点属性0.0,收到消息Infinity,合并后的属性0.0
----------顶点6调用vprog:接受到的消息和自己的消息进行合并----------------
顶点6,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
----------顶点1调用vprog:接受到的消息和自己的消息进行合并----------------
顶点1,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
----------顶点3调用vprog:接受到的消息和自己的消息进行合并----------------
顶点3,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
----------顶点2调用vprog:接受到的消息和自己的消息进行合并----------------
顶点2,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
----------顶点4调用vprog:接受到的消息和自己的消息进行合并----------------
顶点4,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
//---------------第一次迭代---------------
//第一次进行迭代,按照发送消息的方向发送消息,可知,第一次迭代只有顶点5发送消息到顶点6和3是满足sendMsg的条件的。即发送成功,
----------调用3调用sendMsg发送消息给顶点6------------
----------调用5调用sendMsg发送消息给顶点6------------
顶点5给顶点6 发送消息 3.0
----------调用2调用sendMsg发送消息给顶点5------------
----------调用3调用sendMsg发送消息给顶点2------------
----------调用4调用sendMsg发送消息给顶点1------------
----------调用5调用sendMsg发送消息给顶点3------------
顶点5给顶点3 发送消息 8.0
----------调用2调用sendMsg发送消息给顶点4------------
----------调用2调用sendMsg发送消息给顶点1------------
//顶点6和顶点3接收到信息后就会调用vprog进行合并属性
----------顶点6调用vprog:接受到的消息和自己的消息进行合并--------------
顶点6,顶点属性Infinity,收到消息3.0,合并后的属性3.0
----------顶点3调用vprog:接受到的消息和自己的消息进行合并----------------
顶点3,顶点属性Infinity,收到消息8.0,合并后的属性8.0
//---------------第二次迭代---------------
//经过第一次迭代后,3,5,6处于激活状态
//顶点3有出边,顶点6没有出边,可知,顶点3给顶点2发送消息成功,给顶点6发送失败
----------调用3调用sendMsg发送消息给顶点2------------
顶点3给顶点2 发送消息 12.0
----------调用3调用sendMsg发送消息给顶点6------------
//收到消息的顶点2调用vprog合并顶点属性
----------顶点2调用vprog:接受到的消息和自己的消息进行合并----------------
顶点2,顶点属性Infinity,收到消息12.0,合并后的属性12.0
//---------------第三次迭代---------------
----------调用2调用sendMsg发送消息给顶点5------------
----------调用2调用sendMsg发送消息给顶点1------------
顶点2给顶点1 发送消息 19.0
----------调用2调用sendMsg发送消息给顶点4------------
顶点2给顶点4 发送消息 14.0
----------顶点1调用vprog:接受到的消息和自己的消息进行合并----------------
顶点1,顶点属性Infinity,收到消息19.0,合并后的属性19.0
----------顶点4调用vprog:接受到的消息和自己的消息进行合并----------------
顶点4,顶点属性Infinity,收到消息14.0,合并后的属性14.0
//---------------第四次迭代---------------
----------调用4调用sendMsg发送消息给顶点1------------
顶点4给顶点1 发送消息 15.0
----------顶点1调用vprog:接受到的消息和自己的消息进行合并----------------
顶点1,顶点属性19.0,收到消息15.0,合并后的属性15.0
//---------- 第五次迭代不用发送消息,所有节点钝化 -----------------
通过对输出结果进行分析,可知,大致流程是首先在未调用pregel方法之前给每个节点一个初始值,然后通过调用pregel给每个顶点收到一条初始消息initialMsg,所有顶点处于激活状态,调用vprog对每个节点进行属性合并。然后每个激活态的顶点开始调用sendMsg根据 EdgeDirection.Out方向进行消息发送,将发送成功的顶点进行激活,其他顶点进行钝化处理,接收消息成功的顶点开始调用vprog进行合并顶点信息。这些被激活的顶点进行再次迭代,直到所有顶点钝化结束完成
标签:调用,Infinity,Pregel,pregel,发送,消息,vprog,顶点,Spark 来源: https://blog.csdn.net/May_J_Oldhu/article/details/113053379