其他分享
首页 > 其他分享> > Sparkstreaming的整合和其他一些知识点

Sparkstreaming的整合和其他一些知识点

作者:互联网

flume与spark的整合

flume作为日志实时采集的框架,可以与SparkStreaming实时处理框架进行对接,flume实时产生数据,sparkStreaming做实时处理。
Spark Streaming对接FlumeNG有两种方式,一种是FlumeNG将消息Push推给Spark Streaming还有一种是Spark Streaming从flume 中Poll拉取数据

为了事务保证,使用TailDirSource
在这里插入图片描述

flume采集频率:

​	flume采集数据用作离线处理:可能 是每隔半个小时采集一次

​	flume采集的数据用作实时处理:实时的采集

    如何保证flume不宕机:可以写一个shell脚本监控flume正常运行

一般不会与flume进行整合,会有弊端,flume采集的数据量可能比较大,可能有时候又比较小,就是说flume没办法对数据流量进行控制。

一般都是与kafka进行整合,使用kafka实现数据的限流的作用

kafka与spark的整合

首先需要对kafka进行修改配置

修改kafka配置文件
第一台机器修改kafka配置文件server.properties

broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node01

第二台机器修改kafka配置文件server.properties

broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node02

第三台机器修改kafka配置文件server.properties

broker.id=2
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node03

2.4启动kafka集群
三台机器启动kafka服务

bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &    后台启动命令

3、kafka的命令行的管理使用

创建topic
kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic kafkatopic --zookeeper node01:2181,node02:2181,node03:2181
模拟生产者
kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafkatopic
模拟消费者
kafka-console-consumer.sh --from-beginning --topic kafkatopic --zookeeper node01:2181,node02:2181,node03:2181

sparkStreaming与kafka的整合,两个版本

在这里插入图片描述

kafka0.8版本:
接收数据的两种方式

ReceiverDstream:使用HighLeveAPI进行消费,offset保存在zk当中,使用at least once消费模式, 会造成数据的重复消费。

每隔一段时间,默认自动提交一次offset到zk当中去保存

​ DirectDstream:

使用是LowLeveAPI进行消费,offset保存在默认的topic里面,使用at most once消费模式,会造成数据丢失

默认是按照最新的offset进行消费

kafka0.10版本(一般用这个):

接收数据只有一种方式

DirectDStream:使用lowLeveAPI进行消费,offset默认保存在topic里面,配合手动提交offset,实现exactly once的消费模式

导入jar包依赖:

<!-- <dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
	<version>2.2.0</version>
</dependency>-->
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
	<version>2.2.0</version>
</dependency>

源代码:

