其他分享
首页 > 其他分享> > Kudu设计要点面面观(下篇)

Kudu设计要点面面观(下篇)

作者:互联网

目录


参考:《Kudu设计要点面面观(上篇)》,本文适用知识共享-署名-相同方式共享(CC-BY-SA)3.0协议。


事务与数据一致性


Kudu支持单行事务,但不支持多行事务(Kudu中对多行操作不满足ACID原则中的原子性),也不支持事务回滚,这点与HBase是相同的。


前面已经提到过,Kudu采用与关系数据库类似的多版本并发控制(MVCC)机制来实现事务隔离,通过为数据添加时间戳的方式实现。该时间戳不能在写入时由用户添加,但可以在执行读取(Scan)操作时指定,这样就可以读取到历史数据(UndoFile中的数据)。Kudu提供两种读模式:read-latest和read-at-snapshot,分别对应读取当前的快照以及按时间戳读取历史快照。


对于写操作而言,Kudu也提供了两种一致性模型:快照一致性(snapshot consistency)和外部一致性(external consistency)。下面来分析一下它们。


快照一致性比较简单,只保证当前执行写操作的客户端能看到自己提交的最新数据,而不保障跨客户端的可见性。它是Kudu默认的一致性模型,一般情况下都够用。但是特殊情况也同样存在:考虑用Kudu作为点击流数仓的情景,客户端A在某时刻写入了点击事件x,客户端B紧随其后写入事件y,并且这两个事件之间具有关联性。要想让所有客户端都能达到外部一致性(及时取到最新数据),必须手动将写操作完成后产生的时间戳传播(propagate)到其他客户端上,这种方式在Kudu中叫client-propagated。


很显然,client-propagated方案需要频繁地交换时间戳,其overhead比较高,所以Kudu也借鉴了Google Spanner的思路,实现了commit-wait一致性。


我们已经可以发现,保证外部一致性的重点在于事务的版本号(时间戳)必须足够准,并且每台服务器的时间都要保持精确的同步。Google Spanner提出的时间同步方案叫做TrueTime,需要原子钟等硬件的支持,可以将对时间的认知误差控制在±4ms之内。但Kudu集群都是建立在普通商用服务器上的,所以只能靠NTP和算法近似实现,该算法名为HybridTime,不详细展开了,看官可以参考论文《Technical Report: HybridTime - Accessible Global Consistency with High Clock Uncertainty》。


下图粗浅地示出commit-wait机制的原理。

640?wx_fmt=other

当一个事务获取到锁并开始执行时,它会先生成自己的时间戳,再开始事务操作。当事务执行完之后,还必须要保证后发生的事务时间戳不能比自己的时间戳小,因此最终要等待2倍的误差时间,才能结束本次事务并释放锁。


与Impala、Spark集成


Kudu本身并没有SQL外壳,仅仅提供了Java和C++ API。但是Kudu和查询引擎Impala可以近乎无缝地结合在一起,为Kudu提供SQL能力。下面的简图示出用Impala SQL对Kudu表执行简单查询的流程。

640?wx_fmt=other

可见,在Impala端会解析SQL语句并生成查询计划,然后作为客户端去连接Kudu集群,执行增删改查操作。关于Kudu与Impala的集成和查询方法,官方文档已经写得非常详细,不再赘述。


相对而言,我们更多地是编写Spark程序来执行一些对Kudu表数据的复杂分析任务。Maven上已经有Kudu与Spark的connector包,其坐标如下。



<!-- scala.bin.version: 2.11, kudu.version: 1.5.0 -->	
<dependency>	
<groupId>org.apache.kudu</groupId>	
<artifactId>kudu-spark2_${scala.bin.version}</artifactId>	
<version>${kudu.version}</version>	
</dependency>	
<dependency>	
<groupId>org.apache.kudu</groupId>	
<artifactId>kudu-spark2-tools_${scala.bin.version}</artifactId>	
<version>${kudu.version}</version>	
</dependency>

引入依赖之后,就可以用Spark SQL以及KuduContext来操作Kudu表了,一个简单的示例代码如下。



import org.apache.kudu.client._	
import collection.JavaConverters._	
	
