其他分享
首页 > 其他分享> > 如何从Cassandra增加数据流读取并行性

如何从Cassandra增加数据流读取并行性

作者:互联网

我试图将大量数据(2 TB,30kkk行)从Cassandra导出到BigQuery.我的所有基础设施都在GCP上.我的Cassandra集群有4个节点(4个vCPU,26 GB内存,每个2000 GB PD(HDD)).集群中有一个种子节点.我需要在写入BQ之前转换我的数据,所以我使用的是Dataflow.工人类型是n1-highmem-2.工人和Cassandra实例位于Europe-west1-c的同一区域.我对Cassandra的限制:

Cassandra limits settings

我负责读取转换的部分管道代码位于here.

自动缩放

问题是,当我没有设置–numWorkers时,自动调整设置的工人数量(平均2个工人):

Worker number with autoscaling

负载均衡

当我设置–numWorkers = 15时,读取速率不会增加,只有2名工作人员与Cassandra通信(我可以从iftop告诉它,只有这些工作者有CPU负载~60%).

同时Cassandra节点没有很多负载(CPU使用率为20-30%).种子节点的网络和磁盘使用率比其他节点大约高2倍,但不会太高,我认为:

Network usage of the seed

Disk usage of the seed node

对于非种子节点:

Network usage of the not seed

Disk usage of the not seed node

管道发射警告

管道启动时我有一些警告:

WARNING: Size estimation of the source failed: 
org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@7569ea63
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /10.132.9.101:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.101:9042] Cannot connect), /10.132.9.102:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.102:9042] Cannot connect), /10.132.9.103:9042 (com.datastax.driver.core.exceptions.TransportException: [/10.132.9.103:9042] Cannot connect), /10.132.9.104:9042 [only showing errors of first 3 hosts, use getErrors() for more details])

我的Cassandra集群位于GCE本地网络中,它接收到一些查询是从我的本地计算机生成的,无法访问集群(我正在使用Dataflow Eclipse插件启动管道,如here所述).这些查询是关于表的大小估计.我可以手动指定尺寸估算或从GCE实例启动pipline吗?或者我可以忽略这些警告吗?它对阅读率有影响吗?

我试图从GCE VM启动管道.连接没有问题.我的表中没有varchar列,但是我收到了这样的警告(datastax驱动程序中没有编解码器[varchar< - > java.lang.Long]). :

WARNING: Can't estimate the size
com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [varchar <-> java.lang.Long]
        at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:741)
        at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:588)
        at com.datastax.driver.core.CodecRegistry.access$500(CodecRegistry.java:137)
        at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:246)
        at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:232)
        at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
        at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
        at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
        at com.google.common.cache.LocalCache.get(LocalCache.java:4053)
        at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
        at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
        at com.datastax.driver.core.CodecRegistry.lookupCodec(CodecRegistry.java:522)
        at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:485)
        at com.datastax.driver.core.CodecRegistry.codecFor(CodecRegistry.java:467)
        at com.datastax.driver.core.AbstractGettableByIndexData.codecFor(AbstractGettableByIndexData.java:69)
        at com.datastax.driver.core.AbstractGettableByIndexData.getLong(AbstractGettableByIndexData.java:152)
        at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:26)
        at com.datastax.driver.core.AbstractGettableData.getLong(AbstractGettableData.java:95)
        at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getTokenRanges(CassandraServiceImpl.java:279)
        at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl.getEstimatedSizeBytes(CassandraServiceImpl.java:135)
        at org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource.getEstimatedSizeBytes(CassandraIO.java:308)
        at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.startDynamicSplitThread(BoundedReadEvaluatorFactory.java:166)
        at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$BoundedReadEvaluator.processElement(BoundedReadEvaluatorFactory.java:142)
        at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:146)
        at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:110)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

管道读取代码

// Read data from Cassandra table
PCollection<Model> pcollection = p.apply(CassandraIO.<Model>read()
        .withHosts(Arrays.asList("10.10.10.101", "10.10.10.102", "10.10.10.103", "10.10.10.104")).withPort(9042)
        .withKeyspace(keyspaceName).withTable(tableName)
        .withEntity(Model.class).withCoder(SerializableCoder.of(Model.class))
        .withConsistencyLevel(CASSA_CONSISTENCY_LEVEL));

// Transform pcollection to KV PCollection by rowName
PCollection<KV<Long, Model>> pcollection_by_rowName = pcollection
        .apply(ParDo.of(new DoFn<Model, KV<Long, Model>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                c.output(KV.of(c.element().rowName, c.element()));
            }
        }));

