Apache CarbonData 2.0 RC2:用50行代码构建全场景数据湖
作者:互联网
今夏最火综艺《青春有你2》里面刘雨昕从默默无闻一直冲到第二名,让众人感叹,只有在唱、跳、舞蹈、Rap等多方面能力俱佳,才是真偶像担当。在看了5月份初发布的CarbonData 2.0 RC2之后,笔者不禁感觉CarbonData在新版本中所体现的全方面能力,和刘雨昕一样具有"C位出道"的巨大潜力。CarbonData作为目前为数不多由中国公司贡献的Apache顶级项目,从16年正式"出道"以来,一直努力践行着实践和探索的开源精神。笔者在看到CarbonData的成长、ASF中华人力量的崛起之后,也相信国内的开源力量可以继续"不忘初心、砥砺前行",借用青春有你2里面的一句歌词:“OpenSource Made in China,出发,多远都可以到达!”
从CarbonData社区发布的版本信息看,CarbonData 2.0提供了一种新的融合数据存储方案,以一份数据同时支持多种应用场景:明细查询、交互分析、数据实时同步和更新、ETL、AI、时序聚合、空间检索等,实现EB级别数据规模,查询性能秒级响应,并通过计算存储分离优化,大大降低了数据湖成本。本文笔者将着重介绍如何快速构建一个明细查询、交互分析、ETL的数据湖,后续将会大家带来CarbonData在数据实时同步和更新、AI、时序时空聚合等场景中的应用,敬请关注。
前言
随着数据量的增大,如果基于传统的数据存储解决方案,即使用不同的数据库产品来分别处理不同业务类型,势必会带来成本和运维的巨大压力。以互联网行业用户行为数据为例,随着5G+AI的到来,数据量正呈现快速增长趋势,目前对于一个日活1000万的APP应用来说,平均每天约产生500亿条用户行为数据,一年的数据存储量约10PB。PB级别数据可以满足不同类型业务的需求,如明细查询类业务:用户行为查询、订单查询、交易查询;聚合分析类业务:A/B Test、聚合分析;ELT类业务:热销榜、PV/UV等。下表中,我们分别给出了明细查询、聚合分析、ETL业务的典型查询、可选数据存储方案和方案痛点。
类型 | 典型查询 | 可选数据库 | 痛点 |
---|---|---|---|
明细查询 | select * from fact where userid IS ‘1323’ | HBase、ES等 | 成本高、运维难 |
聚合分析 | select * from fact GROUPBY abtestid | ClickHouse、Oracle等 | 不支持横向扩展 |
ETL | select * from facttable JOIN dimensiontable | Spark on Parquet、Hive on ORC等 | 无索引,速度慢 |
具体来说,(1) 成本高:以HBase为例,单台RegionServer可维护不超过10TB的数据,面对10PB的数据存储时,需要100台计算节点部署RegionServer,每台计算节点500元/月(4U16G),计算成本共计50万/月,每PB存储的云硬盘成本为70万/月,总成本=120万/月;(2) 计算慢:Spark on Parquet可以将数据存储在对象存储中,成本大大降低,每PB存储的对象存储成本为8万/月,上层的100台计算节点假设每天开机8小时,计算成本15万/月,总成本约23万/月,成本可以降低5倍。但是由于无索引,只能通过暴力扫描的方式进行查询和计算,在暴力计算时系统往往受限于对象存储带宽,假设对象存储带宽为20GB/s,查询需要14个小时。
由上可见,Nosql数据库虽然具有较好的数据索引机制,但是“太贵”,传统的Hadoop生态数据仓库将数据放在对象存储上,但是“太慢”,这两者各自的局限性,使得我们进行EB级别数据仓库选型时,面临着这一个鱼与熊掌不可兼得的选择题。
可以像NoSQL数据库一样,构建高效索引,又可以和Spark/Hive一样,享受高度可扩展性的数据并行处理,并能利用近乎无限的低成本的对象存储资源,满足这种“又方便又快又便宜”的任性就是CarbonData的使命。
接下来的内容,我们通过演示介绍CarbonData 2.0 RC2中的数据湖构建方法。。
一、CarbonData数据湖架构
过去的几个月,Apache CarbonData 2.0一直在努力实现一种低成本全场景数据湖,力求做到"一份数据到处使用"的愿景。下图给出了一个基于CarbonData数据湖系统架构图,数据首先由Kafka完成数据收集,并由Flink完成数据清洗、预处理 。其次,Flink将数据直接写入对象存储。最后,CarbonData对对象存储的数据构建索引,并供上层多种计算引擎计算和查询,用户可使用Spark或者Presto对CarbonData数据进行明细查询和聚合分析类业务,使用Spark或者Hive对CarbonData数据进行聚合分析和ETL类业务。
从数据存储的角度看,数据主要包含三要素:元数据、数据、索引,这三要素同样也是构建数据湖的关键。下面,我们主要演示基于CarbonData的数据湖如何解决这几个问题:
-
如何构建元数据?
-
如何写入数据?
-
如何建索引?
下面,我们首先演示如何一键式搭建Kafka+Flink+CarbonData的开发环境,其次介绍CarbonData数据湖中构建元数据、写入数据、建索引的方法。
二、一键式搭建Kafka+Flink+CarbonData演示环境
- 准备1台Linux弹性云服务器
- 下载一键式安装脚本
curl -k -O http://carbondata-publish.obs.myhuaweicloud.com/quick_start_kafka_flink_carbondata.sh
- 快速安装Kafka、Flink、Spark、CarbonData
source quick_start_kafka_flink_carbondata.sh
- 进入Kafka工作目录,新建Topic,并写入数据
创建Topic
kafka_2.12-2.5.0/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
写入数据
kafka_2.12-2.5.0/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>c001,23,china,2016-02-23 09:01:30,china sz,computer design
>c003,23,japan,2016-02-23 08:01:30,japan to,sport speech
>c002,23,india,2016-02-23 07:01:30,india mm,draw write
接下来,我们演示如果使用Flink消费Kafka中的数据,并将数据写入CarbonData数据湖。
三、如何构建元数据?
Spark中创建CarbonData表可以快速构建元数据,元数据存储路径为${table location}/Metadata/schema。
- 建表,并构建排序索引,语法举例如下:
CREATE TABLE person(id STRING, age INT, country STRING, timestamp timestamp, address STRING, skill STRING)
STORED AS carbondata
TBLPROPERTIES('sort_scope'='GLOBAL_SORT','sort_columns'='id, age');
这里,TBLPROPERTIES(‘sort_scope’=‘GLOBAL_SORT’,‘sort_columns’=‘id, age’)代表着数据将按照(id, age)有序存储,当在处理id详单查询或者id、age联合查询时,可以通过二分查找的方式进行数据的快速定位和筛选,相比无排序时,数据查询的时间复杂度从O(N)下降为O(logN)。
四、如何写入数据?
下面给出了基于Flink消费Kafka的数据,并将CarbonData数据落盘的代码示例。下载代码
object FlinkToCarbonDemo {
def main(args: Array[String]): Unit = {
// 1. kafka地址和Topic名称
val parameter = ParameterTool.fromArgs(args)
val kafkaTopic = parameter.get("TOPIC")
val kafkaBootstrapServices = parameter.get("KAFKASERVICES")
val kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers", kafkaBootstrapServices)
kafkaProperties.put("auto.commit.interval.ms", "3000")
// 2. 设置表明database名称、表名、表存储位置、临时文件本地存储目录
val tableName = parameter.get("TABLENAME")
val tablePath = parameter.get("TABLEPATH")
val tempPath = parameter.get("TEMPPATH")
// writerProperties代表写操作鉴权信息,当写数据到S3时,需要在writerProperties中配置桶名、AKSK
val writerProperties = newWriterProperties(tempPath)
// carbonProperties代表CarbonDataSDK属性信息,支持配置sdk内存、数据自定义格式等
val carbonProperties = newCarbonProperties()
// Flink定时将数据上传到CarbonData表空间目录是,时间周期阈值为Flink Checkpoint时间
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.enableCheckpointing(180000)
// 3. 配置source stream. 这里主要是基于自定义DeserializeSchema完成Kafka数据解析。
// Flink中集成的CarbonDataSDK默认输入数据为字符数组,因此需要将Record解析为Array[AnyRef]或者Array[String]
val stream: DataStream[Array[AnyRef]] = environment.addSource(new FlinkKafkaConsumer011[Array[AnyRef]](
kafkaTopic, new DeserializeSchema(), kafkaProperties
))
// 4. 配置stream sink,这里首先构建carbondatasdk,可选类型为Local和S3.
val factory = CarbonWriterFactory.builder("Local").build(
"default", tableName, tablePath, new Properties, writerProperties, carbonProperties)
val streamSink = StreamingFileSink.forBulkFormat(
new Path(ProxyFileSystem.DEFAULT_URI),
factory
).build()
stream.addSink(streamSink)
// Execute the environment
environment.execute()
streamSink.close()
}
private def newWriterProperties(dataTempPath: String) = {
val properties = new Properties
properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
properties
}
private def newCarbonProperties() = {
val properties = new Properties
properties
}
}
// 基于自定义DeserializeSchema完成Kafka数据解析示例。例如Kafka中Record为"c001,23,china,2016-02-23 09:01:30,china sz,computer design"时,解析形式为['c001','23','china']的字符数组
class DeserializeSchema extends KafkaDeserializationSchema[Array[AnyRef]] {
override def isEndOfStream(t: Array[AnyRef]): Boolean = {
false
}
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): Array[AnyRef] = {
new String(consumerRecord.value()).split(",").map(_.trim).asInstanceOf[Array[AnyRef]]
}
override def getProducedType: TypeInformation[Array[AnyRef]] = TypeInformation.of(new TypeHint[Array[AnyRef]] {})
}
当Flink成功消费Kafka数据之后,在{$tablePath}/stage_data/,可以看到成功落盘的CarbonData数据。
在Flink目录中中放置了示例JAR文件,可以直接启动作业尝试Flink入库CarbonData表数据,作业启动示例如下所示:
bin/flink run -d -p 1 -c org.apache.carbon.flink.FlinkToCarbonDemo examples/flinktocarbondemo-1.0.jar \
--TOPIC test \
--KAFKASERVICES localhost:9092 \
--TABLENAME person \
--TABLEPATH "/user/hive/warehouse/person" \
--TEMPPATH "/tmp"
五、如何构建索引?
- Spark中执行"INSERT INTO STAGE"命令触发构建索引。
INSERT INTO person STAGE;
- Spark中尝试查询数据:
SELECT * FROM person;
就此,我们完成了Kafka到Flink到CarbonData的端到端数据湖的简单构建。
结语
本文主要介绍了如何快速构建Kafka到Flink到CarbonData的端到端数据湖DataPipe。后续,我们将继续介绍:(1)如何在数据湖中使用Spark、Hive、Presto访问同一份数据;(2)如何将Mysql等关系型数据库数据同步到CarbonData数据湖中;(3)如何在TensorFlow等AI计算引擎中使用CarbonData。敬请关注。
欢迎大家添加微信ID:xu601450868,加入CarbonData技术交流群。
标签:存储,val,Flink,50,RC2,CarbonData,Array,数据 来源: https://blog.csdn.net/marchpure_312/article/details/105948192