数据库
首页 > 数据库> > 时序数据库Influx-IOx源码学习四(Run命令的执行)

时序数据库Influx-IOx源码学习四(Run命令的执行)

作者:互联网

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。

InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。

接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上篇介绍到:InfluxDB-IOx的命令行及配置,详情见:https://my.oschina.net/u/3374539/blog/5017858

这章记录一下Run命令的执行过程。

 //根据用户在命令行配置的num_threads参数
 //来选择创建一个多线程的模型,还是current_thread的模型
 //后面有时间深入研究tokio的时候再来分析有什么异同
 let tokio_runtime = get_runtime(config.num_threads)?;
 //block_on会让线程一直等待方法里的future执行完成
 //这是让闭包中的方法占有了io driver 和 timer context
 tokio_runtime.block_on(async move {
        let host = config.host;
        match config.command {
            // 省略其它command ... 

            Command::Run(config) => {
                //具体去子类型里执行,然后await一个结果
                if let Err(e) = commands::run::command(logging_level, *config).await {
                    eprintln!("Server command failed: {}", e);
                    std::process::exit(ReturnCode::Failure as _)
                }
            }
        }
});

influxdb_ioxd::main方法中,忽略一些不太需要重点关注的,分别是初始化log的管理、PanicsTracingCancellationToken等。

    //初始化对象存储
    let object_store = ObjectStore::try_from(&config)?;
    //可以看到,目前已经支持了
    //1.内存(在container环境运行时候使用)
    //2.Google
    //3.S3
    //4.Azure
    //5.File 本地文件,方便开发者调试运行在云上时候的文件变化
    fn try_from(config: &Config) -> Result<Self, Self::Error> {
        match config.object_store {
            Some(ObjStoreOpt::Memory) | None => {
           //创建一个btreemap用来缓存或者搜索
           Ok(Self::new_in_memory(object_store::memory::InMemory::new()))
            }

            Some(ObjStoreOpt::Google) => {
                // 省略
            }

            Some(ObjStoreOpt::S3) => {
               // 省略
            }

            Some(ObjStoreOpt::Azure) => { 
              // 省略
            }

            Some(ObjStoreOpt::File) => match config.database_directory.as_ref() {
                Some(db_dir) => {
                    //去递归创建这个配置路径中的文件夹
                    //context也是使用的snafu来处理错误的
                    fs::create_dir_all(db_dir)
                        .context(CreatingDatabaseDirectory { path: db_dir })?;
                    //都创建完成,并且没出错误,把路径保存起来
                    Ok(Self::new_file(object_store::disk::File::new(&db_dir)))
                }
                // 如果database_directory这个参数没有配置的时候
                //使用snafu这个crate来返回一个错误
                None => MissingObjectStoreConfig {
                    object_store: ObjStoreOpt::File,
                    missing: "data-dir",
                }
                .fail(),
            },
        }
    }

关于错误处理的代码:

 #[snafu(display("Unable to create database directory {:?}: {}", path, source))]
    CreatingDatabaseDirectory {
        path: PathBuf,
        source: std::io::Error,
    },

 #[snafu(display(
        "Specified {} for the object store, required configuration missing for {}",
        object_store,
        missing
    ))]
    MissingObjectStoreConfig {
        object_store: ObjStoreOpt,
        missing: String,
    },

我们来测试一下错误的场景,来看看是否符合代码的预期。

// 不传入路径
 cargo run run --object-store file
    Finished dev [unoptimized + debuginfo] target(s) in 0.42s
     Running `./influxdb_iox run --object-store file`
Apr 15 13:38:34.352  INFO influxdb_iox::influxdb_ioxd: Using File for object storage
Server command failed: Run: Specified File for the object store, required configuration missing for data-dir

//传入一个创建不了的路径
cargo run run --object-store file --data-dir /root/1/1
    Finished dev [unoptimized + debuginfo] target(s) in 0.47s
     Running `./influxdb_iox run --object-store file --data-dir /root/1/1`
Apr 15 13:45:26.664  INFO influxdb_iox::influxdb_ioxd: Using File for object storage
Server command failed: Run: Unable to create database directory "/root/1/1": Read-only file system (os error 30)

可以看到是符合预期的,bingo


//创建一个空的结构体
let connection_manager = ConnectionManager {};
//创建AppServer结构体用来保存基本的信息
//server_config里就是保存的对象存储的信息及线程配置
//如果num_worker_threads没有填写,默认就使用cpu数量
let app_server = Arc::new(AppServer::new(connection_manager, server_config));
//不设置这个writer_id能启动,但是不能做任何操作
if let Some(id) = config.writer_id {
        //compare and set 一个非0的数值,错误就打印一个指定的panic
        app_server.set_id(id).expect("writer id already set");
        //校验所有的配置
        if let Err(e) = app_server.load_database_configs().await {
            error!(
                "unable to load database configurations from object storage: {}",
                e
            )
        }
    } else {
        warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");
    }

接下来进入load_database_configs方法看看,

