其他分享
首页 > 其他分享> > 广播变量

广播变量

作者:互联网

广播变量(BrocadCast)是Spark的一大特性,通过将小数据广播分发到每个执行任务的节点(Executor),从而避免了计算过程中的频繁拉去数据的网络带宽等开销。

Spark批处理和Spark Streaming流处理均支持广播变量。广播变量支持各种类型数据,包括数据、列表、Map、RDD、DataFrame等。

 

 

object BroadcastWrapper {
@volatile private var instance: Broadcast[Dataset[Row]] = null

def getHistoryData(sparkSession: SparkSession) : DataFrame = {
HiveUtil.initHiveEnv(sparkSession)
var maxPartition = HiveUtil.getTableMaxPartition(sparkSession, taiciTableName, "ds")
val historyDataDF = sparkSession.sql(s"select * from ${taiciTableName} where ds = ${maxPartition}")
historyDataDF
}

def updateBC(sc: SparkContext, sparkSession: SparkSession, blocking: Boolean = false): Unit = {
if (instance != null) {
instance.unpersist(blocking)
instance = sc.broadcast(getHistoryData(sparkSession).as("t_taici_history"))
}
}

//Broadcast[Dataset[Row]]
def getInstance(sc: SparkContext, sparkSession: SparkSession): Broadcast[Dataset[Row]] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.broadcast(getHistoryData(sparkSession: SparkSession).as("t_taici_history"))
}
}
}
instance
}
}

 

 

流处理是时时刻刻在运行着的程序,很多时候广播变量需要定时更新读取一些数据,例如数据库配置等,这些数据会不定时更新,因此,广播变量也需要触发更新机制。在实践中,业务需要实时感知台词IP的死链变化和付费状态的变化,而从腾讯视频数据流得到的cid数据不一定在上线的台词IP中,因此需要实时判断新来的cid数据是否在已上线的台词中,而已经上线的台词数据是固定的,一两周才会更新一次,所以需要将已上线的台词数据作为广播变量分发到每个计算节点,这样计算的时候就避免了频繁的数据拉取,现在的全量台词数据量在70万,28MB,距离大的数据还有距离,故可以采取这种方式。

 

广播变量如果是DataFrame或者RDD,要注意,不能在RDD或者DataFrame的map或foreach算子中操作广播变量DataFrame。因为RDD或DataFrame本就是分布式执行的,因此在分布式节点是无法按照分布式算子操作广播变量RDD的,使用广播变量广播RDD的主要作用在于join关联的时候能够方便的关联,而避免了频繁的拉取数据。

 

标签:变量,instance,sparkSession,DataFrame,RDD,广播
来源: https://www.cnblogs.com/renyang/p/16487710.html