拆分数(Stackdriver日志)

W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 
W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 
W  Number of splits is less than 0 (0), fallback to 1 
I  Number of splits is 1 

我试过了什么

没有效果:

>将读取一致性级别设置为ONE
> nodetool setstreamthroughput 1000,nodetool setinterdcstreamthroughput 1000
>增加Cassandra读取并发性(在cassandra.yaml中):concurrent_reads:32
>设置不同数量的工人1-40.

一些影响:
 我将@Spkff建议设为numSplits = 10.现在我可以在日志中看到:

I  Murmur3Partitioner detected, splitting 
W  Can't estimate the size 
W  Can't estimate the size 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@6d83ee93 produced 10 bundles with total serialized response size 20799 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@25d02f5c produced 10 bundles with total serialized response size 19359 
I  Splitting source [0, 1) produced 1 bundles with total serialized response size 1091 
I  Murmur3Partitioner detected, splitting 
W  Can't estimate the size 
I  Splitting source [0, 0) produced 0 bundles with total serialized response size 76 
W  Number of splits is less than 0 (0), fallback to 10 
I  Number of splits is 10 
I  Splitting source org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@2661dcf3 produced 10 bundles with total serialized response size 18527 

但我有另一个例外:

java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.Cassandra...
(5d6339652002918d): java.io.IOException: Failed to start reading from source: org.apache.beam.sdk.io.cassandra.CassandraIO$CassandraSource@5f18c296
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:582)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:347)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:183)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:336)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:294)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
    at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:43)
    at org.apache.beam.sdk.io.cassandra.CassandraServiceImpl$CassandraReaderImpl.start(CassandraServiceImpl.java:80)
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:579)
    ... 14 more
Caused by: com.datastax.driver.core.exceptions.SyntaxError: line 1:53 mismatched character 'p' expecting '$'
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:144)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:186)
    at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:50)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:817)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:651)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1077)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1000)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:341)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:349)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:642)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:565)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:479)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:441)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more

也许有一个错误:CassandraServiceImpl.java#L220

这句话看起来像是错误的:CassandraServiceImpl.java#L207

我对CassandraIO代码所做的更改

正如@jkff所提议的那样,我以我需要的方式改变了CassandraIO:

@VisibleForTesting
protected List<BoundedSource<T>> split(CassandraIO.Read<T> spec,
                                              long desiredBundleSizeBytes,
                                              long estimatedSizeBytes) {
  long numSplits = 1;
  List<BoundedSource<T>> sourceList = new ArrayList<>();
  if (desiredBundleSizeBytes > 0) {
    numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
  }
  if (numSplits <= 0) {
    LOG.warn("Number of splits is less than 0 ({}), fallback to 10", numSplits);
    numSplits = 10;
  }

  LOG.info("Number of splits is {}", numSplits);

  Long startRange = MIN_TOKEN;
  Long endRange = MAX_TOKEN;
  Long startToken, endToken;

  String pk = "$pk";
  switch (spec.table()) {
  case "table1":
          pk = "table1_pk";
          break;
  case "table2":
  case "table3":
          pk = "table23_pk";
          break;
  }

  endToken = startRange;
  Long incrementValue = endRange / numSplits - startRange / numSplits;
  String splitQuery;
  if (numSplits == 1) {
    // we have an unique split
    splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
    sourceList.add(new CassandraIO.CassandraSource<T>(spec, splitQuery));
  } else {
    // we have more than one split
    for (int i = 0; i < numSplits; i++) {
      startToken = endToken;
      endToken = startToken + incrementValue;
      Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
      if (i > 0) {
        builder = builder.and(QueryBuilder.gte("token(" + pk + ")", startToken));
      }
      if (i < (numSplits - 1)) {
        builder = builder.and(QueryBuilder.lt("token(" + pk + ")", endToken));
      }
      sourceList.add(new CassandraIO.CassandraSource(spec, builder.toString()));
    }
  }
  return sourceList;
}

解决方法:

我认为这应该被归类为CassandraIO中的一个错误.我提交了BEAM-3424.您可以尝试构建自己的Beam版本,默认值1更改为100或类似的东西,而此问题正在修复.

我还在尺寸估算期间提交了BEAM-3425的bug.

标签:java,cassandra,google-bigquery,google-cloud-platform,google-cloud-dataflow
来源: https://codeday.me/bug/20190701/1348021.html