let list_result = self
            .store
            //把write_id和配置的文件路径组合一下,作为一个目录
            //遍历文件夹中的所有东西,用一个BTreeSet存所有子文件夹
            //用Vec存下所有的文件信息,包括路径、修改时间、大小等
            .list_with_delimiter(&self.root_path()?)
            .await
            .context(StoreError)?;
        //拿到配置的server的write_id
        let server_id = self.require_id()?;

        let handles: Vec<_> = list_result
            //配置的文件夹下的所有文件夹
            .common_prefixes
            .into_iter()
            //全部进行map转换
            .map(|mut path| {
                let store = Arc::clone(&self.store);
                let config = Arc::clone(&self.config);
                let exec = Arc::clone(&self.exec);
                //先找database的相关信息文件,名字叫rules.pb
                path.set_file_name(DB_RULES_FILE_NAME);
                //感觉是需要io来读取文件内容,所以开一个异步
                tokio::task::spawn(async move {
                    let mut res = get_store_bytes(&path, &store).await;

                    //省略错误处理。。

                    let res = res.unwrap().freeze();
                    //解析文件内容,根据文件名可以看出是个pb文件。
                    match DatabaseRules::decode(res) {
                        Err(e) => {
                            //省略错误。。
                        }
                        //根据解析出来的文件内容,在内存中恢复回来db的相关信息
                        Ok(rules) => match config.create_db(rules) {
                            Err(e) => error!("error adding database to config: {}", e),
                            //提交一个后台任务,用来不断的检测chunks的状态
                            //比如达到了某个大小,然后写入到存储等
                            Ok(handle) => handle.commit(server_id, store, exec),
                        },
                    }
                })
            })
            .collect();
        //等待所有任务完成
        futures::future::join_all(handles).await;

这里就启动完成了一个基本的服务,创建了存储路径、初始化数据库的基本配置、启动了一个用来刷盘、整理chunk的后台任务。


接下来就是启动连接相关的了。

    //从启动命令行中读取grpc的地址
    let grpc_bind_addr = config.grpc_bind_address;
    //绑定这个地址
    let socket = tokio::net::TcpListener::bind(grpc_bind_addr)
        .await
        .context(StartListeningGrpc { grpc_bind_addr })?;
    //真正的协议启动
    let grpc_server = rpc::serve(socket, Arc::clone(&app_server), frontend_shutdown.clone()).fuse();

    //同样的启动http相关的服务,使用的hyper库
    let bind_addr = config.http_bind_address;
    let addr = AddrIncoming::bind(&bind_addr).context(StartListeningHttp { bind_addr })?;
    let http_server = http::serve(addr, Arc::clone(&app_server), frontend_shutdown.clone()).fuse();

    //省略后面的停止流程。。。

然后看grpc的启动的服务

    //启动起来健康检查的服务
    let stream = TcpListenerStream::new(socket);
    let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
    //标识相对应的服务已经是可以提供服务的状态了
    let services = [
        generated_types::STORAGE_SERVICE,
        generated_types::IOX_TESTING_SERVICE,
        generated_types::ARROW_SERVICE,
    ];
    for service in &services {
        health_reporter
            .set_service_status(service, tonic_health::ServingStatus::Serving)
            .await;
    }

   //增加一堆使用grpc的服务,并启动起来
    tonic::transport::Server::builder()
        .add_service(health_service)
        .add_service(testing::make_server())
        .add_service(storage::make_server(Arc::clone(&server)))
        .add_service(flight::make_server(Arc::clone(&server)))
        .add_service(write::make_server(Arc::clone(&server)))
        .add_service(management::make_server(Arc::clone(&server)))
        .add_service(operations::make_server(server))
        .serve_with_incoming_shutdown(stream, shutdown.cancelled())
        .await

然后是http相关的启动

pub async fn serve<M>(
    addr: AddrIncoming,
    server: Arc<AppServer<M>>,
    shutdown: CancellationToken,
) -> Result<(), hyper::Error>
where
    M: ConnectionManager + Send + Sync + Debug + 'static,
{
    //初始化路由相关的信息
    let router = router(server);
    let service = RouterService::new(router).unwrap();
    //启动服务
    hyper::Server::builder(addr)
        .serve(service)
        .with_graceful_shutdown(shutdown.cancelled())
        .await
}

顺便看一下都提供了哪些地址可以被访问的:

 Router::builder()
        .data(server)
        //写了一个拦截,打印请求参数和返回结果
        .middleware(Middleware::pre(|req| async move {
            debug!(request = ?req, "Processing request");
            Ok(req)
        }))
        .middleware(Middleware::post(|res| async move {
            debug!(response = ?res, "Successfully processed request");
            Ok(res)
        })) // this endpoint is for API backward compatibility with InfluxDB 2.x
        .post("/api/v2/write", write::<M>)
        .get("/health", health)
        .get("/metrics", handle_metrics)
        .get("/iox/api/v1/databases/:name/query", query::<M>)
        .get("/iox/api/v1/databases/:name/wal/meta", get_wal_meta::<M>)
        .get("/api/v1/partitions", list_partitions::<M>)
        .post("/api/v1/snapshot", snapshot_partition::<M>)
        //错误的时候调用的处理拦截
        .err_handler_with_info(error_handler)
        .build()
        .unwrap()

做一个/health的测试:

curl localhost:8080/health
OK%

可以看到成功返回了值。


到这里基本启动就完成了,后面再用到的时候会继续对启动里的细节做研究,比如PanicsLog等等吧,欢迎持续关注。

祝玩儿的开心


欢迎关注微信公众号:

或添加微信好友: liutaohua001

关键词:站长资讯中心站长资讯站长新闻

标签:Run,service,IOx,object,server,源码,let,config,store
来源: https://www.cnblogs.com/0591jb/p/14753368.html