Spark -实时综合实战
作者:互联网
# Start HDFS hadoop-daemon.sh start namenode hadoop-daemon.sh start datanode # Start YARN yarn-daemon.sh start resourcemanager yarn-daemon.sh start nodemanager # Start MRHistoryServer mr-jobhistory-daemon.sh start historyserver # Start Spark HistoryServer /export/server/spark/sbin/start-history-server.sh # Start Zookeeper zookeeper-daemon.sh start # Start Kafka kafka-server-start.sh -daemon /export/server/kafka/config/server.properties # Start HBase hbase-daemon.sh start master hbase-daemon.sh start regionserver # Start search elasticsearch-daemon.sh start # Start Redis /export/server/redis/bin/redis-server /export/server/redis/conf/redis.conf整个实时综合案例所涉及的大数据技术框架,基本上都是企业实时业务使用的,通过此案例框 架整合使用,进一步掌握大数据框架技术与应用。
1.2.2 应用开发环境
在前面创建的Maven Project工程中创建Maven Module模块,pom.xml文件中添加相关依赖:<!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 --> <repositories> <repository> <id>aliyun</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </repository> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> <repository> <id>jboss</id> <url>http://repository.jboss.com/nexus/content/groups/public</url> </repository> </repositories> <properties> <scala.version>2.11.12</scala.version> <scala.binary.version>2.11</scala.binary.version> <spark.version>2.4.5</spark.version> <hadoop.version>2.6.0-cdh5.16.2</hadoop.version> <hbase.version>1.2.0-cdh5.16.2</hbase.version> <kafka.version>2.0.0</kafka.version> <mysql.version>8.0.19</mysql.version> </properties> <dependencies> <!-- 依赖Scala语言 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- Spark Core 依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- Spark SQL 依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- Structured Streaming + Kafka 依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- Spark Streaming 依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- Spark Streaming 与Kafka 0.10.0 集成依赖--> <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> --> <!-- Spark Streaming 与Kafka 0.8.2.1 集成依赖 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <!-- Hadoop Client 依赖 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <!-- HBase Client 依赖 --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop2-compat</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${hbase.version}</version> </dependency> <!-- MySQL Client 依赖 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> <!-- 管理配置文件 --> <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> <version>1.2.1</version> </dependency> <!-- 根据ip转换为省市区 --> <dependency> <groupId>org.lionsoul</groupId> <artifactId>ip2region</artifactId> <version>1.7.2</version> </dependency> <!-- JSON解析库:fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.0.0</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>6.0.0</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>6.0.0</version> </dependency> <dependency> <groupId>com.redislabs</groupId> <artifactId>spark-redis_2.11</artifactId> <version>2.4.2</version> </dependency> </dependencies> <build> <outputDirectory>target/classes</outputDirectory> <testOutputDirectory>target/test-classes</testOutputDirectory> <resources> <resource> <directory>${project.basedir}/src/main/resources</directory> </resource> </resources> <!-- Maven 编译的插件 --> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.0</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>按照应用开发分层结构,需要在【src源码目录】下创建相关目录和包,具体如下:
# local mode app.is.local=true app.spark.master=local[3] # kafka config kafka.bootstrap.servers=node1.itcast.cn:9092 kafka.auto.offset.reset=largest kafka.source.topics=orderTopic kafka.etl.topic=orderEtlTopic kafka.max.offsets.per.trigger=100000 # Kafka Consumer Group ID streaming.etl.group.id=order-etl-1000 # Zookeeper Server kafka.zk.url=node1.itcast.cn:2181/kafka200 # streaming checkpoint streaming.etl.ckpt=datas/order-apps/ckpt/etl-ckpt/ streaming.hbase.ckpt=datas/order-apps/ckpt/hbase-ckpt/ streaming.es.ckpt=datas/order-apps/ckpt/es-ckpt/ streaming.amt.total.ckpt=datas/order-apps/ckpt/amt-total-ckpt/ streaming.amt.province.ckpt=datas/order-apps/ckpt/amt-province-ckpt/ streaming.amt.city.ckpt=datas/order-apps/ckpt/amt-city-ckpt/ ##streaming.etl.ckpt=/spark/order-apps/ckpt/etl-ckpt/ ##streaming.hbase.ckpt=/spark/order-apps/ckpt/hbase-ckpt/ ##streaming.es.ckpt=/spark/order-apps/ckpt/es-ckpt/ ##streaming.amt.total.ckpt=/spark/order-apps/ckpt/amt-total-ckpt/ ##streaming.amt.province.ckpt=/spark/order-apps/ckpt/amt-province-ckpt/ ##streaming.amt.city.ckpt=/spark/order-apps/ckpt/amt-city-ckpt/ # streaming stop file stop.etl.file=datas/order-apps/stop/etl-stop stop.hbase.file=datas/order-apps/stop/hbase-stop stop.es.file=datas/order-apps/stop/es-stop stop.state.file=datas/order-apps/stop/state-stop ##stop.etl.file=/spark/order-apps/stop/etl-stop ##stop.hbase.file=/spark/order-apps/stop/hbase-stop ##stop.es.file=/spark/order-apps/stop/es-stop ##stop.state.file=/spark/order-apps/stop/state-stop # HBase Config hbase.zk.hosts=node1.itcast.cn hbase.zk.port=2181 hbase.zk.znode=/hbase hbase.order.table=htb_orders hbase.table.family=info hbase.table.columns=orderId,userId,orderTime,ip,orderMoney,orderStatus,province,city # Elasticsearch Config es.nodes=node1.itcast.cn es.port=9200 es.index.auto.create=true es.write.operation=upsert es.index.name=orders/index es.mapping.id=orderId # Redis Config redis.host=node1.itcast.cn redis.port=6379 redis.db=0 # 字典数据 ipdata.region.path=dataset/ip2region.db ##ipdata.region.path=hdfs://node1.itcast.cn:8020/spark/dataset/ip2region.db其中应用开发时,采用本地模式运行,相关数据保存在本地文件系统,测试生产时使用HDFS 文件系统。 编写加载属性文件工具类:ApplicationConfig,位于【cn.itcast.spark.config】包,具体代码如下:
package cn.itcast.spark.config import com.typesafe.config.{Config, ConfigFactory} /** * 加载应用Application属性配置文件config.properties获取属性值 */ object ApplicationConfig { // 加载属性文件 private val config: Config = ConfigFactory.load("config.properties") /* 运行模式,开发测试为本地模式,测试生产通过--master传递 */ lazy val APP_LOCAL_MODE: Boolean = config.getBoolean("app.is.local") lazy val APP_SPARK_MASTER: String = config.getString("app.spark.master") /* Kafka 相关配置信息 */ lazy val KAFKA_BOOTSTRAP_SERVERS: String = config.getString("kafka.bootstrap.servers") lazy val KAFKA_AUTO_OFFSET_RESET: String = config.getString("kafka.auto.offset.reset") lazy val KAFKA_SOURCE_TOPICS: String = config.getString("kafka.source.topics") lazy val KAFKA_ETL_TOPIC: String = config.getString("kafka.etl.topic") lazy val KAFKA_MAX_OFFSETS: String = config.getString("kafka.max.offsets.per.trigger") lazy val KAFKA_ZK_URL: String = config.getString("kafka.zk.url") lazy val STREAMING_ETL_GROUP_ID: String = config.getString("streaming.etl.group.id") /* Streaming流式应用,检查点目录 */ lazy val STREAMING_ETL_CKPT: String = config.getString("streaming.etl.ckpt") lazy val STREAMING_HBASE_CKPT: String = config.getString("streaming.hbase.ckpt") lazy val STREAMING_ES_CKPT: String = config.getString("streaming.es.ckpt") lazy val STREAMING_AMT_TOTAL_CKPT: String = config.getString("streaming.amt.total.ckpt") lazy val STREAMING_AMT_PROVINCE_CKPT: String = config.getString("streaming.amt.province.ckpt") lazy val STREAMING_AMT_CITY_CKPT: String = config.getString("streaming.amt.city.ckpt") /* Streaming流式应用,停止文件 */ lazy val STOP_ETL_FILE: String = config.getString("stop.etl.file") lazy val STOP_HBASE_FILE: String = config.getString("stop.hbase.file") lazy val STOP_ES_FILE: String = config.getString("stop.es.file") lazy val STOP_STATE_FILE: String = config.getString("stop.state.file") /* HBase 数据库连接信息及表的信息 */ lazy val HBASE_ZK_HOSTS: String = config.getString("hbase.zk.hosts") lazy val HBASE_ZK_PORT: String = config.getString("hbase.zk.port") lazy val HBASE_ZK_ZNODE: String = config.getString("hbase.zk.znode") lazy val HBASE_ORDER_TABLE: String = config.getString("hbase.order.table") lazy val HBASE_ORDER_TABLE_FAMILY: String = config.getString("hbase.table.family") lazy val HBASE_ORDER_TABLE_COLUMNS: Array[String] = config.getString("hbase.table.columns").split(",") /* Elasticsearch 连接信息 */ lazy val ES_NODES: String = config.getString("es.nodes") lazy val ES_PORT: String = config.getString("es.port") lazy val ES_INDEX_AUTO_CREATE: String = config.getString("es.index.auto.create") lazy val ES_WRITE_OPERATION: String = config.getString("es.write.operation") lazy val ES_INDEX_NAME: String = config.getString("es.index.name") lazy val ES_MAPPING_ID: String = config.getString("es.mapping.id") /* Redis 数据库 */ lazy val REDIS_HOST: String = config.getString("redis.host") lazy val REDIS_PORT: String = config.getString("redis.port") lazy val REDIS_DB: String = config.getString("redis.db") // 解析IP地址字典数据文件存储路径 lazy val IPS_DATA_REGION_PATH: String = config.getString("ipdata.region.path") }每个属性变量前使用lazy,表示懒加载初始化,当第一次使用变量时,才会进行初始化。
package cn.itcast.spark.utils import cn.itcast.spark.config.ApplicationConfig import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 工具类:构建SparkSession和StreamingContext实例对象 */ object SparkUtils { /** * 获取SparkSession实例对象,传递Class对象 * @param clazz Spark Application字节码Class对象 * @return SparkSession对象实例 */ def createSparkSession(clazz: Class[_]): SparkSession = { // 1. 构建SparkConf对象 val sparkConf: SparkConf = new SparkConf() .setAppName(clazz.getSimpleName.stripSuffix("$")) .set("spark.debug.maxToStringFields", "2000") .set("spark.sql.debug.maxToStringFields", "2000") // 2. 判断应用是否本地模式运行,如果是设置值 if(ApplicationConfig.APP_LOCAL_MODE){ sparkConf .setMaster(ApplicationConfig.APP_SPARK_MASTER) // 设置Shuffle时分区数目 .set("spark.sql.shuffle.partitions", "3") } // 3. 获取SparkSession实例对象 val session: SparkSession = SparkSession .builder() .config(sparkConf) .getOrCreate() // 4. 返回实例 session } /** * 获取StreamingContext流式上下文实例对象 * @param clazz Spark Application字节码Class对象 * @param batchInterval 每批次时间间隔 */ def createStreamingContext(clazz: Class[_], batchInterval: Int): StreamingContext = { // 构建对象实例 val context: StreamingContext = StreamingContext.getActiveOrCreate( () => { // 1. 构建SparkConf对象 val sparkConf: SparkConf = new SparkConf() .setAppName(clazz.getSimpleName.stripSuffix("$")) .set("spark.debug.maxToStringFields", "2000") .set("spark.sql.debug.maxToStringFields", "2000") .set("spark.streaming.stopGracefullyOnShutdown", "true") // 2. 判断应用是否本地模式运行,如果是设置值 if(ApplicationConfig.APP_LOCAL_MODE){ sparkConf .setMaster(ApplicationConfig.APP_SPARK_MASTER) // 设置每批次消费数据最大数据量,生成环境使用命令行设置 .set("spark.streaming.kafka.maxRatePerPartition", "10000") } // 3. 创建StreamingContext对象 new StreamingContext(sparkConf, Seconds(batchInterval)) } ) context // 返回对象 } }
其中应用开发本地模式运行时设置的相关属性,在测试和生成环境使用spark-submit提交应用, 通过--conf指定此属性的值。
1.4 模拟交易订单数据
编程模拟生成交易订单数据,实时发送至Kafka Topic,为了简单起见交易订单数据字段如下, 封装到样例类OrderRecord中:/** * 订单实体类(Case Class) * @param orderId 订单ID * @param userId 用户ID * @param orderTime 订单日期时间 * @param ip 下单IP地址 * @param orderMoney 订单金额 * @param orderStatus 订单状态 */ case class OrderRecord( orderId: String, userId: String, orderTime: String, ip: String, orderMoney: Double, orderStatus: Int )
1.4.1 创建 Topic
在整个实时综合案例中,原始的交易订单数据存储【orderTopic】,经过ETL后交易订单数据 存【orderEtlTopic】,关于Topic创建等操作命令如下:# 启动Zookeeper zookeeper-daemon.sh start # 启动Kafka Broker kafka-server-start.sh -daemon /export/server/kafka/config/server.properties # 查看Topic信息 kafka-topics.sh --list --zookeeper node1.itcast.cn:2181/kafka200 # 创建topic kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replication-factor 1 --partitions 3 --topic orderTopic # 模拟生产者 kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic orderTopic # 模拟消费者 kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic orderTopic --from-beginning # 删除topic kafka-topics.sh --delete --zookeeper node1.itcast.cn:2181/kafka200 --topic orderTopic # 创建topic kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replication-factor 1 --partitions 3 --topic orderEtlTopic # 模拟消费者 kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic orderEtlTopic --from-beginning # 删除topic kafka-topics.sh --delete --zookeeper node1.itcast.cn:2181/kafka200 --topic orderEtlTopic
1.4.2 模拟数据
编写程序,实时产生交易订单数据,使用Json4J类库转换数据为JSON字符,发送Kafka Topic 中,代码如下:package cn.itcast.spark.mock import java.util.Properties import org.apache.commons.lang3.time.FastDateFormat import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.StringSerializer import org.json4s.jackson.Json import scala.util.Random /** * 模拟生产订单数据,发送到Kafka Topic中 * Topic中每条数据Message类型为String,以JSON格式数据发送 * 数据转换: * 将Order类实例对象转换为JSON格式字符串数据(可以使用json4s类库) */ object MockOrderProducer { def main(args: Array[String]): Unit = { var producer: KafkaProducer[String, String] = null try { // 1. Kafka Client Producer 配置信息 val props = new Properties() props.put("bootstrap.servers", "node1.itcast.cn:9092") props.put("acks", "1") props.put("retries", "3") props.put("key.serializer", classOf[StringSerializer].getName) props.put("value.serializer", classOf[StringSerializer].getName) // 2. 创建KafkaProducer对象,传入配置信息 producer = new KafkaProducer[String, String](props) // 随机数实例对象 val random: Random = new Random() // 订单状态:订单打开 0,订单取消 1,订单关闭 2,订单完成 3 val allStatus =Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) while(true){ // 每次循环 模拟产生的订单数目 val batchNumber: Int = random.nextInt(2) + 1 (1 to batchNumber).foreach{number => val currentTime: Long = System.currentTimeMillis() val orderId: String = s"${getDate(currentTime)}%06d".format(number) val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000)) val orderTime: String = getDate(currentTime, format="yyyy-MM-dd HH:mm:ss.SSS") val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100)) val orderStatus: Int = allStatus(random.nextInt(allStatus.length)) // 3. 订单记录数据 val orderRecord: OrderRecord = OrderRecord( orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus ) // 转换为JSON格式数据 val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord) println(orderJson) // 4. 构建ProducerRecord对象 val record = new ProducerRecord[String, String]("orderTopic", orderJson) // 5. 发送数据:def send(messages: KeyedMessage[K,V]*), 将数据发送到Topic producer.send(record) } Thread.sleep(random.nextInt(10) * 100 + 500) } }catch { case e: Exception => e.printStackTrace() }finally { if(null != producer) producer.close() } } /**=================获取当前时间=================*/ def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = { val fastFormat: FastDateFormat = FastDateFormat.getInstance(format) val formatDate: String = fastFormat.format(time) // 格式化日期 formatDate } /**================= 获取随机IP地址 =================*/ def getRandomIp: String = { // ip范围 val range: Array[(Int, Int)] = Array( (607649792,608174079), //36.56.0.0-36.63.255.255 (1038614528,1039007743), //61.232.0.0-61.237.255.255 (1783627776,1784676351), //106.80.0.0-106.95.255.255 (2035023872,2035154943), //121.76.0.0-121.77.255.255 (2078801920,2079064063), //123.232.0.0-123.235.255.255 (-1950089216,-1948778497),//139.196.0.0-139.215.255.255 (-1425539072,-1425014785),//171.8.0.0-171.15.255.255 (-1236271104,-1235419137),//182.80.0.0-182.92.255.255 (-770113536,-768606209),//210.25.0.0-210.47.255.255 (-569376768,-564133889) //222.16.0.0-222.95.255.255 ) // 随机数:IP地址范围下标 val random = new Random() val index = random.nextInt(10) val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1) // 转换Int类型IP地址为IPv4格式 number2IpString(ipNumber) } /**=================将Int类型IPv4地址转换为字符串类型=================*/ def number2IpString(ip: Int): String = { val buffer: Array[Int] = new Array[Int](4) buffer(0) = (ip >> 24) & 0xff buffer(1) = (ip >> 16) & 0xff buffer(2) = (ip >> 8) & 0xff buffer(3) = ip & 0xff // 返回IPv4地址 buffer.mkString(".") } }
标签:实战,lazy,String,val,实时,ckpt,Spark,config,spark 来源: https://www.cnblogs.com/wq-9/p/16224466.html