编程语言
首页 > 编程语言> > TiKV源码分析(一)

TiKV源码分析(一)

作者:互联网

关于TiDB与TiKV学习总结

TiKV

关于TiKV主要的研究点在KV的存储,TiDB将SQL转成了KV数据对,TiKV就是将KV数据进行存储并提供查询,对于多节点中每节点的数据一致性和Percolator事务prewrite/commit等就是主要要解决的事情,从而实现稳定的分布式存储。

TiKV知识点

  1. Google Spanner、TrueTime API
  2. Service:
  1. Storage:

本章序:从主框架RaftStore开始

peerfsm处理日志消息的raft状态变更,applyfsm对日志进行落盘,这两者都是围绕raftstore与日志展开,这里涉及到的Engine是RaftEngine,关于实际数据写入RocksDB数据库的情况之后记录。

batch.rs中看状态机的调度方法:所有Fsm之间的关联使用的是状态机的驱动,Poll在等待handle_raft_ready处理完之后会通过一个状态机的驱动结构reschedule_fsms(fsm_cnt,ReschedulePolicy)(Reschedule来记录一个状态及接下来的调度策略是移除、释放还是重新调度,重新调度会传入当前fsm在数组中的下标,重新将该fsm加入到调度中)。
用栈的弹入弹出操作来模拟一个驱动流程:
1.每次循环进入时会调用fetch_fsm方法,尝试去从fsm_receiver中获取当前传入的fsm,加入到batch中
2.使用该fsm的handle_normal/handle_control函数对该fsm进行处理
3.处理完后弹出上一个batch中的normal驱动机,出栈再入栈或者直接入栈加入新的下一个驱动机进行ReschedulePolicy::Schedule
4.Schedule所做的就是调用batch.reschedule(&self.router,r)对新加入的fsm进行schedule->schedule中使用send方法发送至下一个fsm的路径位置。
store.rs和apply.rs中实现了batch.rs中定义的trait方法,fsm加入后就可以调度store.rs和apply.rs中的具现化方法来执行每个fsm对应的操作。

store.rs中的BatchSystem入手,其中有一个HandlerBuilder,里面含有一个Poller,使用PollHandler实现,PollHandler由Apply.rs和Peer.rs一人提供一个,可以通过Batch中的Poller获取ApplyFsm和PeerFsm的信息从而poll相应的Fsm进行下一步操作。

BatchSystem(HandlerBuilder)->spawn->start_poller->poller.poll()->(PollHandler).handle_control/handle_normal

HandlerBuilder结构体中的handler为PollHandler,HandlerBuilder的build()就是为了得到其中的PollHandler,根据PollHandler又可以构建出Poller结构体,从而可以调用Poller中的Poll方法,PollHandler中存在apply.rs(ApplyPoller)和store.rs(RaftPoller)中,提供了Handle_Control/Handle_Normal主要函数,会在下面分析。在Poll中可以不断的去调用handle函数处理状态机中的各种事件。
PollHandler中提供了handle_normal和handle_control,方便BatchSystem调用对应的FSM。

接下来分别从整体框架、peer的handle_normal和apply的handle_normal开始分析:

Apply与Peer的交互框架

主要考虑如下交互,交互形式以channel的tx/rx作为消息通道,消息中使用cb进行消息传递,也可以使用rx.recv/tx.send进行。

cb对结果执行下一步操作tx.send发送结果回peer需要区分开,cb完全可以执行tx.send发送结果回peer的操作,还可以包括失败数据的gc,还可以处理需要多步操作才能完成的请求等等。在apply代码中会先执行cb再执行tx.send向peer发送响应,因此某些情况可以自己定制cb用于向peer返回特定的结果,不仅限于Snapshot的结果(读请求)或Ok的结果(写请求)。

基于cb的消息传递

消息中会带上callback,用于记录一条消息完成后需要自己定义的一些操作:
peerfsm发送给applyfsm的ApplyTask::Apply中会有一个proposal,
proposal中记录了callback,apply接收后proposal存在pending_cmds中,proposal中的callback以后暂时存在applied_batch中