// Read a table from Kudu	
val df = spark.read	
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "kudu_table"))	
  .format("kudu").load	
	
// Query using the Spark API...	
df.select("id").filter("id >= 5").show()	
	
// ...or register a temporary table and use SQL	
df.createOrReplaceTempView("kudu_table")	
val filteredDF = spark.sql("select id from kudu_table where id >= 5").show()	
	
// Use KuduContext to create, delete, or write to Kudu tables	
val kuduContext = new KuduContext("kudu.master:7051", spark.sparkContext)	
	
// Create a new Kudu table from a DataFrame schema	
// NB: No rows from the DataFrame are inserted into the table	
kuduContext.createTable(	
"test_table", df.schema, Seq("key"),	
new CreateTableOptions()	
        .setNumReplicas(1)	
        .addHashPartitions(List("key").asJava, 3))	
	
// Insert data	
kuduContext.insertRows(df, "test_table")	
	
// Delete data	
kuduContext.deleteRows(filteredDF, "test_table")	
	
// Upsert data	
kuduContext.upsertRows(df, "test_table")	
	
// Update data	
val alteredDF = df.select("id", $"count" + 1)	
kuduContext.updateRows(filteredRows, "test_table")	
	
// Data can also be inserted into the Kudu table using the data source, though the methods on	
// KuduContext are preferred	
// NB: The default is to upsert rows; to perform standard inserts instead, set operation = insert	
// in the options map	
// NB: Only mode Append is supported	
df.write	
  .options(Map("kudu.master"-> "kudu.master:7051", "kudu.table"-> "test_table"))	
  .mode("append")	
  .format("kudu").save	
	
// Check for the existence of a Kudu table	
kuduContext.tableExists("another_table")	
	
// Delete a Kudu table	
kuduContext.deleteTable("unwanted_table")

需要注意的是,Spark on Kudu不支持有大写字母和非ASCII字符的表名、列名,必须预先处理。另外,不等于(<>)和or谓词不会下推给Kudu,而是由Spark任务来处理。like谓词同理,当有通配符时,只有以通配符结尾的语句(如like 'some%')才会下推给Kudu。


Benchmarking


在TPC-H数据集上进行测试,Impala on Kudu的查询时间比Impala on HDFS (Parquet) 平均缩短了三成。


640?wx_fmt=other


使用TPC-H中的lineitem表(原始数据大小约62GB)进行Impala on Kudu与Phoenix on HBase的对比测试,包括数据的载入与4种查询。Phoenix on HBase的表划分为100个哈希分区,Kudu表划分为100个Tablet。


640?wx_fmt=other

测试结果如下。

640?wx_fmt=other

可见,Phoenix on HBase的方案只有在基于RowKey的查询时有性能优势,并且领先幅度不大。而Impala on Kudu在执行基于列的查询和全表扫描时,效率远远高于HBase。当然,这与HBase偏OLTP的设计思想有关,并不能说明Kudu可以完全取代HBase。


另外,论文中还用了雅虎的YCSB数据集测试随机读写能力。

640?wx_fmt=other

结果如下,整体上看,Kudu的随机读写与HBase相比都或多或少地落后,其中zipfian数据集(符合Zipf's Law,即长尾分布)上的差距比较大,而uniform数据集(符合均匀分布)上的差距比较小。这也是自然的,要想兼顾OLAP的效率,必然要在OLTP方面做出一些牺牲。Kudu也在持续优化随机读写,不过那是新版的事情了。

640?wx_fmt=other

当前的主要不足


Kudu现在可以基本满足我们对于OLTP+OLAP混合型分析的需求,但是它毕竟还年轻,采用的设计方案也较新,因此不可避免地还存在一些短板,在实际使用时需要提前避开一些坑。以我们生产环境中部署的1.5版本举例如下:


简单调优方法

我们的Kudu服务与Hadoop基础服务和Impala一起部署在10个节点上(每个节点双路E5 12C/24T,256G RAM,6TB SAS HDD),3个Master,10个TServer。以下是我们根据集群实际情况对一些主要参数进行的调优:








— THE END —

640?wx_fmt=jpeg

标签:下篇,table,面面观,kudu,Kudu,HBase,Impala,Spark
来源: https://blog.51cto.com/u_14222592/2897061