Sparkstreaming的整合和其他一些知识点
作者:互联网
flume与spark的整合
flume作为日志实时采集的框架,可以与SparkStreaming实时处理框架进行对接,flume实时产生数据,sparkStreaming做实时处理。
Spark Streaming对接FlumeNG有两种方式,一种是FlumeNG将消息Push推给Spark Streaming,还有一种是Spark Streaming从flume 中Poll拉取数据。
为了事务保证,使用TailDirSource
flume采集频率:
flume采集数据用作离线处理:可能 是每隔半个小时采集一次
flume采集的数据用作实时处理:实时的采集
如何保证flume不宕机:可以写一个shell脚本监控flume正常运行
一般不会与flume进行整合,会有弊端,flume采集的数据量可能比较大,可能有时候又比较小,就是说flume没办法对数据流量进行控制。
一般都是与kafka进行整合,使用kafka实现数据的限流的作用
kafka与spark的整合
首先需要对kafka进行修改配置
修改kafka配置文件
第一台机器修改kafka配置文件server.properties
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node01
第二台机器修改kafka配置文件server.properties
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node02
第三台机器修改kafka配置文件server.properties
broker.id=2
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node03
2.4启动kafka集群
三台机器启动kafka服务
bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 & 后台启动命令
3、kafka的命令行的管理使用
创建topic
kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic kafkatopic --zookeeper node01:2181,node02:2181,node03:2181
模拟生产者
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafkatopic
模拟消费者
kafka-console-consumer.sh --from-beginning --topic kafkatopic --zookeeper node01:2181,node02:2181,node03:2181
sparkStreaming与kafka的整合,两个版本
kafka0.8版本:
接收数据的两种方式
ReceiverDstream:使用HighLeveAPI进行消费,offset保存在zk当中,使用at least once消费模式, 会造成数据的重复消费。
每隔一段时间,默认自动提交一次offset到zk当中去保存
DirectDstream:
使用是LowLeveAPI进行消费,offset保存在默认的topic里面,使用at most once消费模式,会造成数据丢失
默认是按照最新的offset进行消费
kafka0.10版本(一般用这个):
接收数据只有一种方式
DirectDStream:使用lowLeveAPI进行消费,offset默认保存在topic里面,配合手动提交offset,实现exactly once的消费模式
导入jar包依赖:
<!-- <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.2.0</version>
</dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
源代码:
import com.google.common.hash.HashingOutputStream
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
object SparkStreaming10Direct {
def main(args: Array[String]): Unit = {
/**
* [K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
)
*/
val sparkContext = new SparkContext(new SparkConf().setMaster("local[8]").setAppName("sparKafka10DirectStream"))
sparkContext.setLogLevel("WARN")
val streamingContext = new StreamingContext(sparkContext,Seconds(5))
val consistent = LocationStrategies.PreferConsistent
/**
* [K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object])
*/
//创建topic
val brokers= "node01:9092,node02:9092,node03:9092"
val sourcetopic="sparkafka";
//创建消费者组
var group="sparkafkaGroup"
//消费者配置
val kafkaParam = Map(
"bootstrap.servers" -> brokers,//用于初始化链接到集群的地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
//用于标识这个消费者属于哪个消费团体
"group.id" -> group,
//如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
//可以使用这个配置,latest自动重置偏移量为最新的偏移量
"auto.offset.reset" -> "latest",
//如果是true,则这个消费者的偏移量会在后台自动提交
//offset不要自动提交,我们等会儿处理完了数据之后,手动提交offset
"enable.auto.commit" -> (false: java.lang.Boolean)
);
//使用 ConsumerStrategies.Subscribe来订阅某一个topic当中的数据
val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe[String,String](Array("sparkafka"),kafkaParam)
//sealed 使用这个关键字修饰的类叫做密封类 。不让你用
val resultDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](streamingContext,consistent,consumerStrategy)
//循环遍历每一个RDD当中的数据
//一个RDD有多个分区,一个分区里面有多条数据
//一个RDD里面多条数据
//注意:foreachRDD outputOperation算子
resultDStream.foreachRDD(iter =>{
//判断rdd当中有没有数据
if(iter.count() > 0){
//如果获取到的RDD里面的数据大于0 ,表示rdd当中有数据了
//使用foreach, 是一个action算子
//循环rdd当中所有的数据,将每一条数据取出来,进行处理
//使用一个rdd进行foreach,将rdd当中所有的数据挨条取出来进行处理,如果处理完了,表示一个RDD当中所有的数据都处理完了
iter.foreach(line =>{
//获取数据的value
val value: String = line.value()
println(value)
//为了更加精确的控制offset,我们可以在这里处理一条数据,就提交一条数据的offset
})
//"enable.auto.commit" -> (false: java.lang.Boolean) 设置了offset自动提交关闭了,数据不会再自动提交offset了,需要我们手动处理提交完成
//rdd当中一批数据处理完了,将这一批数据的offset全部提交
//获取rdd当中所有数据的offset,将iter强制转换成为HasOffsetRanges 这个类,调用offsetRange方法,就得到了这一个RDD当中所有的offset值
val ranges: Array[OffsetRange] = iter.asInstanceOf[HasOffsetRanges].offsetRanges
//提交offset值,将resultDStream 强制转换成为CanCommitOffsets 这个类,然后调用commitAsync实现异步提交offset的值
resultDStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
}
})
//启动sparkStreaming的程序
streamingContext.start()
streamingContext.awaitTermination()
}
}
Window Operations
Window Operations有点类似于Storm中的State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。
基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。
SparkStreaming窗口函数reduceByKeyAndWindow,实现单词计数
2 实现流程
(1)安装并启动生成者
首先在linux服务器上用YUM安装nc工具,nc命令是netcat命令的简称,都是用来设置路由器。我们可以利用它向某个端口发送数据。
yum install -y nc
(2)启动一个服务端并监听9999端口
nc-lk 9999
向指定的端口发送数据
(3)编写Spark Streaming程序
package cn.test.spark
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
- sparkStreming开窗函数—统计一定时间内单词出现的次数
*/
object SparkStreamingTCPWindow {
def main(args: Array[String]): Unit = {
//配置sparkConf参数
val sparkConf: SparkConf = new SparkConf().setAppName(“SparkStreamingTCPWindow”).setMaster(“local[2]”)
//构建sparkContext对象
val sc: SparkContext = new SparkContext(sparkConf)
sc.setLogLevel(“WARN”)
//构建StreamingContext对象,每个批处理的时间间隔
val scc: StreamingContext = new StreamingContext(sc,Seconds(5))
//注册一个监听的IP地址和端口 用来收集数据
val lines: ReceiverInputDStream[String] = scc.socketTextStream(“192.168.200.160”,9999)
//切分每一行记录
val words: DStream[String] = lines.flatMap(.split(" "))
//每个单词记为1
val wordAndOne: DStream[(String, Int)] = words.map((,1))
//reduceByKeyAndWindow函数参数意义:
// windowDuration:表示window框住的时间长度,如本例5秒切分一次RDD,框10秒,就会保留最近2次切分的RDD
//slideDuration: 表示window滑动的时间长度,即每隔多久执行本计算
val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(5))
result.print()
scc.start()
scc.awaitTermination()
}
}
3 执行查看效果
(1)先执行nc -lk 9999
(2)然后在执行以上代码
(3)不断的在(1)中输入不同的单词,观察IDEA控制台输出
sparkStreaming的容错机制
检查点容错:可以设置checkPoint路径,方便数据的恢复
驱动器的容错:不要使用new的方式创建streamingContext
推荐使用
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
工作节点的容错:将接收到的数据进行保存,然后使用RDD的血统进行数据的容错
接收器的容错:可以选择比较靠谱的消息源来接收数据,例如kafka等消息队列
处理的保证:尽量保证所有的数据,处理且仅处理一次。实现数据的不丢不漏不重不错
标签:知识点,log,val,kafka,2181,整合,offset,Sparkstreaming,String 来源: https://blog.csdn.net/KujyouRuri/article/details/116240231