其他分享
首页 > 其他分享> > Spark -实时综合实战

Spark -实时综合实战

作者:互联网

 

 

 

 

 

 

 

 

# Start HDFS
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
# Start YARN
yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager
# Start MRHistoryServer
mr-jobhistory-daemon.sh start historyserver
# Start Spark HistoryServer
/export/server/spark/sbin/start-history-server.sh
# Start Zookeeper
zookeeper-daemon.sh start
# Start Kafka
kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
# Start HBase
hbase-daemon.sh start master
hbase-daemon.sh start regionserver
# Start search
elasticsearch-daemon.sh start
# Start Redis
/export/server/redis/bin/redis-server /export/server/redis/conf/redis.conf
整个实时综合案例所涉及的大数据技术框架,基本上都是企业实时业务使用的,通过此案例框 架整合使用,进一步掌握大数据框架技术与应用。    

1.2.2 应用开发环境

在前面创建的Maven Project工程中创建Maven Module模块,pom.xml文件中添加相关依赖:
<!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<properties>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.4.5</spark.version>
<hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
<hbase.version>1.2.0-cdh5.16.2</hbase.version>
<kafka.version>2.0.0</kafka.version>
<mysql.version>8.0.19</mysql.version>
</properties>
<dependencies>
<!-- 依赖Scala语言 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark Core 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Structured Streaming + Kafka 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming 依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming 与Kafka 0.10.0 集成依赖-->
<!--
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
-->
<!-- Spark Streaming 与Kafka 0.8.2.1 集成依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Hadoop Client 依赖 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- HBase Client 依赖 -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- MySQL Client 依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- 管理配置文件 -->
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.2.1</version>
</dependency>
<!-- 根据ip转换为省市区 -->
<dependency>
<groupId>org.lionsoul</groupId>
<artifactId>ip2region</artifactId>
<version>1.7.2</version>
</dependency>
<!-- JSON解析库:fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis_2.11</artifactId>
<version>2.4.2</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<!-- Maven 编译的插件 -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
按照应用开发分层结构,需要在【src源码目录】下创建相关目录和包,具体如下:

 

 

