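LevelDB Implementation: The Write Path
Author: Internet
LevelDB's write path first appends the write to the write-ahead log (WAL) and then inserts it into the memtable. Once the memtable is full it becomes a read-only (immutable) memtable, which is finally flushed to disk as an SSTable.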
Background
Write example
LevelDB's write flow
#include <cassert>
#include "leveldb/db.h"

namespace ld = leveldb;

int main() {
  ld::DB* db = nullptr;
  ld::Options options;
  options.create_if_missing = true;  // create the database if it does not exist yet
  ld::Status status = ld::DB::Open(options, "/tmp/testdb", &db);
  assert(status.ok());
  status = db->Put(ld::WriteOptions(), "key1", "val1");
  assert(status.ok());
  delete db;  // close the database and release its resources
  return 0;
}
Call stack
// step 1
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

// step 2
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;
  ...
}
The key-value pair is first placed in a WriteBatch, and DBImpl::Write is then called to perform the write. The rest of this article focuses on DBImpl::Write.
WriteBatch
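WriteBatch is central to the DBImpl::Write flow, so its design is explained first. For performance, LevelDB packs writes into a batch and appends the whole batch to the WAL as a single record. A batch is therefore applied atomically: after a crash, recovery replays the WAL, so a batch that reached the log is restored in full. (With sync disabled, the most recent writes can still be lost if the machine crashes before the operating system flushes the log.)
WriteBatch has a single private member variable, std::string rep_, which holds the encoded data.
WriteBatch is encoded as follows: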
Length  | 8 bytes | 4 bytes | variable | variable | variable |
Content | sequence number | count | record 1 | record 2 | record 3 |
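sequence number: LevelDB's sequence number, which supports MVCC. The header stores the sequence number of the first record in the batch; the following records implicitly take consecutive numbers.
count: the number of records in the batch.
Each record is encoded as follows: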
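Length  | 1 byte | variable (varint) | key size | variable (varint) | value size |
Content | type | key size | key | value size | value |
A deletion record carries only the type and the key; it has no value fields.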
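The layout above can be sketched in a few lines of standalone C++. This is a minimal illustration, not LevelDB's source: the helper names (NewBatch, EncodePut, EncodeDelete, BatchCount) are hypothetical, and the sketch assumes little-endian fixed-width header fields, varint32 length prefixes, and the type tags 0 for deletion and 1 for value.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Assumed type tags (hypothetical names for this sketch).
enum RecordType : uint8_t { kDeletion = 0x0, kValue = 0x1 };

// Append v as a little-endian fixed-width integer of `bytes` bytes.
void PutFixed(std::string* dst, uint64_t v, int bytes) {
  for (int i = 0; i < bytes; ++i) {
    dst->push_back(static_cast<char>((v >> (8 * i)) & 0xff));
  }
}

// Append v as a varint32: 7 payload bits per byte, high bit set on all but the last byte.
void PutVarint32(std::string* dst, uint32_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

// Append a varint32 length followed by the bytes of s.
void PutLengthPrefixed(std::string* dst, const std::string& s) {
  PutVarint32(dst, static_cast<uint32_t>(s.size()));
  dst->append(s);
}

// Create an empty batch: 8-byte sequence number followed by a 4-byte count of zero.
std::string NewBatch(uint64_t sequence) {
  std::string rep;
  PutFixed(&rep, sequence, 8);
  PutFixed(&rep, 0, 4);
  return rep;
}

// Read the 4-byte record count stored at offset 8.
uint32_t BatchCount(const std::string& rep) {
  assert(rep.size() >= 12);
  uint32_t n = 0;
  for (int i = 0; i < 4; ++i) {
    n |= static_cast<uint32_t>(static_cast<unsigned char>(rep[8 + i])) << (8 * i);
  }
  return n;
}

// Overwrite the 4-byte record count stored at offset 8.
void SetBatchCount(std::string* rep, uint32_t n) {
  for (int i = 0; i < 4; ++i) {
    (*rep)[8 + i] = static_cast<char>((n >> (8 * i)) & 0xff);
  }
}

// Append a value record: type | key size | key | value size | value.
void EncodePut(std::string* rep, const std::string& key, const std::string& value) {
  SetBatchCount(rep, BatchCount(*rep) + 1);
  rep->push_back(static_cast<char>(kValue));
  PutLengthPrefixed(rep, key);
  PutLengthPrefixed(rep, value);
}

// Append a deletion record: type | key size | key.
void EncodeDelete(std::string* rep, const std::string& key) {
  SetBatchCount(rep, BatchCount(*rep) + 1);
  rep->push_back(static_cast<char>(kDeletion));
  PutLengthPrefixed(rep, key);
}
```

Under these assumptions, a batch holding a put of "key1"/"val1" and a delete of "key2" occupies 29 bytes: a 12-byte header, an 11-byte value record, and a 6-byte deletion record.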
The main member functions of WriteBatch are:
// Store the mapping "key->value" in the database.
void Put(const Slice& key, const Slice& value);

// If the database contains a mapping for "key", erase it. Else do nothing.
void Delete(const Slice& key);

// Clear all updates buffered in this batch.
void Clear();

// The size of the database changes caused by this batch.
//
// This number is tied to implementation details, and may change across
// releases. It is intended for LevelDB usage metrics.
size_t ApproximateSize() const;

// Copies the operations in "source" to this batch.
//
// This runs in O(source size) time. However, the constant factor is better
// than calling Iterate() over the source batch with a Handler that replicates
// the operations into this batch.
void Append(const WriteBatch& source);

// Support for iterating over the contents of a batch.
Status Iterate(Handler* handler) const;
WriteBatchInternal is a friend class of WriteBatch and provides its helper functions.
Write flow
LevelDB performs writes in DBImpl::Write, which is described below.
Main logic of DBImpl::Write
The main logic is described step by step.
Part 1
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }
  // ... (continued in Part 2)
}
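- Write takes a WriteBatch and the write options. sync controls whether the WAL is synced to disk before the write returns; done records whether this batch has already been written.
- Writes may be issued concurrently. Each caller is appended to the writers_ queue, and condition variables implement a producer-consumer pattern so that writes from several threads can be merged into one, which improves write throughput. The mechanism is described in detail later.
- Only the writer at the head of the queue whose write is still pending goes on to execute the logic below.
- If w.done == true after waking up, the function returns immediately: another thread has already written this data on the caller's behalf (the thread that performs the write updates each Writer's status afterwards).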
Part 2
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
  WriteBatch* write_batch = BuildBatchGroup(&last_writer);
  // Only one sequence number (that of the first record) is stored in write_batch.
  WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
  last_sequence += WriteBatchInternal::Count(write_batch);

  // Add to log and apply to memtable. We can release the lock
  // during this phase since &w is currently responsible for logging
  // and protects against concurrent loggers and concurrent writes
  // into mem_.
  {
    // Unlock here: other threads can now take the lock, append themselves to
    // writers_, and then block on their condition variable.
    // With threads [t1, t2, t3], the first batch may contain only t1's writes;
    // t2 and t3 join the queue afterwards.
    mutex_.Unlock();
    status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
    bool sync_error = false;
    if (status.ok() && options.sync) {
      status = logfile_->Sync();
      if (!status.ok()) {
        sync_error = true;
      }
    }
    if (status.ok()) {
      status = WriteBatchInternal::InsertInto(write_batch, mem_);
    }
    mutex_.Lock();
    if (sync_error) {
      // The state of the log file is indeterminate: the log record we
      // just added may or may not show up when the DB is re-opened.
      // So we force the DB into a mode where all future writes fail.
      RecordBackgroundError(status);
    }
  }
  if (write_batch == tmp_batch_) tmp_batch_->Clear();

  versions_->SetLastSequence(last_sequence);
}
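- MakeRoomForWrite handles L0 growing too fast (choosing between throttling and stopping writes), the flushing of the memtable, and the switch of the WAL that belongs to the memtable. This function is discussed in detail below.
- The last used sequence number is fetched; every write is assigned a monotonically increasing sequence number.
- BuildBatchGroup is fairly simple: it merges the pending writes in the writers_ queue into a single WriteBatch. writers_ is an STL queue (a std::deque); the function walks it with an iterator and uses the WriteBatchInternal helper class to append each batch to the result.
- The lock is released here for performance. Appending to the WAL and inserting into the memtable are comparatively slow, and once the lock is released other threads can enqueue their writes in writers_, although they do not proceed past the wait.
- AddRecord appends the WriteBatch to the WAL; the WAL will be covered in a separate article.
- InsertInto applies the contents of the WriteBatch to the memtable; the memtable will be covered in a separate article.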
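How many queued writers end up in one group can be sketched as a standalone function. This is an illustration under stated assumptions, not LevelDB's code: PendingWrite and GroupSize are hypothetical names, and the limits used here (a 1 MB cap, reduced to the first batch's size plus 128 KB when the first batch is small, and never attaching a sync write to a non-sync leader) reflect my reading of BuildBatchGroup and should be checked against the version of the source you are using.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for a queued Writer: encoded batch size and sync flag.
struct PendingWrite {
  size_t bytes;
  bool sync;
};

// Returns how many writers, counted from the head of the queue, would be
// merged into the leader's group under the assumed limits.
size_t GroupSize(const std::deque<PendingWrite>& queue) {
  assert(!queue.empty());
  const PendingWrite& first = queue.front();
  size_t size = first.bytes;

  // Allow the group to grow up to a maximum size, but keep the limit low
  // when the leader's own write is small so that it is not slowed down much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }

  size_t merged = 1;  // the leader itself
  for (size_t i = 1; i < queue.size(); ++i) {
    // A sync write must not ride along with a leader that does not sync.
    if (queue[i].sync && !first.sync) break;
    size += queue[i].bytes;
    if (size > max_size) break;  // do not make the group too big
    ++merged;
  }
  return merged;
}
```

For example, with a non-sync leader followed by a non-sync write and then a sync write, only the first two are merged; the sync write waits and becomes the leader of the next group.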
Part 3
while (true) {
  Writer* ready = writers_.front();
  writers_.pop_front();
  // Not the leader's own Writer: mark that write as completed.
  if (ready != &w) {
    ready->status = status;
    ready->done = true;
    ready->cv.Signal();  // wake the thread waiting on this Writer
  }
  if (ready == last_writer) break;
}

// Notify the new head of the write queue.
if (!writers_.empty()) {
  writers_.front()->cv.Signal();
}
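The main logic here removes the completed writers from the queue and updates their status, then wakes the new head of the queue so that it can carry out the next write.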
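Parts 1 to 3 together form a leader/follower group commit. The following is a minimal standalone sketch of that pattern using the C++ standard library. It is not LevelDB's implementation: GroupCommitLog, SketchWriter and WriteFromThreads are hypothetical names, the "log" is an in-memory vector standing in for the WAL append and memtable insert, and the leader simply takes every writer queued at that moment.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical per-call state, playing the role of Writer.
struct SketchWriter {
  std::string data;
  bool done = false;
  std::condition_variable cv;
};

class GroupCommitLog {
 public:
  void Write(const std::string& data) {
    SketchWriter w;
    w.data = data;
    std::unique_lock<std::mutex> lock(mu_);
    writers_.push_back(&w);
    // Part 1: wait until a leader has written for us or we reach the head.
    w.cv.wait(lock, [&] { return w.done || &w == writers_.front(); });
    if (w.done) return;

    // Part 2: we are the leader; merge everything queued so far.
    std::vector<SketchWriter*> group(writers_.begin(), writers_.end());
    std::string merged;
    for (SketchWriter* g : group) merged += g->data;

    lock.unlock();           // slow work happens without the mutex held
    log_.push_back(merged);  // stand-in for WAL append + memtable insert
    lock.lock();

    // Part 3: dequeue the group, mark followers done, wake the next leader.
    for (SketchWriter* g : group) {
      writers_.pop_front();
      if (g != &w) {
        g->done = true;
        g->cv.notify_one();
      }
    }
    if (!writers_.empty()) writers_.front()->cv.notify_one();
  }

  // Number of merged appends; call only after all writer threads have joined.
  size_t Appends() const { return log_.size(); }

  // Total bytes appended; call only after all writer threads have joined.
  size_t TotalBytes() const {
    size_t n = 0;
    for (const std::string& r : log_) n += r.size();
    return n;
  }

 private:
  std::mutex mu_;
  std::deque<SketchWriter*> writers_;
  std::vector<std::string> log_;
};

// Usage helper: n threads each write one byte, then all are joined.
void WriteFromThreads(GroupCommitLog* log, int n) {
  std::vector<std::thread> threads;
  for (int i = 0; i < n; ++i) {
    threads.emplace_back([log] { log->Write("x"); });
  }
  for (std::thread& t : threads) t.join();
}
```

Only the writer at the head of the queue touches log_, so the append can run without the mutex, which is the same reasoning the article gives for releasing mutex_ around AddRecord and InsertInto. The number of appends depends on thread timing; it is never larger than the number of writes.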
Key function analysis
MakeRoomForWrite
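Status DBImpl::MakeRoomForWrite(bool force);
Purpose: handle L0 being written too fast, manage the memtable, the immutable memtable and the WAL, and decide whether a compaction should be triggered.
The force parameter indicates whether the memtable must be flushed immediately.
Main logic of the function (a loop that checks the following conditions in order):
- Decide from force whether the write may be delayed; this is tracked by the variable allow_delay (a delay is allowed only when no immediate flush is requested).
- If bg_error_ holds an error, leave the loop and return the error status.
- If allow_delay is true and the number of L0 files has reached kL0_SlowdownWritesTrigger (default 8), delay the write by 1 ms. mutex_ is released before sleeping so that other threads are not blocked, and allow_delay is set to false so that a single write is delayed at most once.
- If no immediate flush is requested and the approximate size of the memtable has not yet exceeded write_buffer_size, return right away; there is nothing to do.
- At this point either an immediate flush is required or the memtable has exceeded write_buffer_size. If an immutable memtable still exists, block on a condition variable until the compaction finishes (that is, until the immutable memtable has been flushed).
- If the number of L0 files has reached kL0_StopWritesTrigger (default 12), writes stop. This is also implemented with a condition variable, waiting for compaction to reduce the number of L0 files.
- Otherwise, create a new log file and a new memtable, turn the old memtable into the immutable memtable, and then decide whether a compaction needs to be scheduled asynchronously.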
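The order of these checks can be captured as a pure decision function. This is a sketch, not LevelDB's code: RoomAction, DbState and NextAction are hypothetical names, the thresholds 8 and 12 are the defaults quoted above, and the real function loops, sleeps and waits on condition variables instead of returning an action.

```cpp
#include <cstddef>

// Hypothetical outcome of one pass through the checks.
enum class RoomAction {
  kFail,            // a background error was recorded
  kDelayOnce,       // sleep 1 ms, then re-check with allow_delay = false
  kProceed,         // memtable has room; the write can go ahead
  kWaitForFlush,    // immutable memtable still being flushed
  kWaitForL0,       // too many L0 files; wait for compaction
  kSwitchMemtable   // new log file + new memtable, old one becomes immutable
};

// Hypothetical snapshot of the state the checks look at.
struct DbState {
  bool background_error;
  int level0_files;
  size_t memtable_bytes;
  size_t write_buffer_size;
  bool has_immutable_memtable;
};

constexpr int kL0SlowdownTrigger = 8;  // default quoted in the text
constexpr int kL0StopTrigger = 12;     // default quoted in the text

// Applies the checks in the order listed above and returns the first match.
RoomAction NextAction(const DbState& s, bool force, bool allow_delay) {
  if (s.background_error) return RoomAction::kFail;
  if (allow_delay && s.level0_files >= kL0SlowdownTrigger) {
    return RoomAction::kDelayOnce;
  }
  if (!force && s.memtable_bytes <= s.write_buffer_size) {
    return RoomAction::kProceed;
  }
  if (s.has_immutable_memtable) return RoomAction::kWaitForFlush;
  if (s.level0_files >= kL0StopTrigger) return RoomAction::kWaitForL0;
  return RoomAction::kSwitchMemtable;
}
```

A caller would start with allow_delay = !force and call NextAction repeatedly, updating the state after each delay or wait, until it returns kFail, kProceed or kSwitchMemtable.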
Source: https://www.cnblogs.com/vincent72143/p/16257501.html