//ApplyDelegate主要记录的是对apply处理的各种方法,ApplyContext主要记录的是apply中会用到的各种数据
//apply中的pending_cmds(在结构体ApplyDelegate中)记录proposal
    /// Handles proposals, and appends the commands to the apply delegate.
    fn append_proposal(&mut self, props_drainer: Drain<Proposal<EK::Snapshot>>) {
        let (region_id, peer_id) = (self.delegate.region_id(), self.delegate.id());
...
        for p in props_drainer {
            let cmd = PendingCmd::new(p.index, p.term, p.cb);
            if p.is_conf_change {
                if let Some(cmd) = self.delegate.pending_cmds.take_conf_change() {
                    // if it loses leadership before conf change is replicated, there may be
                    // a stale pending conf change before next conf change is applied. If it
                    // becomes leader again with the stale pending conf change, will enter
                    // this block, so we notify leadership may have been changed.
                    notify_stale_command(region_id, peer_id, self.delegate.term, cmd);
                }
                self.delegate.pending_cmds.set_conf_change(cmd);
            } else {
                self.delegate.pending_cmds.append_normal(cmd);
            }
        }
    }

//apply中applied_batch记录(在ApplyContext结构体中)
    fn handle_raft_entry_normal<W: WriteBatch<EK>>(
        &mut self,
        apply_ctx: &mut ApplyContext<EK, W>,
        entry: &Entry,
    ) -> ApplyResult<EK::Snapshot> {
    ...
        while let Some(mut cmd) = self.pending_cmds.pop_normal(std::u64::MAX, term - 1) {
            if let Some(cb) = cmd.cb.take() {
                apply_ctx
                    .applied_batch
                    .push_cb(cb, cmd_resp::err_resp(Error::StaleCommand, term));
            }
        }
}

写入数据库以后实际调用proposal中的回调函数,一般可能是记录了这个结果对应的下一步操作的,对于某个需要两步请求发送才能完成的操作,在第一个实现后会紧接着执行第二步;当然也可以为tx.send(Snapshot)/tx.send(Ok{})向peer发送响应结果;还可以对发送失败的消息做一些gc处理操作。在回调函数完成后会有finish_for收尾将实际的结果发送至peer中(为tx.send(Snapshot)/tx.send(Ok{}))。因此cb所做的事可以包含tx.send。


    /// Writes all the changes into RocksDB.
    /// If it returns true, all pending writes are persisted in engines.
    pub fn write_to_db(&mut self) -> bool {
        let need_sync = self.sync_log_hint;
//写RocksDB
...
        // Take the applied commands and their callback
        let ApplyCallbackBatch {
            cmd_batch,
            batch_max_level,
            mut cb_batch,
        } = mem::replace(&mut self.applied_batch, ApplyCallbackBatch::new());
        // Call it before invoking callback for preventing Commit is executed before Prewrite is observed.
        self.host
            .on_flush_applied_cmd_batch(batch_max_level, cmd_batch, &self.engine);
        // Invoke callbacks
        let now = Instant::now();
        for (cb, resp) in cb_batch.drain(..) {
            if let Some(times) = cb.get_request_times() {
                for t in times {
                    self.apply_time
                        .observe(duration_to_sec(now.saturating_duration_since(*t)));
                }
            }
            cb.invoke_with_response(resp);
        }
        self.apply_time.flush();
        self.apply_wait.flush();
        need_sync
    }

关于rx与tx的通道建立

在peer中会有apply_router.schedule_task,该函数定义在apply中,用于对apply发送来自peer的消息,未来会在apply中进行日志等数据的落盘。

//peer中
//activate函数中:
        ctx.apply_router
            .schedule_task(self.region_id, ApplyTask::register(self));
//handle_raft_ready_append中
//在ready前尽量从applyfsm中获取较新的Shapshot,在发送ApplyTask::apply(apply)消息后也需要获取一次Snapshot,避免保留旧快照导致应用日志等数据时存在问题。
        ctx.apply_router
            .schedule_task(self.region_id, ApplyTask::Snapshot(gen_task));