# local mode
app.is.local=true
app.spark.master=local[3]
# kafka config
kafka.bootstrap.servers=node1.itcast.cn:9092
kafka.auto.offset.reset=largest
kafka.source.topics=orderTopic
kafka.etl.topic=orderEtlTopic
kafka.max.offsets.per.trigger=100000
# Kafka Consumer Group ID
streaming.etl.group.id=order-etl-1000
# Zookeeper Server
kafka.zk.url=node1.itcast.cn:2181/kafka200
# streaming checkpoint
streaming.etl.ckpt=datas/order-apps/ckpt/etl-ckpt/
streaming.hbase.ckpt=datas/order-apps/ckpt/hbase-ckpt/
streaming.es.ckpt=datas/order-apps/ckpt/es-ckpt/
streaming.amt.total.ckpt=datas/order-apps/ckpt/amt-total-ckpt/
streaming.amt.province.ckpt=datas/order-apps/ckpt/amt-province-ckpt/
streaming.amt.city.ckpt=datas/order-apps/ckpt/amt-city-ckpt/
##streaming.etl.ckpt=/spark/order-apps/ckpt/etl-ckpt/
##streaming.hbase.ckpt=/spark/order-apps/ckpt/hbase-ckpt/
##streaming.es.ckpt=/spark/order-apps/ckpt/es-ckpt/
##streaming.amt.total.ckpt=/spark/order-apps/ckpt/amt-total-ckpt/
##streaming.amt.province.ckpt=/spark/order-apps/ckpt/amt-province-ckpt/
##streaming.amt.city.ckpt=/spark/order-apps/ckpt/amt-city-ckpt/
# streaming stop file
stop.etl.file=datas/order-apps/stop/etl-stop
stop.hbase.file=datas/order-apps/stop/hbase-stop
stop.es.file=datas/order-apps/stop/es-stop
stop.state.file=datas/order-apps/stop/state-stop
##stop.etl.file=/spark/order-apps/stop/etl-stop
##stop.hbase.file=/spark/order-apps/stop/hbase-stop
##stop.es.file=/spark/order-apps/stop/es-stop
##stop.state.file=/spark/order-apps/stop/state-stop
# HBase Config
hbase.zk.hosts=node1.itcast.cn
hbase.zk.port=2181
hbase.zk.znode=/hbase
hbase.order.table=htb_orders
hbase.table.family=info
hbase.table.columns=orderId,userId,orderTime,ip,orderMoney,orderStatus,province,city
# Elasticsearch Config
es.nodes=node1.itcast.cn
es.port=9200
es.index.auto.create=true
es.write.operation=upsert
es.index.name=orders/index
es.mapping.id=orderId
# Redis Config
redis.host=node1.itcast.cn
redis.port=6379
redis.db=0
# 字典数据
ipdata.region.path=dataset/ip2region.db
##ipdata.region.path=hdfs://node1.itcast.cn:8020/spark/dataset/ip2region.db
其中应用开发时,采用本地模式运行,相关数据保存在本地文件系统,测试生产时使用HDFS 文件系统。 编写加载属性文件工具类:ApplicationConfig,位于【cn.itcast.spark.config】包,具体代码如下:
package cn.itcast.spark.config
import com.typesafe.config.{Config, ConfigFactory}
/**
* 加载应用Application属性配置文件config.properties获取属性值
*/
object ApplicationConfig {
// 加载属性文件
private val config: Config = ConfigFactory.load("config.properties")
/*
运行模式,开发测试为本地模式,测试生产通过--master传递
*/
lazy val APP_LOCAL_MODE: Boolean = config.getBoolean("app.is.local")
lazy val APP_SPARK_MASTER: String = config.getString("app.spark.master")
/*
Kafka 相关配置信息
*/
lazy val KAFKA_BOOTSTRAP_SERVERS: String = config.getString("kafka.bootstrap.servers")
lazy val KAFKA_AUTO_OFFSET_RESET: String = config.getString("kafka.auto.offset.reset")
lazy val KAFKA_SOURCE_TOPICS: String = config.getString("kafka.source.topics")
lazy val KAFKA_ETL_TOPIC: String = config.getString("kafka.etl.topic")
lazy val KAFKA_MAX_OFFSETS: String = config.getString("kafka.max.offsets.per.trigger")
lazy val KAFKA_ZK_URL: String = config.getString("kafka.zk.url")
lazy val STREAMING_ETL_GROUP_ID: String = config.getString("streaming.etl.group.id")
/*
Streaming流式应用,检查点目录
*/
lazy val STREAMING_ETL_CKPT: String = config.getString("streaming.etl.ckpt")
lazy val STREAMING_HBASE_CKPT: String = config.getString("streaming.hbase.ckpt")
lazy val STREAMING_ES_CKPT: String = config.getString("streaming.es.ckpt")
lazy val STREAMING_AMT_TOTAL_CKPT: String = config.getString("streaming.amt.total.ckpt")
lazy val STREAMING_AMT_PROVINCE_CKPT: String = config.getString("streaming.amt.province.ckpt")
lazy val STREAMING_AMT_CITY_CKPT: String = config.getString("streaming.amt.city.ckpt")
/*
Streaming流式应用,停止文件
*/
lazy val STOP_ETL_FILE: String = config.getString("stop.etl.file")
lazy val STOP_HBASE_FILE: String = config.getString("stop.hbase.file")
lazy val STOP_ES_FILE: String = config.getString("stop.es.file")
lazy val STOP_STATE_FILE: String = config.getString("stop.state.file")
/*
HBase 数据库连接信息及表的信息
*/
lazy val HBASE_ZK_HOSTS: String = config.getString("hbase.zk.hosts")
lazy val HBASE_ZK_PORT: String = config.getString("hbase.zk.port")
lazy val HBASE_ZK_ZNODE: String = config.getString("hbase.zk.znode")
lazy val HBASE_ORDER_TABLE: String = config.getString("hbase.order.table")
lazy val HBASE_ORDER_TABLE_FAMILY: String = config.getString("hbase.table.family")
lazy val HBASE_ORDER_TABLE_COLUMNS: Array[String] = config.getString("hbase.table.columns").split(",")
/*
Elasticsearch 连接信息
*/
lazy val ES_NODES: String = config.getString("es.nodes")
lazy val ES_PORT: String = config.getString("es.port")
lazy val ES_INDEX_AUTO_CREATE: String = config.getString("es.index.auto.create")
lazy val ES_WRITE_OPERATION: String = config.getString("es.write.operation")
lazy val ES_INDEX_NAME: String = config.getString("es.index.name")
lazy val ES_MAPPING_ID: String = config.getString("es.mapping.id")
/*
Redis 数据库
*/
lazy val REDIS_HOST: String = config.getString("redis.host")
lazy val REDIS_PORT: String = config.getString("redis.port")
lazy val REDIS_DB: String = config.getString("redis.db")
// 解析IP地址字典数据文件存储路径
lazy val IPS_DATA_REGION_PATH: String = config.getString("ipdata.region.path") }
每个属性变量前使用lazy,表示懒加载初始化,当第一次使用变量时,才会进行初始化。  

 

 

