flume MemoryChannel 源代码解析
作者:互联网
1.先分析三个LinkedBlockingDeque<Event>类型的takeList,putList,queue
putList: 存放的是来自source生产的数据,通过调用doPut(Event event)方法,它是怎样到queue的,在每次运行doCommit的时候,会循环放到queue,事实上doCommit()放法仅仅做了putlist交给queue,
synchronized(queueLock) { if(puts > 0 ) { while(!putList.isEmpty()) { if(!queue.offer(putList.removeFirst())) { throw new RuntimeException("Queue add failed, this shouldn't be able to happen"); } } } putList.clear(); takeList.clear(); }
takelist: 每次sink消费。都会加到takelist,一般不起什么作用,可是操作失败。rollback就起作用了
protected void doRollback() {
int takes = takeList.size(); synchronized(queueLock) { Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " + "queue to rollback takes. This should never happen, please report"); while(!takeList.isEmpty()) { queue.addFirst(takeList.removeLast()); } putList.clear(); } bytesRemaining.release(putByteCounter); putByteCounter = 0; takeByteCounter = 0; queueStored.release(takes); channelCounter.setChannelSize(queue.size()); } }
queue:存放的即将传递给sink的全部数据.
2.參数相应关系
transactionCapacity:设置takelist,putlist最大容量
capacity: 设置queue的最大容量
keep-alive: Semaphore的tryAcquire的timeout的參数,添加到takelist,putlist,queue超时时间
标签:flume,MemoryChannel,takes,takelist,putList,queue,takeList,源代码,size 来源: https://www.cnblogs.com/mqxnongmin/p/10571106.html