import com.google.common.hash.HashingOutputStream
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object SparkStreaming10Direct {

  def main(args: Array[String]): Unit = {

    /**
      * [K, V](
      ssc: StreamingContext,
      locationStrategy: LocationStrategy,
      consumerStrategy: ConsumerStrategy[K, V]
    )
      */

    val sparkContext = new SparkContext(new SparkConf().setMaster("local[8]").setAppName("sparKafka10DirectStream"))
    sparkContext.setLogLevel("WARN")

    val streamingContext = new StreamingContext(sparkContext,Seconds(5))

    val consistent = LocationStrategies.PreferConsistent

    /**
      * [K, V](
      topics: ju.Collection[jl.String],
      kafkaParams: ju.Map[String, Object])
      */

    //创建topic
    val brokers= "node01:9092,node02:9092,node03:9092"
    val sourcetopic="sparkafka";
    //创建消费者组
    var group="sparkafkaGroup"
    //消费者配置
    val kafkaParam = Map(
      "bootstrap.servers" -> brokers,//用于初始化链接到集群的地址
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      //用于标识这个消费者属于哪个消费团体
      "group.id" -> group,
      //如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性
      //可以使用这个配置,latest自动重置偏移量为最新的偏移量
      "auto.offset.reset" -> "latest",
      //如果是true,则这个消费者的偏移量会在后台自动提交
      //offset不要自动提交,我们等会儿处理完了数据之后,手动提交offset
      "enable.auto.commit" -> (false: java.lang.Boolean)
    );
    //使用 ConsumerStrategies.Subscribe来订阅某一个topic当中的数据
    val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe[String,String](Array("sparkafka"),kafkaParam)
//sealed 使用这个关键字修饰的类叫做密封类 。不让你用
    val resultDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](streamingContext,consistent,consumerStrategy)
    //循环遍历每一个RDD当中的数据
    //一个RDD有多个分区,一个分区里面有多条数据
    //一个RDD里面多条数据
    //注意:foreachRDD   outputOperation算子
    resultDStream.foreachRDD(iter =>{
      //判断rdd当中有没有数据
      if(iter.count() > 0){
        //如果获取到的RDD里面的数据大于0 ,表示rdd当中有数据了
      //使用foreach, 是一个action算子
        //循环rdd当中所有的数据,将每一条数据取出来,进行处理
        //使用一个rdd进行foreach,将rdd当中所有的数据挨条取出来进行处理,如果处理完了,表示一个RDD当中所有的数据都处理完了
        iter.foreach(line =>{
          //获取数据的value
          val value: String = line.value()
          println(value)
          //为了更加精确的控制offset,我们可以在这里处理一条数据,就提交一条数据的offset
        })
        //"enable.auto.commit" -> (false: java.lang.Boolean) 设置了offset自动提交关闭了,数据不会再自动提交offset了,需要我们手动处理提交完成
        //rdd当中一批数据处理完了,将这一批数据的offset全部提交
        //获取rdd当中所有数据的offset,将iter强制转换成为HasOffsetRanges 这个类,调用offsetRange方法,就得到了这一个RDD当中所有的offset值
        val ranges: Array[OffsetRange] = iter.asInstanceOf[HasOffsetRanges].offsetRanges
        //提交offset值,将resultDStream 强制转换成为CanCommitOffsets 这个类,然后调用commitAsync实现异步提交offset的值
        resultDStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
      }
    })
    //启动sparkStreaming的程序
    streamingContext.start()
    streamingContext.awaitTermination()


  }


}

Window Operations

Window Operations有点类似于Storm中的State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。
基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。
在这里插入图片描述
SparkStreaming窗口函数reduceByKeyAndWindow,实现单词计数

2 实现流程
(1)安装并启动生成者
首先在linux服务器上用YUM安装nc工具,nc命令是netcat命令的简称,都是用来设置路由器。我们可以利用它向某个端口发送数据。
yum install -y nc
(2)启动一个服务端并监听9999端口
nc-lk 9999
向指定的端口发送数据
(3)编写Spark Streaming程序

package cn.test.spark

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**

def main(args: Array[String]): Unit = {
//配置sparkConf参数
val sparkConf: SparkConf = new SparkConf().setAppName(“SparkStreamingTCPWindow”).setMaster(“local[2]”)
//构建sparkContext对象
val sc: SparkContext = new SparkContext(sparkConf)
sc.setLogLevel(“WARN”)
//构建StreamingContext对象,每个批处理的时间间隔
val scc: StreamingContext = new StreamingContext(sc,Seconds(5))
//注册一个监听的IP地址和端口 用来收集数据
val lines: ReceiverInputDStream[String] = scc.socketTextStream(“192.168.200.160”,9999)
//切分每一行记录
val words: DStream[String] = lines.flatMap(.split(" "))
//每个单词记为1
val wordAndOne: DStream[(String, Int)] = words.map((
,1))
//reduceByKeyAndWindow函数参数意义:
// windowDuration:表示window框住的时间长度,如本例5秒切分一次RDD,框10秒,就会保留最近2次切分的RDD
//slideDuration: 表示window滑动的时间长度,即每隔多久执行本计算
val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(5))
result.print()
scc.start()
scc.awaitTermination()
}
}

3 执行查看效果
(1)先执行nc -lk 9999
在这里插入图片描述
(2)然后在执行以上代码
在这里插入图片描述
(3)不断的在(1)中输入不同的单词,观察IDEA控制台输出
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

sparkStreaming的容错机制

检查点容错:可以设置checkPoint路径,方便数据的恢复

驱动器的容错:不要使用new的方式创建streamingContext

推荐使用

val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

工作节点的容错:将接收到的数据进行保存,然后使用RDD的血统进行数据的容错

接收器的容错:可以选择比较靠谱的消息源来接收数据,例如kafka等消息队列

处理的保证:尽量保证所有的数据,处理且仅处理一次。实现数据的不丢不漏不重不错

标签:知识点,log,val,kafka,2181,整合,offset,Sparkstreaming,String
来源: https://blog.csdn.net/KujyouRuri/article/details/116240231