package cn.itcast.spark.utils
import cn.itcast.spark.config.ApplicationConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 工具类:构建SparkSession和StreamingContext实例对象
*/
object SparkUtils {
/**
* 获取SparkSession实例对象,传递Class对象
* @param clazz Spark Application字节码Class对象
* @return SparkSession对象实例
*/
def createSparkSession(clazz: Class[_]): SparkSession = {
// 1. 构建SparkConf对象
val sparkConf: SparkConf = new SparkConf()
.setAppName(clazz.getSimpleName.stripSuffix("$"))
.set("spark.debug.maxToStringFields", "2000")
.set("spark.sql.debug.maxToStringFields", "2000")
// 2. 判断应用是否本地模式运行,如果是设置值
if(ApplicationConfig.APP_LOCAL_MODE){
sparkConf
.setMaster(ApplicationConfig.APP_SPARK_MASTER)
// 设置Shuffle时分区数目
.set("spark.sql.shuffle.partitions", "3") }
// 3. 获取SparkSession实例对象
val session: SparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
// 4. 返回实例
session
}
/**
* 获取StreamingContext流式上下文实例对象
* @param clazz Spark Application字节码Class对象
* @param batchInterval 每批次时间间隔
*/
def createStreamingContext(clazz: Class[_], batchInterval: Int): StreamingContext = {
// 构建对象实例
val context: StreamingContext = StreamingContext.getActiveOrCreate(
() => {
// 1. 构建SparkConf对象
val sparkConf: SparkConf = new SparkConf()
.setAppName(clazz.getSimpleName.stripSuffix("$"))
.set("spark.debug.maxToStringFields", "2000")
.set("spark.sql.debug.maxToStringFields", "2000")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
// 2. 判断应用是否本地模式运行,如果是设置值
if(ApplicationConfig.APP_LOCAL_MODE){
sparkConf
.setMaster(ApplicationConfig.APP_SPARK_MASTER)
// 设置每批次消费数据最大数据量,生成环境使用命令行设置
.set("spark.streaming.kafka.maxRatePerPartition", "10000") }
// 3. 创建StreamingContext对象
new StreamingContext(sparkConf, Seconds(batchInterval))
} )
context // 返回对象
} }

 

其中应用开发本地模式运行时设置的相关属性,在测试和生成环境使用spark-submit提交应用, 通过--conf指定此属性的值。  

1.4 模拟交易订单数据

编程模拟生成交易订单数据,实时发送至Kafka Topic,为了简单起见交易订单数据字段如下, 封装到样例类OrderRecord中:  
/**
* 订单实体类(Case Class) * @param orderId 订单ID
* @param userId 用户ID
* @param orderTime 订单日期时间
* @param ip 下单IP地址
* @param orderMoney 订单金额
* @param orderStatus 订单状态
*/
case class OrderRecord(
orderId: String,
userId: String,
orderTime: String,
ip: String,
orderMoney: Double,
orderStatus: Int
)

1.4.1 创建 Topic

在整个实时综合案例中,原始的交易订单数据存储【orderTopic】,经过ETL后交易订单数据 存【orderEtlTopic】,关于Topic创建等操作命令如下:
# 启动Zookeeper
zookeeper-daemon.sh start
# 启动Kafka Broker
kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
# 查看Topic信息
kafka-topics.sh --list --zookeeper node1.itcast.cn:2181/kafka200
# 创建topic
kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replication-factor 1 --partitions 3
--topic orderTopic
# 模拟生产者
kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic orderTopic
# 模拟消费者
kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic orderTopic --from-beginning
# 删除topic
kafka-topics.sh --delete --zookeeper node1.itcast.cn:2181/kafka200 --topic orderTopic
# 创建topic
kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replication-factor 1 --partitions 3
--topic orderEtlTopic
# 模拟消费者
kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic orderEtlTopic --from-beginning
# 删除topic
kafka-topics.sh --delete --zookeeper node1.itcast.cn:2181/kafka200 --topic orderEtlTopic

