LevelDB internals: the write path

Author: Internet

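A LevelDB write first goes to the write-ahead log (WAL) and is then inserted into the memtable. Later, once the memtable has been turned into a read-only (immutable) memtable, it is flushed to disk as an SSTable.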

Background

Write example

A minimal LevelDB write

#include <iostream>
#include "leveldb/db.h"
#include <cassert>

using namespace std;

namespace ld=leveldb;

int main(){
    ld::DB *db;
    ld::Options options;
    options.create_if_missing=true;

    ld::Status status=ld::DB::Open(options,"/tmp/testdb",&db);
    assert(status.ok());
    
    status=db->Put(ld::WriteOptions(),"key1","val1");
    assert(status.ok());
    
    delete db;  // close the database and free the handle
    return 0;
}

Call stack

// step 1
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

// step 2
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;
  ...
}

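The key-value pair is first packed into a WriteBatch, and DBImpl::Write is then called to perform the actual write. The rest of this article focuses on DBImpl::Write.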

WriteBatch

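WriteBatch is central to the DBImpl::Write flow, so its design is explained first. For performance, LevelDB packs writes into a batch and appends the whole batch to the WAL as a single log record. This is what makes a batch atomic: during recovery a batch is replayed either completely or not at all. If the process crashes, writes that reached the log are recovered by replaying it. Surviving a crash of the whole machine additionally requires WriteOptions::sync, because otherwise the record may still sit in the operating system's buffer cache.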

WriteBatch has a single private data member, std::string rep_, which holds the encoded data.

The WriteBatch encoding is as follows:

Length   | 8 bytes         | 4 bytes | variable | variable | variable
Content  | sequence number | count   | record 1 | record 2 | record 3

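sequence number: the LevelDB sequence number. Every record is versioned by a sequence number (this is what MVCC-style snapshot reads rely on). The header stores the number assigned to the first record of the batch, and the remaining records take the consecutive numbers that follow.

count: the number of records in the batch.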
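
The 12-byte header described above can be sketched as follows. This is a minimal illustration, not LevelDB's own code: the helper names AppendFixed32, AppendFixed64, ReadFixed32, ReadFixed64 and MakeHeader are hypothetical, and the only assumption carried over from LevelDB is that fixed-width integers are stored little-endian.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical helpers (not LevelDB's API): append a fixed-width integer
// in little-endian byte order.
void AppendFixed32(std::string* dst, uint32_t v) {
  for (int i = 0; i < 4; ++i) {
    dst->push_back(static_cast<char>((v >> (8 * i)) & 0xff));
  }
}

void AppendFixed64(std::string* dst, uint64_t v) {
  for (int i = 0; i < 8; ++i) {
    dst->push_back(static_cast<char>((v >> (8 * i)) & 0xff));
  }
}

// Read the integers back from a given byte offset.
uint32_t ReadFixed32(const std::string& s, std::size_t off) {
  assert(off + 4 <= s.size());
  uint32_t v = 0;
  for (int i = 0; i < 4; ++i) {
    v |= static_cast<uint32_t>(static_cast<unsigned char>(s[off + i])) << (8 * i);
  }
  return v;
}

uint64_t ReadFixed64(const std::string& s, std::size_t off) {
  assert(off + 8 <= s.size());
  uint64_t v = 0;
  for (int i = 0; i < 8; ++i) {
    v |= static_cast<uint64_t>(static_cast<unsigned char>(s[off + i])) << (8 * i);
  }
  return v;
}

// Build the batch header: 8-byte sequence number, then 4-byte record count.
std::string MakeHeader(uint64_t sequence, uint32_t count) {
  std::string rep;
  AppendFixed64(&rep, sequence);
  AppendFixed32(&rep, count);
  return rep;
}
```

Calling MakeHeader(101, 3) yields a 12-byte string. The records themselves would be appended after these 12 bytes.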

 

Each record is encoded as follows:

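Length   | 1 byte | varint32   | key length bytes | varint32     | value length bytes
Content  | type   | key length | key              | value length | value

A deletion record carries only the type, key length and key fields.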
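
To make the record layout concrete, here is a small sketch of how one record could be serialized. The function names (AppendVarint32, AppendLengthPrefixed, EncodePut, EncodeDelete) are hypothetical and chosen for this illustration. The type tags 0x0 for a deletion and 0x1 for a value, and the varint32 length prefix, follow LevelDB's format.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Type tags as used by LevelDB's record format.
enum RecordType : uint8_t { kTypeDeletion = 0x0, kTypeValue = 0x1 };

// Hypothetical helper: varint32 stores 7 bits per byte, low bits first,
// with the high bit set on every byte except the last.
void AppendVarint32(std::string* dst, uint32_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

// Hypothetical helper: length (varint32) followed by the raw bytes.
void AppendLengthPrefixed(std::string* dst, const std::string& s) {
  assert(s.size() <= UINT32_MAX);
  AppendVarint32(dst, static_cast<uint32_t>(s.size()));
  dst->append(s);
}

// Put record: type | key length | key | value length | value
std::string EncodePut(const std::string& key, const std::string& value) {
  std::string rec;
  rec.push_back(static_cast<char>(kTypeValue));
  AppendLengthPrefixed(&rec, key);
  AppendLengthPrefixed(&rec, value);
  return rec;
}

// Delete record: type | key length | key
std::string EncodeDelete(const std::string& key) {
  std::string rec;
  rec.push_back(static_cast<char>(kTypeDeletion));
  AppendLengthPrefixed(&rec, key);
  return rec;
}
```

For short keys and values each length fits in one byte, so EncodePut("key1", "val1") occupies 1 + 1 + 4 + 1 + 4 = 11 bytes. A 300-byte key needs a two-byte length prefix.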

The main member functions of WriteBatch are:

  // Store the mapping "key->value" in the database.
  void Put(const Slice& key, const Slice& value);

  // If the database contains a mapping for "key", erase it.  Else do nothing.
  void Delete(const Slice& key);

  // Clear all updates buffered in this batch.
  void Clear();

  // The size of the database changes caused by this batch.
  //
  // This number is tied to implementation details, and may change across
  // releases. It is intended for LevelDB usage metrics.
  size_t ApproximateSize() const;

  // Copies the operations in "source" to this batch.
  //
  // This runs in O(source size) time. However, the constant factor is better
  // than calling Iterate() over the source batch with a Handler that replicates
  // the operations into this batch.
  void Append(const WriteBatch& source);

  // Support for iterating over the contents of a batch.
  Status Iterate(Handler* handler) const;

 

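WriteBatchInternal is a friend class of WriteBatch. It provides the helper functions that operate on a batch's internals, such as the SetSequence, Count, Contents and InsertInto calls used below.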

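Write flow

In LevelDB the function DBImpl::Write is responsible for writes. It is described below.

Main logic of DBImpl::Write

The main logic is walked through in three parts.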

Part 1

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }
  // ... continued in Part 2

Part 2

  Status status = MakeRoomForWrite(updates == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    // Only one sequence number is stored for the whole write_batch
    // (that of its first record)
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
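      // The mutex is released here. Other threads can now acquire it, append
      // themselves to writers_, and then block on their condition variable.
      // With threads [t1, t2, t3], the first batch group contains only t1's
      // batch; t2 and t3 join the queue afterwards and are written by a later group.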
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }
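
The sequence-number bookkeeping in Part 2 can be isolated into a few lines. The sketch below uses a hypothetical SequenceAllocator type that does not exist in LevelDB. It only mirrors the two statements above: the group's header gets last_sequence + 1, and last_sequence then advances by the number of records in the group.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical type for illustration; LevelDB keeps this state in versions_.
struct SequenceAllocator {
  uint64_t last_sequence = 0;

  // Reserve `count` consecutive sequence numbers for one batch group and
  // return the first of them, which is the value stored in the batch header.
  uint64_t Reserve(uint32_t count) {
    uint64_t first = last_sequence + 1;
    last_sequence += count;
    return first;
  }
};
```

Starting from last_sequence = 100, a group of three records is stamped with 101 and covers 101 to 103, so the next group starts at 104.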

Part 3

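  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    // Not the leader itself: this writer's batch was written as part of
    // the group, so mark its write as done and hand it the group's status
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();  // wake the waiting writer
    }
    if (ready == last_writer) break;
  }

  // Notify the new head of the write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }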

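This part dequeues the writers whose writes have completed, records the write status for each of them, and then wakes the writer that is now at the head of the queue.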
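
The three parts together form a leader/follower group commit. The following is a simplified, self-contained sketch of that pattern using the standard library. GroupCommitQueue and RunWriters are hypothetical names, a std::vector stands in for the WAL and memtable, and, unlike BuildBatchGroup, the sketch puts no size limit on a group and ignores the sync flag and error status.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for DBImpl; log_ plays the role of WAL + memtable.
class GroupCommitQueue {
 public:
  void Write(const std::string& record) {
    Writer w(record);
    std::unique_lock<std::mutex> lock(mu_);
    writers_.push_back(&w);
    // Part 1: sleep until a leader has written our record or we are the head.
    w.cv.wait(lock, [&] { return w.done || &w == writers_.front(); });
    if (w.done) return;

    // Part 2: this thread is the leader; group everything queued so far.
    std::vector<std::string> group;
    Writer* last_writer = &w;
    for (Writer* queued : writers_) {
      group.push_back(queued->record);
      last_writer = queued;
    }
    lock.unlock();  // new writers may enqueue during the slow "I/O"
    for (const std::string& r : group) log_.push_back(r);
    ++groups_;
    lock.lock();

    // Part 3: pop the group, wake its followers, then wake the next leader.
    while (true) {
      Writer* ready = writers_.front();
      writers_.pop_front();
      if (ready != &w) {
        ready->done = true;
        ready->cv.notify_one();
      }
      if (ready == last_writer) break;
    }
    if (!writers_.empty()) writers_.front()->cv.notify_one();
  }

  // Only call these once no Write() is in flight.
  std::size_t log_size() const { return log_.size(); }
  std::size_t groups() const { return groups_; }

 private:
  struct Writer {
    explicit Writer(std::string r) : record(std::move(r)) {}
    std::string record;
    bool done = false;
    std::condition_variable cv;
  };

  std::mutex mu_;
  std::deque<Writer*> writers_;
  std::vector<std::string> log_;
  std::size_t groups_ = 0;
};

// Usage sketch: `threads` threads each issue `per_thread` writes.
void RunWriters(GroupCommitQueue& q, int threads, int per_thread) {
  std::vector<std::thread> pool;
  for (int t = 0; t < threads; ++t) {
    pool.emplace_back([&q, t, per_thread] {
      for (int i = 0; i < per_thread; ++i) {
        q.Write("t" + std::to_string(t) + "-" + std::to_string(i));
      }
    });
  }
  for (std::thread& th : pool) th.join();
}
```

Only the writer at the head of the queue ever touches log_, which is why the mutex can be released during the write. Under contention the number of groups is usually smaller than the number of records, since followers that queued up during one leader's write are committed together by the next leader.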

Key function analysis

MakeRoomForWrite

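Status DBImpl::MakeRoomForWrite(bool force);

Purpose: throttle writers when L0 is filling up too quickly, rotate the memtable, the immutable memtable and the WAL, and decide whether a compaction should be scheduled. The force parameter requests that the current memtable be switched out and flushed immediately, even if it still has room.

Main logic (a loop that re-evaluates the conditions below on every iteration):
  1. Derive allow_delay from force: delaying the write is allowed only when an immediate flush is not being forced (allow_delay = !force).
  2. If bg_error_ holds an error, leave the loop and return that error status.
  3. If allow_delay is true and the number of L0 files has reached kL0_SlowdownWritesTrigger (default 8), throttle the write by sleeping for 1 ms. mutex_ is released before the sleep so that other threads are not blocked, and allow_delay is then set to false, so a single write is delayed at most once.
  4. If no flush is forced and the memtable's approximate size does not exceed write_buffer_size, there is room: leave the loop with nothing else to do.
  5. Otherwise either a flush is forced or the memtable is full. If an immutable memtable already exists, block on a condition variable until the background compaction finishes (that is, until the immutable memtable has been flushed to disk).
  6. If the number of L0 files has reached kL0_StopWritesTrigger (default 12), writes stop and wait, again on a condition variable, until compaction reduces the number of L0 files.
  7. In the remaining case, create a new log file, allocate a new memtable, turn the old memtable into the immutable memtable, and then check whether a background compaction needs to be scheduled.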
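
The order in which these conditions are checked matters, so it may help to see them as a single decision function. The sketch below is an illustration only: RoomState, RoomAction and NextAction are hypothetical names, the state is a plain snapshot rather than live DBImpl members, and the comparisons are assumed to be "greater than or equal" for the two L0 triggers.

```cpp
#include <cassert>
#include <cstddef>

// Default trigger values as quoted in the steps above.
constexpr int kL0_SlowdownWritesTrigger = 8;
constexpr int kL0_StopWritesTrigger = 12;

// Hypothetical names for the outcome of one loop iteration.
enum class RoomAction {
  kFail,                 // step 2: return the background error
  kDelayOnce,            // step 3: sleep 1 ms, then clear allow_delay
  kProceed,              // step 4: there is room, leave the loop
  kWaitForFlush,         // step 5: wait for the immutable memtable flush
  kWaitForL0Compaction,  // step 6: too many L0 files, wait
  kSwitchMemtable        // step 7: new log, new memtable, maybe compact
};

// Hypothetical snapshot of the state the real function inspects.
struct RoomState {
  bool bg_error;
  int l0_files;
  std::size_t mem_usage;
  std::size_t write_buffer_size;
  bool has_immutable_memtable;
};

// Decide what one iteration of the loop would do. The checks are made in
// the same order as steps 2 to 7.
RoomAction NextAction(const RoomState& s, bool force, bool allow_delay) {
  if (s.bg_error) return RoomAction::kFail;
  if (allow_delay && s.l0_files >= kL0_SlowdownWritesTrigger) {
    return RoomAction::kDelayOnce;
  }
  if (!force && s.mem_usage <= s.write_buffer_size) {
    return RoomAction::kProceed;
  }
  if (s.has_immutable_memtable) return RoomAction::kWaitForFlush;
  if (s.l0_files >= kL0_StopWritesTrigger) {
    return RoomAction::kWaitForL0Compaction;
  }
  return RoomAction::kSwitchMemtable;
}
```

A caller would start with allow_delay = !force and call NextAction repeatedly, clearing allow_delay after a kDelayOnce result and clearing force after a kSwitchMemtable result, until it sees kProceed or kFail.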
 

 

Source: https://www.cnblogs.com/vincent72143/p/16257501.html