大家好,我是考100分的小小码 ,祝大家学习进步,加薪顺利呀。今天说一说时序数据库Influx-IOx源码学习四(Run命令的执行)「建议收藏」,希望您对编程的造诣更进一步.
欢迎关注公众号:
上篇介绍到:InfluxDB-IOx的命令行及配置,详情见:https://my.oschina.net/u/3374539/blog/5017858
这章记录一下Run命令的执行过程。
//根据用户在命令行配置的num_threads参数
//来选择创建一个多线程的模型,还是current_thread的模型
//后面有时间深入研究tokio的时候再来分析有什么异同
let tokio_runtime = get_runtime(config.num_threads)?;
//block_on会让线程一直等待方法里的future执行完成
//这是让闭包中的方法占有了io driver 和 timer context
tokio_runtime.block_on(async move {
let host = config.host;
match config.command {
// 省略其它command ...
Command::Run(config) => {
//具体去子类型里执行,然后await一个结果
if let Err(e) = commands::run::command(logging_level, *config).await {
eprintln!("Server command failed: {}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
}
});
在influxdb_ioxd::main
方法中,忽略一些不太需要重点关注的,分别是初始化log
的管理、PanicsTracing
、CancellationToken
等。
//初始化对象存储
let object_store = ObjectStore::try_from(&config)?;
//可以看到,目前已经支持了
//1.内存(在container环境运行时候使用)
//2.Google
//3.S3
//4.Azure
//5.File 本地文件,方便开发者调试运行在云上时候的文件变化
fn try_from(config: &Config) -> Result<Self, Self::Error> {
match config.object_store {
Some(ObjStoreOpt::Memory) | None => {
//创建一个btreemap用来缓存或者搜索
Ok(Self::new_in_memory(object_store::memory::InMemory::new()))
}
Some(ObjStoreOpt::Google) => {
// 省略
}
Some(ObjStoreOpt::S3) => {
// 省略
}
Some(ObjStoreOpt::Azure) => {
// 省略
}
Some(ObjStoreOpt::File) => match config.database_directory.as_ref() {
Some(db_dir) => {
//去递归创建这个配置路径中的文件夹
//context也是使用的snafu来处理错误的
fs::create_dir_all(db_dir)
.context(CreatingDatabaseDirectory { path: db_dir })?;
//都创建完成,并且没出错误,把路径保存起来
Ok(Self::new_file(object_store::disk::File::new(&db_dir)))
}
// 如果database_directory这个参数没有配置的时候
//使用snafu这个crate来返回一个错误
None => MissingObjectStoreConfig {
object_store: ObjStoreOpt::File,
missing: "data-dir",
}
.fail(),
},
}
}
关于错误处理的代码:
#[snafu(display("Unable to create database directory {:?}: {}", path, source))]
CreatingDatabaseDirectory {
path: PathBuf,
source: std::io::Error,
},
#[snafu(display(
"Specified {} for the object store, required configuration missing for {}",
object_store,
missing
))]
MissingObjectStoreConfig {
object_store: ObjStoreOpt,
missing: String,
},
我们来测试一下错误的场景,来看看是否符合代码的预期。
// 不传入路径
cargo run run --object-store file
Finished dev [unoptimized + debuginfo] target(s) in 0.42s
Running `./influxdb_iox run --object-store file`
Apr 15 13:38:34.352 INFO influxdb_iox::influxdb_ioxd: Using File for object storage
Server command failed: Run: Specified File for the object store, required configuration missing for data-dir
//传入一个创建不了的路径
cargo run run --object-store file --data-dir /root/1/1
Finished dev [unoptimized + debuginfo] target(s) in 0.47s
Running `./influxdb_iox run --object-store file --data-dir /root/1/1`
Apr 15 13:45:26.664 INFO influxdb_iox::influxdb_ioxd: Using File for object storage
Server command failed: Run: Unable to create database directory "/root/1/1": Read-only file system (os error 30)
可以看到是符合预期的,bingo
//创建一个空的结构体
let connection_manager = ConnectionManager {};
//创建AppServer结构体用来保存基本的信息
//server_config里就是保存的对象存储的信息及线程配置
//如果num_worker_threads没有填写,默认就使用cpu数量
let app_server = Arc::new(AppServer::new(connection_manager, server_config));
//不设置这个writer_id能启动,但是不能做任何操作
if let Some(id) = config.writer_id {
//compare and set 一个非0的数值,错误就打印一个指定的panic
app_server.set_id(id).expect("writer id already set");
//校验所有的配置
if let Err(e) = app_server.load_database_configs().await {
error!(
"unable to load database configurations from object storage: {}",
e
)
}
} else {
warn!("server ID not set. ID must be set via the INFLUXDB_IOX_ID config or API before writing or querying data.");
}
接下来进入load_database_configs
方法看看,
let list_result = self
.store
//把write_id和配置的文件路径组合一下,作为一个目录
//遍历文件夹中的所有东西,用一个BTreeSet存所有子文件夹
//用Vec存下所有的文件信息,包括路径、修改时间、大小等
.list_with_delimiter(&self.root_path()?)
.await
.context(StoreError)?;
//拿到配置的server的write_id
let server_id = self.require_id()?;
let handles: Vec<_> = list_result
//配置的文件夹下的所有文件夹
.common_prefixes
.into_iter()
//全部进行map转换
.map(|mut path| {
let store = Arc::clone(&self.store);
let config = Arc::clone(&self.config);
let exec = Arc::clone(&self.exec);
//先找database的相关信息文件,名字叫rules.pb
path.set_file_name(DB_RULES_FILE_NAME);
//感觉是需要io来读取文件内容,所以开一个异步
tokio::task::spawn(async move {
let mut res = get_store_bytes(&path, &store).await;
//省略错误处理。。
let res = res.unwrap().freeze();
//解析文件内容,根据文件名可以看出是个pb文件。
match DatabaseRules::decode(res) {
Err(e) => {
//省略错误。。
}
//根据解析出来的文件内容,在内存中恢复回来db的相关信息
Ok(rules) => match config.create_db(rules) {
Err(e) => error!("error adding database to config: {}", e),
//提交一个后台任务,用来不断的检测chunks的状态
//比如达到了某个大小,然后写入到存储等
Ok(handle) => handle.commit(server_id, store, exec),
},
}
})
})
.collect();
//等待所有任务完成
futures::future::join_all(handles).await;
这里就启动完成了一个基本的服务,创建了存储路径、初始化数据库的基本配置、启动了一个用来刷盘、整理chunk的后台任务。
接下来就是启动连接相关的了。
//从启动命令行中读取grpc的地址
let grpc_bind_addr = config.grpc_bind_address;
//绑定这个地址
let socket = tokio::net::TcpListener::bind(grpc_bind_addr)
.await
.context(StartListeningGrpc { grpc_bind_addr })?;
//真正的协议启动
let grpc_server = rpc::serve(socket, Arc::clone(&app_server), frontend_shutdown.clone()).fuse();
//同样的启动http相关的服务,使用的hyper库
let bind_addr = config.http_bind_address;
let addr = AddrIncoming::bind(&bind_addr).context(StartListeningHttp { bind_addr })?;
let http_server = http::serve(addr, Arc::clone(&app_server), frontend_shutdown.clone()).fuse();
//省略后面的停止流程。。。
然后看grpc的启动的服务
//启动起来健康检查的服务
let stream = TcpListenerStream::new(socket);
let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
//标识相对应的服务已经是可以提供服务的状态了
let services = [
generated_types::STORAGE_SERVICE,
generated_types::IOX_TESTING_SERVICE,
generated_types::ARROW_SERVICE,
];
for service in &services {
health_reporter
.set_service_status(service, tonic_health::ServingStatus::Serving)
.await;
}
//增加一堆使用grpc的服务,并启动起来
tonic::transport::Server::builder()
.add_service(health_service)
.add_service(testing::make_server())
.add_service(storage::make_server(Arc::clone(&server)))
.add_service(flight::make_server(Arc::clone(&server)))
.add_service(write::make_server(Arc::clone(&server)))
.add_service(management::make_server(Arc::clone(&server)))
.add_service(operations::make_server(server))
.serve_with_incoming_shutdown(stream, shutdown.cancelled())
.await
然后是http相关的启动
pub async fn serve<M>(
addr: AddrIncoming,
server: Arc<AppServer<M>>,
shutdown: CancellationToken,
) -> Result<(), hyper::Error>
where
M: ConnectionManager + Send + Sync + Debug + "static,
{
//初始化路由相关的信息
let router = router(server);
let service = RouterService::new(router).unwrap();
//启动服务
hyper::Server::builder(addr)
.serve(service)
.with_graceful_shutdown(shutdown.cancelled())
.await
}
顺便看一下都提供了哪些地址可以被访问的:
Router::builder()
.data(server)
//写了一个拦截,打印请求参数和返回结果
.middleware(Middleware::pre(|req| async move {
debug!(request = ?req, "Processing request");
Ok(req)
}))
.middleware(Middleware::post(|res| async move {
debug!(response = ?res, "Successfully processed request");
Ok(res)
})) // this endpoint is for API backward compatibility with InfluxDB 2.x
.post("/api/v2/write", write::<M>)
.get("/health", health)
.get("/metrics", handle_metrics)
.get("/iox/api/v1/databases/:name/query", query::<M>)
.get("/iox/api/v1/databases/:name/wal/meta", get_wal_meta::<M>)
.get("/api/v1/partitions", list_partitions::<M>)
.post("/api/v1/snapshot", snapshot_partition::<M>)
//错误的时候调用的处理拦截
.err_handler_with_info(error_handler)
.build()
.unwrap()
做一个/health
的测试:
curl localhost:8080/health
OK%
可以看到成功返回了值。
到这里基本启动就完成了,后面再用到的时候会继续对启动里的细节做研究,比如Panics
,Log
等等吧,欢迎持续关注。
祝玩儿的开心
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/6299.html