1.4.2 模拟数据

编写程序,实时产生交易订单数据,使用Json4J类库转换数据为JSON字符,发送Kafka Topic 中,代码如下:
package cn.itcast.spark.mock
import java.util.Properties
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json
import scala.util.Random
/**
* 模拟生产订单数据,发送到Kafka Topic中 * Topic中每条数据Message类型为String,以JSON格式数据发送
* 数据转换:
* 将Order类实例对象转换为JSON格式字符串数据(可以使用json4s类库)
*/
object MockOrderProducer {
def main(args: Array[String]): Unit = {
var producer: KafkaProducer[String, String] = null
try {
// 1. Kafka Client Producer 配置信息
val props = new Properties()
props.put("bootstrap.servers", "node1.itcast.cn:9092")
props.put("acks", "1")
props.put("retries", "3")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
// 2. 创建KafkaProducer对象,传入配置信息
producer = new KafkaProducer[String, String](props)
// 随机数实例对象
val random: Random = new Random()
// 订单状态:订单打开 0,订单取消 1,订单关闭 2,订单完成 3
val allStatus =Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
while(true){
// 每次循环 模拟产生的订单数目
val batchNumber: Int = random.nextInt(2) + 1 (1 to batchNumber).foreach{number =>
val currentTime: Long = System.currentTimeMillis()
val orderId: String = s"${getDate(currentTime)}%06d".format(number)
val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000))
val orderTime: String = getDate(currentTime, format="yyyy-MM-dd HH:mm:ss.SSS")
val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100))
val orderStatus: Int = allStatus(random.nextInt(allStatus.length))
// 3. 订单记录数据
val orderRecord: OrderRecord = OrderRecord(
orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
)
// 转换为JSON格式数据
val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
println(orderJson)
// 4. 构建ProducerRecord对象
val record = new ProducerRecord[String, String]("orderTopic", orderJson)
// 5. 发送数据:def send(messages: KeyedMessage[K,V]*), 将数据发送到Topic
producer.send(record)
}
Thread.sleep(random.nextInt(10) * 100 + 500) } }catch {
case e: Exception => e.printStackTrace()
}finally {
if(null != producer) producer.close()
} }
/**=================获取当前时间=================*/
def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {
val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)
val formatDate: String = fastFormat.format(time) // 格式化日期
formatDate
}
/**================= 获取随机IP地址 =================*/
def getRandomIp: String = {
// ip范围
val range: Array[(Int, Int)] = Array(
(607649792,608174079), //36.56.0.0-36.63.255.255
(1038614528,1039007743), //61.232.0.0-61.237.255.255
(1783627776,1784676351), //106.80.0.0-106.95.255.255
(2035023872,2035154943), //121.76.0.0-121.77.255.255
(2078801920,2079064063), //123.232.0.0-123.235.255.255
(-1950089216,-1948778497),//139.196.0.0-139.215.255.255
(-1425539072,-1425014785),//171.8.0.0-171.15.255.255
(-1236271104,-1235419137),//182.80.0.0-182.92.255.255
(-770113536,-768606209),//210.25.0.0-210.47.255.255
(-569376768,-564133889) //222.16.0.0-222.95.255.255
)
// 随机数:IP地址范围下标
val random = new Random()
val index = random.nextInt(10)
val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)
// 转换Int类型IP地址为IPv4格式
number2IpString(ipNumber)
}
/**=================将Int类型IPv4地址转换为字符串类型=================*/
def number2IpString(ip: Int): String = {
val buffer: Array[Int] = new Array[Int](4)
buffer(0) = (ip >> 24) & 0xff
buffer(1) = (ip >> 16) & 0xff
buffer(2) = (ip >> 8) & 0xff
buffer(3) = ip & 0xff
// 返回IPv4地址
buffer.mkString(".") } }

 

 

 

 

标签:实战,lazy,String,val,实时,ckpt,Spark,config,spark
来源: https://www.cnblogs.com/wq-9/p/16224466.html