//handle_raft_committed_entries中
//发送apply是里面是记录了vec<proposal<S>的,propose里可以定义callback,
//在handle_raft_committed_entries中当在raftstore中得到ready以后,才会从ready中取出committed_entries,去到applyfsm中将日志进行落盘操作。
        ctx.apply_router
            .schedule_task(self.region_id, ApplyTask::apply(apply));
//在fsm/peer中还存在等从apply中获取一些帮助peer进行状态转换的消息如下:
//on_merge_result
        self.ctx
            .apply_router
            .schedule_task(job.region_id, ApplyTask::destroy(job.region_id, false));
//on_catch_up_logs_for_merge            
        self.ctx.apply_router.schedule_task(self.fsm.region_id(),
         ApplyTask::LogsUpToDate(self.fsm.peer.catch_up_logs.take().unwrap()),
                );     
//on_capture_change     
//注意这里发送一个propose方法,cb是会在propose_raft_command-> self.fsm.peer.propose(self.ctx, cb, msg, resp, diskfullopt)->self.raft_group.propose(ctx.to_vec(), data)中延迟调用,可以看到这里在调用cb的节点时还会发送ApplyTask::Change的请求。
       self.propose_raft_command(
            msg,
            Callback::Read(Box::new(move |resp| {
                // Return the error
                if resp.response.get_header().has_error() {
                    cb.invoke_read(resp);
                    return;
                }
                apply_router.schedule_task(
                    region_id,
                    ApplyTask::Change {
                        cmd,
                        region_epoch,
                        cb,
                    },
                )
            })),

handle_normal中,会先接收发往apply的消息,之后根据得到的情况将msg_buf传给normal.handle_tasks,normal是applyfsm的一个实现,applyfsm本身自带一个Receiver:

//apply中的接收
        while self.msg_buf.len() < self.messages_per_tick {
            match normal.receiver.try_recv() {
                Ok(msg) => self.msg_buf.push(msg),
                Err(TryRecvError::Empty) => {
                    expected_msg_count = Some(0);
                    break;
                }
                Err(TryRecvError::Disconnected) => {
                    normal.delegate.stopped = true;
                    expected_msg_count = Some(0);
                    break;
                }
            }
        }
        normal.handle_tasks(&mut self.apply_ctx, &mut self.msg_buf);

在handle_tasks中第一步会接收到Msg::Registration(reg)消息,收到消息后会设置好此次注册传输消息的tx和rx。
调用handle_normal->handle_tasks->handle_registration(reg)->from_registration,此函数中会设置fsmdelegate的tx和rx,之后的消息传递就从这里的tx通过schedule_task传至apply,得到了一个delegate之后,我们就可以调用delegate相关的方法对peer传来的命令进行处理。

//apply中的注册
   impl<EK> ApplyFsm<EK>
where
    EK: KvEngine,
{
    /// Handles peer registration. When a peer is created, it will register an apply delegate.
        fn handle_registration(&mut self, reg: Registration) {
...
        self.delegate.term = reg.term;
        self.delegate.clear_all_commands_as_stale();
        self.delegate = ApplyDelegate::from_registration(reg);
    }
    fn from_registration(reg: Registration) -> (LooseBoundedSender<Msg<EK>>, Box<ApplyFsm<EK>>) {
        let (tx, rx) = loose_bounded(usize::MAX);
        let delegate = ApplyDelegate::from_registration(reg);
        (
            tx,
            Box::new(ApplyFsm {
                delegate,
                receiver: rx,
                mailbox: None,
            }),
        )
    }

apply处理完后会得到一个apply_res数组(记录在ApplyContext),该数组不为空时就会通过flush操作将未处理的写操作数据落盘将返回结果发送PeerMsg给到peerfsm,这个感觉是作为callback的一个加强操作,如果callback没有定义好,则默认一定会在某些需要发送结果的操作中必须发送返回结果。

//apply中发送回复消息至peer,一般在handle_snapshot/end等操作时调用flush
    pub fn flush(&mut self) -> bool {
...
        // Write to engine
        // raftstore.sync-log = true means we need prevent data loss when power failure.
        // take raft log gc for example, we write kv WAL first, then write raft WAL,
        // if power failure happen, raft WAL may synced to disk, but kv WAL may not.
        let is_synced = self.write_to_db();

        if !self.apply_res.is_empty() {
            let apply_res = mem::take(&mut self.apply_res);
            self.notifier.notify(apply_res);
        }
    }

//关于notify的可能的实现:
    pub struct TestNotifier<EK: KvEngine> {
        tx: Sender<PeerMsg<EK>>,
    }

    impl<EK: KvEngine> Notifier<EK> for TestNotifier<EK> {
        fn notify(&self, apply_res: Vec<ApplyRes<EK::Snapshot>>) {
            for r in apply_res {
                let res = TaskRes::Apply(r);
                let _ = self.tx.send(PeerMsg::ApplyRes { res });
            }
        }
    }

当peerfsm收到来自apply的applyres之后会进行一些收尾工作,对于上面提到的ApplyRes:Apply,会用post_apply后续更新 apply_state/applied_index_term/metrics

//peer中
    fn on_apply_res(&mut self, res: ApplyTaskRes<EK::Snapshot>) {
        fail_point!("on_apply_res", |_| {});
        match res {
            ApplyTaskRes::Apply(mut res) => {
                debug!(
                    "async apply finish";
                    "region_id" => self.region_id(),
                    "peer_id" => self.fsm.peer_id(),
                    "res" => ?res,
                );
                self.on_ready_result(&mut res.exec_res, &res.metrics);
                if self.fsm.stopped {
                    return;
                }
                self.fsm.has_ready |= self.fsm.peer.post_apply(
                    self.ctx,
                    res.apply_state,
                    res.applied_index_term,
                    &res.metrics,
                );
                // After applying, several metrics are updated, report it to pd to
                // get fair schedule.
                if self.fsm.peer.is_leader() {
                    self.register_pd_heartbeat_tick();
                    self.register_split_region_check_tick();
                }
            }
            ApplyTaskRes::Destroy {
                region_id,
                peer_id,
                merge_from_snapshot,
            } => {
                assert_eq!(peer_id, self.fsm.peer.peer_id());
                if !merge_from_snapshot {
                    self.destroy_peer(false);
                } else {
                    // Wait for its target peer to apply snapshot and then send `MergeResult` back
                    // to destroy itself
                    let mut meta = self.ctx.store_meta.lock().unwrap();
                    // The `need_atomic` flag must be true
                    assert!(*meta.destroyed_region_for_snap.get(&region_id).unwrap());

                    let target_region_id = *meta.targets_map.get(&region_id).unwrap();
                    let is_ready = meta
                        .atomic_snap_regions
                        .get_mut(&target_region_id)
                        .unwrap()
                        .get_mut(&region_id)
                        .unwrap();
                    *is_ready = true;
                }
            }
        }
    }

Peer中的具体操作

handle_normal(store.rs中)=>(PeerFsmDelegate)handle_msgs->处理各种PeerMsg,对于RaftCommand构建self.fsm.batch_req_builder后(add,build)调用propose_raft_command->self.fsm.peer.propose(raft_group RawNode<PeerStorage<EK,ER>>)->propose_normal->raft_group propose

propose是从fsm.batch_req_builder.build(&mut self.ctx_raft_metrics)中取得cmd,再通过self.propose_raft_command(cmd.request,cmd.callback,DiskFullOpt::NotAllowedOnFull)会设置好proposal封装进apply中。在peer中的发送proposal至apply进行落盘主要是peer调用apply_router.schedule_task(region_id,msg)函数,

 let mut apply = Apply::new(
                self.peer_id(),
                self.region_id,
                self.term(),
                committed_entries,
                cbs,//Vec<Proposal<S>>,
            );
            apply.on_schedule(&ctx.raft_metrics);
...
            ctx.apply_router
                .schedule_task(self.region_id, ApplyTask::apply(apply));

propose之后在peer中会有raftstore状态转换得到ready(大多节点都认可了该消息,已得到保障可以进行落盘),ready在collect_ready中进行处理。
ready结构是一个self.raft_group.ready(),在ready中是会存在参数peer_id的(还有snapshot/hs/entries/msgs等) peer
会记录在ctx.store_meta.readers中,在handle_raft_ready_append中产生了一个ready并加入ready_res数组,之后调用post_raft_ready_append->send->send_raft_message,将msgs传递到对应的apply。

handle_normal(store.rs中)=>collect_ready->
1.对于raft消息是通过on_raft_message得到的,peer在收到raft消息之后会调用Raft::step,最终成功后走committed_entries去到applyfsm。ready是handle_raft_ready_appendraft_group ready得来的,会反应在Committed_entries中->handle_raft_committed_entries->ctx.apply_router.schedule_task(self.region_id,ApplyTask::apply(apply))想applyfsm发送apply请求(apply中记录了committed_entries)。
所以说raft与apply之间的关联就在于ready与committed_entries。
2.self.fsm.peer.handle_raft_ready_append(self.ctx)得到ready后self.ctx.ready_res.push将ready结果push进去
3.调用store里的handle_raft_ready对ready_res中的结果进行写入,kv的写入kvEngine,raft的写入raftEngine调用post_raft_ready_append,做一些对ready的善后操作(在handle_raft_ready_append中会处理PollContext会调用handle_raft_ready,用于更新各种数据至kv/raft中后将InvokeContext结果返回给handle_raft_ready_append),然后调用当前fsm handler中的end()方法对当前fsm进行收尾。
对raft数据做完ready处理后就以cpmmitted_entries的形式进入到applyfsm部分进行apply操作。
5.在PollContext中有router(RaftRouter)/apply_router(ApplyRouter),对于apply_router主要调用的是schedule_task方法。

ctx.apply_router
.schedule_task(self.region_id, ApplyTask::Snapshot(gen_task));
//将消息发送给ApplyFsm
self.send(ctx, msgs); //将数据发送给其他Store

Apply中的具体操作

Apply的Builder中有RaftPollerBuilder和ApplyRouter,RaftPollerBuilder中有engines和store。
Apply的HandlerBuilder就是基于Builder,里面封装了ApplyPoller->ApplyContext->ApplyDelegate->WriteBatch的关于写数据库put/delete方法以及处理一些命令的代码。

handle_normal->handle_tasks->handle_apply/resume_pending->
1.append_proposal->append_normal(将命令push back至pending_cmds)
2.handle_raft_committed_entries->
2.1handle_raft_entry_normal->commit->commit_opt->write_to_db->write_opt并通过callback的invoke_with_response(resp)处理返回ReadResponse或WriteResponse的结果。
2.2handle_raft_entry_normal->process_raft_cmd->
2.2.1apply_raft_cmd->exec_raft_cmd->exec_write_cmd/exec_admin_cmd->WriteBatch的handle_put/handle_delete->wb.put()
2.2.2find_pending(将pending_cmds里的命令取出)得到命令对应的callback
2.2.3将找出的callback push至apply_ctx.applied_batch,等到apply_raft_cmd中返回的执行结果
3.handle_raft_committed_entries最终得到的结果就是raft的运行结果,使用这个结果对apply_ctx做一些finish_for收尾操作,将得到的results放入apply_ctx的apply_res数组。返回到最开始的handle_normal也就结束了。
此时在flush函数中有如下操作,会调用notifier.notify

        if !self.apply_res.is_empty() {
            let apply_res = mem::take(&mut self.apply_res);
            self.notifier.notify(apply_res);
        }
        
    impl<EK: KvEngine> Notifier<EK> for TestNotifier<EK> {
        fn notify(&self, apply_res: Vec<ApplyRes<EK::Snapshot>>) {
            for r in apply_res {
                let res = TaskRes::Apply(r);
                let _ = self.tx.send(PeerMsg::ApplyRes { res });
            }
        }
        fn notify_one(&self, _: u64, msg: PeerMsg<EK>) {
            let _ = self.tx.send(msg);
        }
        fn clone_box(&self) -> Box<dyn Notifier<EK>> {
            Box::new(self.clone())
        }
    }

当schedule实际完成处理以后,需要返回结果的就返回一个peermsg,不需要的就将不返回,所以apply中的handle_normal主要是对apply_ctx进行处理,当最终在exec_raft_cmd中对命令实际完成后就一路向上返回最终的处理结果。

    fn batch_messages<E>(router: &ApplyRouter<E>, region_id: u64, msgs: Vec<Msg<E>>)
    where
        E: KvEngine,
    {
        let (notify1, wait1) = mpsc::channel();
        let (notify2, wait2) = mpsc::channel();
        router.schedule_task(
            region_id,
            Msg::Validate(
                region_id,
                Box::new(move |_| {
                    notify1.send(()).unwrap();
                    wait2.recv().unwrap();
                }),
            ),
        );
        wait1.recv().unwrap();

        for msg in msgs {
            router.schedule_task(region_id, msg);
        }

        notify2.send(()).unwrap();
    }

...
//此处可以看到proposal中带上了callback,对于得到返回结果会有怎样的处理,在这里当得到write结果时,需要想resp_tx发送结果
        let (resp_tx, resp_rx) = mpsc::channel();
        let p = proposal(
            false,
            1,
            0,
            Callback::write(Box::new(move |resp: WriteResponse| {
                resp_tx.send(resp.response).unwrap();
            })),
        );
        router.schedule_task(
            1,
            Msg::apply(apply(1, 1, 0, vec![new_entry(0, 1, true)], vec![p])),//此处写resp_rx
        );
        // unregistered region should be ignored and notify failed.
        let resp = resp_rx.recv_timeout(Duration::from_secs(3)).unwrap();
...
//此处可以看到一个主要的操作是将peer需要的信息通过schedule_task下发给到applyfsm,之后peer可以从rx中获取返回applyres(snapshot)即可。
        let (snap_tx, _) = mpsc::sync_channel(0);
        batch_messages(
            &router,
            2,
            vec![
                Msg::apply(apply(1, 2, 11, vec![new_entry(5, 5, false)], vec![])),
                Msg::Snapshot(GenSnapTask::new_for_test(2, snap_tx)),
            ],
        );
        let apply_res = match rx.recv_timeout(Duration::from_secs(3)) {
            Ok(PeerMsg::ApplyRes {
                res: TaskRes::Apply(res),
                ..
            }) => res,
            e => panic!("unexpected apply result: {:?}", e),
        };

一些琐碎信息

关于RaftRouter和Transport的关系:Transport是发给Store的,RaftRouter是发给Region的,参数为Router.force_send(region_id,msg),
Peer中用到了router中的地方:send_raft_commad/handle_raft_commited_entries/handle_raft_ready_append/activate

一个Region可以通过范围拆分分布在多个Store上,也就是所谓的MultiRaft。
使用Transport时可以直接使用Transport内RaftClient中的send方法
ctx.trans.send(SendMsg)/
RaftMessage::default/
handle_raft_ready_append: eraftpb::Message/
post_raft_ready_append: ready.take_messages() (raft_group.ready() raft_group是RawNode::New)
handle_raft_ready_advance: light_rd.take_messages()(raft_group.advance_append(ready))。
send_extra_message/send_raft_message/perpare_raft_message() (Raft_Message::Default)

一些消息参数:
PeerMsg::RaftCommand/
PeerMsg::RaftMessage/
PeerMsg::ApplyRes/
PeerMsg::Start/
PeerMsg::CasualMsg/
PeerMsg::replicate/
ControlMsg::LatencyInspect/
StoreMsg::RaftMessage/
StoreMsg::Tick/
StoreMsg::Start/

标签:分析,handle,self,TiKV,peer,源码,raft,ready,apply
来源: https://blog.csdn.net/Parallel2333/article/details/120028307