TiKV源码分析(一)
作者:互联网
关于TiDB与TiKV学习总结
TiKV
关于TiKV主要的研究点在KV的存储,TiDB将SQL转成了KV数据对,TiKV就是将KV数据进行存储并提供查询,对于多节点中每节点的数据一致性和Percolator事务prewrite/commit等就是主要要解决的事情,从而实现稳定的分布式存储。
TiKV知识点
- Google Spanner、TrueTime API
- Service:
- gRPC(接TiDB)(kv_get/kv_scan/kv_prewrite/kv_commit API、coprocessor API、raw KV API、future下接Storage)
(接其他TiKVNode)(Raft&&Batch_Raft) - Raft(step/propose/tick/ready/advance、Leader/Follower)
- MultiRaft、状态机
- Log与Raft RocksDB、Region
- Storage:
- Percolator(Prewrite+Commit)
- MVCC
- 解析Log写落盘KV RocksDB
- Coprocessor(TiDB单表聚合算子计算下推)
- Snapshot
本章序:从主框架RaftStore开始
peerfsm处理日志消息的raft状态变更,applyfsm对日志进行落盘,这两者都是围绕raftstore与日志展开,这里涉及到的Engine是RaftEngine,关于实际数据写入RocksDB数据库的情况之后记录。
从batch.rs中看状态机的调度方法:所有Fsm之间的关联使用的是状态机的驱动,Poll在等待handle_raft_ready处理完之后会通过一个状态机的驱动结构reschedule_fsms(fsm_cnt,ReschedulePolicy)(Reschedule来记录一个状态及接下来的调度策略是移除、释放还是重新调度,重新调度会传入当前fsm在数组中的下标,重新将该fsm加入到调度中)。
用栈的弹入弹出操作来模拟一个驱动流程:
1.每次循环进入时会调用fetch_fsm方法,尝试去从fsm_receiver中获取当前传入的fsm,加入到batch中
2.使用该fsm的handle_normal/handle_control函数对该fsm进行处理
3.处理完后弹出上一个batch中的normal驱动机,出栈再入栈或者直接入栈加入新的下一个驱动机进行ReschedulePolicy::Schedule
4.Schedule所做的就是调用batch.reschedule(&self.router,r)对新加入的fsm进行schedule->schedule中使用send方法发送至下一个fsm的路径位置。
store.rs和apply.rs中实现了batch.rs中定义的trait方法,fsm加入后就可以调度store.rs和apply.rs中的具现化方法来执行每个fsm对应的操作。
从store.rs中的BatchSystem入手,其中有一个HandlerBuilder,里面含有一个Poller,使用PollHandler实现,PollHandler由Apply.rs和Peer.rs一人提供一个,可以通过Batch中的Poller获取ApplyFsm和PeerFsm的信息从而poll相应的Fsm进行下一步操作。
BatchSystem(HandlerBuilder)->spawn->start_poller->poller.poll()->(PollHandler).handle_control/handle_normal
HandlerBuilder结构体中的handler为PollHandler,HandlerBuilder的build()就是为了得到其中的PollHandler,根据PollHandler又可以构建出Poller结构体,从而可以调用Poller中的Poll方法,PollHandler中存在apply.rs(ApplyPoller)和store.rs(RaftPoller)中,提供了Handle_Control/Handle_Normal主要函数,会在下面分析。在Poll中可以不断的去调用handle函数处理状态机中的各种事件。
PollHandler中提供了handle_normal和handle_control,方便BatchSystem调用对应的FSM。
接下来分别从整体框架、peer的handle_normal和apply的handle_normal开始分析:
Apply与Peer的交互框架
主要考虑如下交互,交互形式以channel的tx/rx作为消息通道,消息中使用cb进行消息传递,也可以使用rx.recv/tx.send进行。
- ApplyTask::Apply peerfsm发给applyfsm(发送的proposal中记录了cb)
- PeerMsg::ApplyRes applyfsm返回apply的结果给peerfsm(一般为cb中的返回的结果)
- ApplyTaskRes::Apply peerfsm接收applyfsm的返回结果(收尾动作,此时cb已经执行完成)
cb对结果执行下一步操作与tx.send发送结果回peer需要区分开,cb完全可以执行tx.send发送结果回peer的操作,还可以包括失败数据的gc,还可以处理需要多步操作才能完成的请求等等。在apply代码中会先执行cb再执行tx.send向peer发送响应,因此某些情况可以自己定制cb用于向peer返回特定的结果,不仅限于Snapshot的结果(读请求)或Ok的结果(写请求)。
基于cb的消息传递
消息中会带上callback,用于记录一条消息完成后需要自己定义的一些操作:
peerfsm发送给applyfsm的ApplyTask::Apply中会有一个proposal,
proposal中记录了callback,apply接收后proposal存在pending_cmds中,proposal中的callback以后暂时存在applied_batch中
//ApplyDelegate主要记录的是对apply处理的各种方法,ApplyContext主要记录的是apply中会用到的各种数据
//apply中的pending_cmds(在结构体ApplyDelegate中)记录proposal
/// Handles proposals, and appends the commands to the apply delegate.
fn append_proposal(&mut self, props_drainer: Drain<Proposal<EK::Snapshot>>) {
let (region_id, peer_id) = (self.delegate.region_id(), self.delegate.id());
...
for p in props_drainer {
let cmd = PendingCmd::new(p.index, p.term, p.cb);
if p.is_conf_change {
if let Some(cmd) = self.delegate.pending_cmds.take_conf_change() {
// if it loses leadership before conf change is replicated, there may be
// a stale pending conf change before next conf change is applied. If it
// becomes leader again with the stale pending conf change, will enter
// this block, so we notify leadership may have been changed.
notify_stale_command(region_id, peer_id, self.delegate.term, cmd);
}
self.delegate.pending_cmds.set_conf_change(cmd);
} else {
self.delegate.pending_cmds.append_normal(cmd);
}
}
}
//apply中applied_batch记录(在ApplyContext结构体中)
fn handle_raft_entry_normal<W: WriteBatch<EK>>(
&mut self,
apply_ctx: &mut ApplyContext<EK, W>,
entry: &Entry,
) -> ApplyResult<EK::Snapshot> {
...
while let Some(mut cmd) = self.pending_cmds.pop_normal(std::u64::MAX, term - 1) {
if let Some(cb) = cmd.cb.take() {
apply_ctx
.applied_batch
.push_cb(cb, cmd_resp::err_resp(Error::StaleCommand, term));
}
}
}
写入数据库以后实际调用proposal中的回调函数,一般可能是记录了这个结果对应的下一步操作的,对于某个需要两步请求发送才能完成的操作,在第一个实现后会紧接着执行第二步;当然也可以为tx.send(Snapshot)/tx.send(Ok{})向peer发送响应结果;还可以对发送失败的消息做一些gc处理操作。在回调函数完成后会有finish_for收尾将实际的结果发送至peer中(为tx.send(Snapshot)/tx.send(Ok{}))。因此cb所做的事可以包含tx.send。
/// Writes all the changes into RocksDB.
/// If it returns true, all pending writes are persisted in engines.
pub fn write_to_db(&mut self) -> bool {
let need_sync = self.sync_log_hint;
//写RocksDB
...
// Take the applied commands and their callback
let ApplyCallbackBatch {
cmd_batch,
batch_max_level,
mut cb_batch,
} = mem::replace(&mut self.applied_batch, ApplyCallbackBatch::new());
// Call it before invoking callback for preventing Commit is executed before Prewrite is observed.
self.host
.on_flush_applied_cmd_batch(batch_max_level, cmd_batch, &self.engine);
// Invoke callbacks
let now = Instant::now();
for (cb, resp) in cb_batch.drain(..) {
if let Some(times) = cb.get_request_times() {
for t in times {
self.apply_time
.observe(duration_to_sec(now.saturating_duration_since(*t)));
}
}
cb.invoke_with_response(resp);
}
self.apply_time.flush();
self.apply_wait.flush();
need_sync
}
关于rx与tx的通道建立
在peer中会有apply_router.schedule_task,该函数定义在apply中,用于对apply发送来自peer的消息,未来会在apply中进行日志等数据的落盘。
//peer中
//activate函数中:
ctx.apply_router
.schedule_task(self.region_id, ApplyTask::register(self));
//handle_raft_ready_append中
//在ready前尽量从applyfsm中获取较新的Shapshot,在发送ApplyTask::apply(apply)消息后也需要获取一次Snapshot,避免保留旧快照导致应用日志等数据时存在问题。
ctx.apply_router
.schedule_task(self.region_id, ApplyTask::Snapshot(gen_task));
//handle_raft_committed_entries中
//发送apply是里面是记录了vec<proposal<S>的,propose里可以定义callback,
//在handle_raft_committed_entries中当在raftstore中得到ready以后,才会从ready中取出committed_entries,去到applyfsm中将日志进行落盘操作。
ctx.apply_router
.schedule_task(self.region_id, ApplyTask::apply(apply));
//在fsm/peer中还存在等从apply中获取一些帮助peer进行状态转换的消息如下:
//on_merge_result
self.ctx
.apply_router
.schedule_task(job.region_id, ApplyTask::destroy(job.region_id, false));
//on_catch_up_logs_for_merge
self.ctx.apply_router.schedule_task(self.fsm.region_id(),
ApplyTask::LogsUpToDate(self.fsm.peer.catch_up_logs.take().unwrap()),
);
//on_capture_change
//注意这里发送一个propose方法,cb是会在propose_raft_command-> self.fsm.peer.propose(self.ctx, cb, msg, resp, diskfullopt)->self.raft_group.propose(ctx.to_vec(), data)中延迟调用,可以看到这里在调用cb的节点时还会发送ApplyTask::Change的请求。
self.propose_raft_command(
msg,
Callback::Read(Box::new(move |resp| {
// Return the error
if resp.response.get_header().has_error() {
cb.invoke_read(resp);
return;
}
apply_router.schedule_task(
region_id,
ApplyTask::Change {
cmd,
region_epoch,
cb,
},
)
})),
在handle_normal中,会先接收发往apply的消息,之后根据得到的情况将msg_buf传给normal.handle_tasks,normal是applyfsm的一个实现,applyfsm本身自带一个Receiver:
//apply中的接收
while self.msg_buf.len() < self.messages_per_tick {
match normal.receiver.try_recv() {
Ok(msg) => self.msg_buf.push(msg),
Err(TryRecvError::Empty) => {
expected_msg_count = Some(0);
break;
}
Err(TryRecvError::Disconnected) => {
normal.delegate.stopped = true;
expected_msg_count = Some(0);
break;
}
}
}
normal.handle_tasks(&mut self.apply_ctx, &mut self.msg_buf);
在handle_tasks中第一步会接收到Msg::Registration(reg)消息,收到消息后会设置好此次注册传输消息的tx和rx。
调用handle_normal->handle_tasks->handle_registration(reg)->from_registration,此函数中会设置fsmdelegate的tx和rx,之后的消息传递就从这里的tx通过schedule_task传至apply,得到了一个delegate之后,我们就可以调用delegate相关的方法对peer传来的命令进行处理。
//apply中的注册
impl<EK> ApplyFsm<EK>
where
EK: KvEngine,
{
/// Handles peer registration. When a peer is created, it will register an apply delegate.
fn handle_registration(&mut self, reg: Registration) {
...
self.delegate.term = reg.term;
self.delegate.clear_all_commands_as_stale();
self.delegate = ApplyDelegate::from_registration(reg);
}
fn from_registration(reg: Registration) -> (LooseBoundedSender<Msg<EK>>, Box<ApplyFsm<EK>>) {
let (tx, rx) = loose_bounded(usize::MAX);
let delegate = ApplyDelegate::from_registration(reg);
(
tx,
Box::new(ApplyFsm {
delegate,
receiver: rx,
mailbox: None,
}),
)
}
apply处理完后会得到一个apply_res数组(记录在ApplyContext),该数组不为空时就会通过flush操作将未处理的写操作数据落盘将返回结果发送PeerMsg给到peerfsm,这个感觉是作为callback的一个加强操作,如果callback没有定义好,则默认一定会在某些需要发送结果的操作中必须发送返回结果。
//apply中发送回复消息至peer,一般在handle_snapshot/end等操作时调用flush
pub fn flush(&mut self) -> bool {
...
// Write to engine
// raftstore.sync-log = true means we need prevent data loss when power failure.
// take raft log gc for example, we write kv WAL first, then write raft WAL,
// if power failure happen, raft WAL may synced to disk, but kv WAL may not.
let is_synced = self.write_to_db();
if !self.apply_res.is_empty() {
let apply_res = mem::take(&mut self.apply_res);
self.notifier.notify(apply_res);
}
}
//关于notify的可能的实现:
pub struct TestNotifier<EK: KvEngine> {
tx: Sender<PeerMsg<EK>>,
}
impl<EK: KvEngine> Notifier<EK> for TestNotifier<EK> {
fn notify(&self, apply_res: Vec<ApplyRes<EK::Snapshot>>) {
for r in apply_res {
let res = TaskRes::Apply(r);
let _ = self.tx.send(PeerMsg::ApplyRes { res });
}
}
}
当peerfsm收到来自apply的applyres之后会进行一些收尾工作,对于上面提到的ApplyRes:Apply,会用post_apply后续更新 apply_state/applied_index_term/metrics
//peer中
fn on_apply_res(&mut self, res: ApplyTaskRes<EK::Snapshot>) {
fail_point!("on_apply_res", |_| {});
match res {
ApplyTaskRes::Apply(mut res) => {
debug!(
"async apply finish";
"region_id" => self.region_id(),
"peer_id" => self.fsm.peer_id(),
"res" => ?res,
);
self.on_ready_result(&mut res.exec_res, &res.metrics);
if self.fsm.stopped {
return;
}
self.fsm.has_ready |= self.fsm.peer.post_apply(
self.ctx,
res.apply_state,
res.applied_index_term,
&res.metrics,
);
// After applying, several metrics are updated, report it to pd to
// get fair schedule.
if self.fsm.peer.is_leader() {
self.register_pd_heartbeat_tick();
self.register_split_region_check_tick();
}
}
ApplyTaskRes::Destroy {
region_id,
peer_id,
merge_from_snapshot,
} => {
assert_eq!(peer_id, self.fsm.peer.peer_id());
if !merge_from_snapshot {
self.destroy_peer(false);
} else {
// Wait for its target peer to apply snapshot and then send `MergeResult` back
// to destroy itself
let mut meta = self.ctx.store_meta.lock().unwrap();
// The `need_atomic` flag must be true
assert!(*meta.destroyed_region_for_snap.get(®ion_id).unwrap());
let target_region_id = *meta.targets_map.get(®ion_id).unwrap();
let is_ready = meta
.atomic_snap_regions
.get_mut(&target_region_id)
.unwrap()
.get_mut(®ion_id)
.unwrap();
*is_ready = true;
}
}
}
}
Peer中的具体操作
handle_normal(store.rs中)=>(PeerFsmDelegate)handle_msgs->处理各种PeerMsg,对于RaftCommand构建self.fsm.batch_req_builder后(add,build)调用propose_raft_command->self.fsm.peer.propose(raft_group RawNode<PeerStorage<EK,ER>>)->propose_normal->raft_group propose
propose是从fsm.batch_req_builder.build(&mut self.ctx_raft_metrics)中取得cmd,再通过self.propose_raft_command(cmd.request,cmd.callback,DiskFullOpt::NotAllowedOnFull)会设置好proposal封装进apply中。在peer中的发送proposal至apply进行落盘主要是peer调用apply_router.schedule_task(region_id,msg)函数,
let mut apply = Apply::new(
self.peer_id(),
self.region_id,
self.term(),
committed_entries,
cbs,//Vec<Proposal<S>>,
);
apply.on_schedule(&ctx.raft_metrics);
...
ctx.apply_router
.schedule_task(self.region_id, ApplyTask::apply(apply));
propose之后在peer中会有raftstore状态转换得到ready(大多节点都认可了该消息,已得到保障可以进行落盘),ready在collect_ready中进行处理。
ready结构是一个self.raft_group.ready(),在ready中是会存在参数peer_id的(还有snapshot/hs/entries/msgs等) peer
会记录在ctx.store_meta.readers中,在handle_raft_ready_append中产生了一个ready并加入ready_res数组,之后调用post_raft_ready_append->send->send_raft_message,将msgs传递到对应的apply。
handle_normal(store.rs中)=>collect_ready->
1.对于raft消息是通过on_raft_message得到的,peer在收到raft消息之后会调用Raft::step,最终成功后走committed_entries去到applyfsm。ready是handle_raft_ready_append中raft_group ready得来的,会反应在Committed_entries中->handle_raft_committed_entries->ctx.apply_router.schedule_task(self.region_id,ApplyTask::apply(apply))想applyfsm发送apply请求(apply中记录了committed_entries)。
所以说raft与apply之间的关联就在于ready与committed_entries。
2.self.fsm.peer.handle_raft_ready_append(self.ctx)得到ready后self.ctx.ready_res.push将ready结果push进去
3.调用store里的handle_raft_ready对ready_res中的结果进行写入,kv的写入kvEngine,raft的写入raftEngine调用post_raft_ready_append,做一些对ready的善后操作(在handle_raft_ready_append中会处理PollContext会调用handle_raft_ready,用于更新各种数据至kv/raft中后将InvokeContext结果返回给handle_raft_ready_append),然后调用当前fsm handler中的end()方法对当前fsm进行收尾。
对raft数据做完ready处理后就以cpmmitted_entries的形式进入到applyfsm部分进行apply操作。
5.在PollContext中有router(RaftRouter)/apply_router(ApplyRouter),对于apply_router主要调用的是schedule_task方法。
ctx.apply_router
.schedule_task(self.region_id, ApplyTask::Snapshot(gen_task));
//将消息发送给ApplyFsm
self.send(ctx, msgs); //将数据发送给其他Store
Apply中的具体操作
Apply的Builder中有RaftPollerBuilder和ApplyRouter,RaftPollerBuilder中有engines和store。
Apply的HandlerBuilder就是基于Builder,里面封装了ApplyPoller->ApplyContext->ApplyDelegate->WriteBatch的关于写数据库put/delete方法以及处理一些命令的代码。
handle_normal->handle_tasks->handle_apply/resume_pending->
1.append_proposal->append_normal(将命令push back至pending_cmds)
2.handle_raft_committed_entries->
2.1handle_raft_entry_normal->commit->commit_opt->write_to_db->write_opt并通过callback的invoke_with_response(resp)处理返回ReadResponse或WriteResponse的结果。
2.2handle_raft_entry_normal->process_raft_cmd->
2.2.1apply_raft_cmd->exec_raft_cmd->exec_write_cmd/exec_admin_cmd->WriteBatch的handle_put/handle_delete->wb.put()
2.2.2find_pending(将pending_cmds里的命令取出)得到命令对应的callback
2.2.3将找出的callback push至apply_ctx.applied_batch,等到apply_raft_cmd中返回的执行结果
3.handle_raft_committed_entries最终得到的结果就是raft的运行结果,使用这个结果对apply_ctx做一些finish_for收尾操作,将得到的results放入apply_ctx的apply_res数组。返回到最开始的handle_normal也就结束了。
此时在flush函数中有如下操作,会调用notifier.notify
if !self.apply_res.is_empty() {
let apply_res = mem::take(&mut self.apply_res);
self.notifier.notify(apply_res);
}
impl<EK: KvEngine> Notifier<EK> for TestNotifier<EK> {
fn notify(&self, apply_res: Vec<ApplyRes<EK::Snapshot>>) {
for r in apply_res {
let res = TaskRes::Apply(r);
let _ = self.tx.send(PeerMsg::ApplyRes { res });
}
}
fn notify_one(&self, _: u64, msg: PeerMsg<EK>) {
let _ = self.tx.send(msg);
}
fn clone_box(&self) -> Box<dyn Notifier<EK>> {
Box::new(self.clone())
}
}
当schedule实际完成处理以后,需要返回结果的就返回一个peermsg,不需要的就将不返回,所以apply中的handle_normal主要是对apply_ctx进行处理,当最终在exec_raft_cmd中对命令实际完成后就一路向上返回最终的处理结果。
fn batch_messages<E>(router: &ApplyRouter<E>, region_id: u64, msgs: Vec<Msg<E>>)
where
E: KvEngine,
{
let (notify1, wait1) = mpsc::channel();
let (notify2, wait2) = mpsc::channel();
router.schedule_task(
region_id,
Msg::Validate(
region_id,
Box::new(move |_| {
notify1.send(()).unwrap();
wait2.recv().unwrap();
}),
),
);
wait1.recv().unwrap();
for msg in msgs {
router.schedule_task(region_id, msg);
}
notify2.send(()).unwrap();
}
...
//此处可以看到proposal中带上了callback,对于得到返回结果会有怎样的处理,在这里当得到write结果时,需要想resp_tx发送结果
let (resp_tx, resp_rx) = mpsc::channel();
let p = proposal(
false,
1,
0,
Callback::write(Box::new(move |resp: WriteResponse| {
resp_tx.send(resp.response).unwrap();
})),
);
router.schedule_task(
1,
Msg::apply(apply(1, 1, 0, vec![new_entry(0, 1, true)], vec![p])),//此处写resp_rx
);
// unregistered region should be ignored and notify failed.
let resp = resp_rx.recv_timeout(Duration::from_secs(3)).unwrap();
...
//此处可以看到一个主要的操作是将peer需要的信息通过schedule_task下发给到applyfsm,之后peer可以从rx中获取返回applyres(snapshot)即可。
let (snap_tx, _) = mpsc::sync_channel(0);
batch_messages(
&router,
2,
vec![
Msg::apply(apply(1, 2, 11, vec![new_entry(5, 5, false)], vec![])),
Msg::Snapshot(GenSnapTask::new_for_test(2, snap_tx)),
],
);
let apply_res = match rx.recv_timeout(Duration::from_secs(3)) {
Ok(PeerMsg::ApplyRes {
res: TaskRes::Apply(res),
..
}) => res,
e => panic!("unexpected apply result: {:?}", e),
};
一些琐碎信息
关于RaftRouter和Transport的关系:Transport是发给Store的,RaftRouter是发给Region的,参数为Router.force_send(region_id,msg),
Peer中用到了router中的地方:send_raft_commad/handle_raft_commited_entries/handle_raft_ready_append/activate
一个Region可以通过范围拆分分布在多个Store上,也就是所谓的MultiRaft。
使用Transport时可以直接使用Transport内RaftClient中的send方法
ctx.trans.send(SendMsg)/
RaftMessage::default/
handle_raft_ready_append: eraftpb::Message/
post_raft_ready_append: ready.take_messages() (raft_group.ready() raft_group是RawNode::New)
handle_raft_ready_advance: light_rd.take_messages()(raft_group.advance_append(ready))。
send_extra_message/send_raft_message/perpare_raft_message() (Raft_Message::Default)
一些消息参数:
PeerMsg::RaftCommand/
PeerMsg::RaftMessage/
PeerMsg::ApplyRes/
PeerMsg::Start/
PeerMsg::CasualMsg/
PeerMsg::replicate/
ControlMsg::LatencyInspect/
StoreMsg::RaftMessage/
StoreMsg::Tick/
StoreMsg::Start/
标签:分析,handle,self,TiKV,peer,源码,raft,ready,apply 来源: https://blog.csdn.net/Parallel2333/article/details/120028307