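Metadata design
As shown in the figure above, Doris metadata mainly stores four kinds of data:
- User data information, including databases, table schemas, tablet (shard) information, and so on.
- Job information of all kinds, such as load jobs, Clone jobs, and SchemaChange jobs.
- User and privilege information.
- Cluster and node information.
Metadata directory
The metadata directory is specified by the FE configuration item meta_dir.
The bdb/ directory holds the bdbje data.
The image/ directory holds the image files.
image.[logid] is the latest image file. The logid suffix is the id of the last log entry contained in the image.
image.ckpt is the image file currently being written. If the write succeeds, it is renamed to image.[logid] and replaces the old image file.
The VERSION file records the cluster_id, which uniquely identifies a Doris cluster. It is a 32-bit integer generated randomly the first time the leader starts. It can also be set explicitly through the FE configuration item cluster_id.
The ROLE file records the role of the FE itself. There are only two roles: FOLLOWER and OBSERVER. FOLLOWER means that the FE is an electable node. (Note: even the leader node has the role FOLLOWER.)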
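The image naming rule can be illustrated with a small sketch. The class ImageFileNames and its method are hypothetical helpers written for this article, not Doris code; the sketch only assumes that finished images are named image.<logid> with a numeric suffix and that image.ckpt must be ignored because it may still be incomplete.

```java
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical helper (not part of Doris): finds the newest image by its logid suffix. */
final class ImageFileNames {
    private static final String PREFIX = "image.";

    /** Returns the largest numeric suffix among file names of the form image.<logid>. */
    static OptionalLong latestLogId(List<String> fileNames) {
        return fileNames.stream()
                .filter(name -> name.startsWith(PREFIX))
                .map(name -> name.substring(PREFIX.length()))
                // image.ckpt is still being written, so non-numeric suffixes are skipped
                .filter(suffix -> !suffix.isEmpty() && suffix.chars().allMatch(Character::isDigit))
                .mapToLong(Long::parseLong)
                .max();
    }
}
```

Given the names image.100, image.ckpt and image.2048, the sketch selects 2048 as the logid of the latest complete image.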
Reading the DDL-related source code
Starting the MySQL service
org.apache.doris.qe.QeService
if (nioEnabled) {
    mysqlServer = new NMysqlServer(port, scheduler);
} else {
    mysqlServer = new MysqlServer(port, scheduler);
}
DDL call path
org.apache.doris.qe.ConnectProcessor#dispatch: command recognition
switch (command) {
    case COM_INIT_DB:
        handleInitDb();
        break;
    case COM_QUIT:
        handleQuit();
        break;
    case COM_QUERY:
        handleQuery();
        break;
    case COM_FIELD_LIST:
        handleFieldList();
        break;
    case COM_PING:
        handlePing();
        break;
    default:
        ctx.getState().setError("Unsupported command(" + command + ")");
        LOG.warn("Unsupported command(" + command + ")");
        break;
}
org.apache.doris.qe.ConnectProcessor#analyze: lexical and syntactic parsing
// Parse statement with parser generated by CUP&FLEX
SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
The raw statement string is read from the connection.
Lexer definition file:
• fe/fe-core/src/main/jflex/sql_scanner.flex
Grammar definition file:
• fe/fe-core/src/main/cup/sql_parser.cup
All statement implementation classes:
StatementBase [vim org/apache/doris/analysis/StatementBase.java +33]
├── ExportStmt [vim org/apache/doris/analysis/ExportStmt.java +48]
├── ImportColumnsStmt [vim org/apache/doris/analysis/ImportColumnsStmt.java +21]
├── ImportDeleteOnStmt [vim org/apache/doris/analysis/ImportDeleteOnStmt.java +19]
├── ImportSequenceStmt [vim org/apache/doris/analysis/ImportSequenceStmt.java +19]
├── ImportWhereStmt [vim org/apache/doris/analysis/ImportWhereStmt.java +19]
├── KillStmt [vim org/apache/doris/analysis/KillStmt.java +19]
├── SetStmt [vim org/apache/doris/analysis/SetStmt.java +24]
├── UseStmt [vim org/apache/doris/analysis/UseStmt.java +33]
├── QueryStmt [vim org/apache/doris/analysis/QueryStmt.java +38]
│ ├── SelectStmt [vim org/apache/doris/analysis/SelectStmt.java +65]
│ └── SetOperationStmt [vim org/apache/doris/analysis/SetOperationStmt.java +36]
├── ShowStmt [vim org/apache/doris/analysis/ShowStmt.java +22]
│ ├── AdminShowConfigStmt [vim org/apache/doris/analysis/AdminShowConfigStmt.java +33]
│ ├── AdminShowDataSkewStmt [vim org/apache/doris/analysis/AdminShowDataSkewStmt.java +32]
│ ├── AdminShowReplicaDistributionStmt [vim org/apache/doris/analysis/AdminShowReplicaDistributionStmt.java +34]
│ ├── AdminShowReplicaStatusStmt [vim org/apache/doris/analysis/AdminShowReplicaStatusStmt.java +39]
│ ├── DescribeStmt [vim org/apache/doris/analysis/DescribeStmt.java +54]
│ ├── HelpStmt [vim org/apache/doris/analysis/HelpStmt.java +26]
│ ├── ShowAlterStmt [vim org/apache/doris/analysis/ShowAlterStmt.java +46]
│ ├── ShowAuthorStmt [vim org/apache/doris/analysis/ShowAuthorStmt.java +23]
│ ├── ShowBackendsStmt [vim org/apache/doris/analysis/ShowBackendsStmt.java +30]
│ ├── ShowBackupStmt [vim org/apache/doris/analysis/ShowBackupStmt.java +38]
│ ├── ShowBrokerStmt [vim org/apache/doris/analysis/ShowBrokerStmt.java +30]
│ ├── ShowCharsetStmt [vim org/apache/doris/analysis/ShowCharsetStmt.java +23]
│ ├── ShowClusterStmt [vim org/apache/doris/analysis/ShowClusterStmt.java +34]
│ ├── ShowCollationStmt [vim org/apache/doris/analysis/ShowCollationStmt.java +24]
│ ├── ShowColumnStatsStmt [vim org/apache/doris/analysis/ShowColumnStatsStmt.java +28]
│ ├── ShowColumnStmt [vim org/apache/doris/analysis/ShowColumnStmt.java +28]
│ ├── ShowCreateDbStmt [vim org/apache/doris/analysis/ShowCreateDbStmt.java +36]
│ ├── ShowCreateFunctionStmt [vim org/apache/doris/analysis/ShowCreateFunctionStmt.java +32]
│ ├── ShowCreateRoutineLoadStmt [vim org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java +24]
│ ├── ShowCreateTableStmt [vim org/apache/doris/analysis/ShowCreateTableStmt.java +29]
│ ├── ShowDataStmt [vim org/apache/doris/analysis/ShowDataStmt.java +56]
│ ├── ShowDbIdStmt [vim org/apache/doris/analysis/ShowDbIdStmt.java +29]
│ ├── ShowDbStmt [vim org/apache/doris/analysis/ShowDbStmt.java +27]
│ ├── ShowDeleteStmt [vim org/apache/doris/analysis/ShowDeleteStmt.java +31]
│ ├── ShowDynamicPartitionStmt [vim org/apache/doris/analysis/ShowDynamicPartitionStmt.java +29]
│ ├── ShowEncryptKeysStmt [vim org/apache/doris/analysis/ShowEncryptKeysStmt.java +32]
│ ├── ShowEnginesStmt [vim org/apache/doris/analysis/ShowEnginesStmt.java +23]
│ ├── ShowEventsStmt [vim org/apache/doris/analysis/ShowEventsStmt.java +23]
│ ├── ShowExportStmt [vim org/apache/doris/analysis/ShowExportStmt.java +40]
│ ├── ShowFrontendsStmt [vim org/apache/doris/analysis/ShowFrontendsStmt.java +30]
│ ├── ShowFunctionsStmt [vim org/apache/doris/analysis/ShowFunctionsStmt.java +32]
│ ├── ShowGrantsStmt [vim org/apache/doris/analysis/ShowGrantsStmt.java +32]
│ ├── ShowIndexStmt [vim org/apache/doris/analysis/ShowIndexStmt.java +32]
│ ├── ShowLoadProfileStmt [vim org/apache/doris/analysis/ShowLoadProfileStmt.java +27]
│ ├── ShowLoadStmt [vim org/apache/doris/analysis/ShowLoadStmt.java +42]
│ ├── ShowLoadWarningsStmt [vim org/apache/doris/analysis/ShowLoadWarningsStmt.java +36]
│ ├── ShowMigrationsStmt [vim org/apache/doris/analysis/ShowMigrationsStmt.java +31]
│ ├── ShowOpenTableStmt [vim org/apache/doris/analysis/ShowOpenTableStmt.java +23]
│ ├── ShowPartitionIdStmt [vim org/apache/doris/analysis/ShowPartitionIdStmt.java +29]
│ ├── ShowPartitionsStmt [vim org/apache/doris/analysis/ShowPartitionsStmt.java +49]
│ ├── ShowPluginsStmt [vim org/apache/doris/analysis/ShowPluginsStmt.java +23]
│ ├── ShowProcStmt [vim org/apache/doris/analysis/ShowProcStmt.java +32]
│ ├── ShowProcedureStmt [vim org/apache/doris/analysis/ShowProcedureStmt.java +23]
│ ├── ShowProcesslistStmt [vim org/apache/doris/analysis/ShowProcesslistStmt.java +24]
│ ├── ShowQueryProfileStmt [vim org/apache/doris/analysis/ShowQueryProfileStmt.java +27]
│ ├── ShowRepositoriesStmt [vim org/apache/doris/analysis/ShowRepositoriesStmt.java +25]
│ ├── ShowResourcesStmt [vim org/apache/doris/analysis/ShowResourcesStmt.java +37]
│ ├── ShowRestoreStmt [vim org/apache/doris/analysis/ShowRestoreStmt.java +38]
│ ├── ShowRolesStmt [vim org/apache/doris/analysis/ShowRolesStmt.java +29]
│ ├── ShowRollupStmt [vim org/apache/doris/analysis/ShowRollupStmt.java +28]
│ ├── ShowRoutineLoadStmt [vim org/apache/doris/analysis/ShowRoutineLoadStmt.java +34]
│ ├── ShowRoutineLoadTaskStmt [vim org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java +32]
│ ├── ShowSmallFilesStmt [vim org/apache/doris/analysis/ShowSmallFilesStmt.java +32]
│ ├── ShowSnapshotStmt [vim org/apache/doris/analysis/ShowSnapshotStmt.java +29]
│ ├── ShowSqlBlockRuleStmt [vim org/apache/doris/analysis/ShowSqlBlockRuleStmt.java +31]
│ ├── ShowStatusStmt [vim org/apache/doris/analysis/ShowStatusStmt.java +23]
│ ├── ShowStreamLoadStmt [vim org/apache/doris/analysis/ShowStreamLoadStmt.java +39]
│ ├── ShowSyncJobStmt [vim org/apache/doris/analysis/ShowSyncJobStmt.java +33]
│ ├── ShowTableIdStmt [vim org/apache/doris/analysis/ShowTableIdStmt.java +30]
│ ├── ShowTableStatsStmt [vim org/apache/doris/analysis/ShowTableStatsStmt.java +32]
│ ├── ShowTableStatusStmt [vim org/apache/doris/analysis/ShowTableStatusStmt.java +35]
│ ├── ShowTableStmt [vim org/apache/doris/analysis/ShowTableStmt.java +34]
│ ├── ShowTabletStmt [vim org/apache/doris/analysis/ShowTabletStmt.java +39]
│ ├── ShowTransactionStmt [vim org/apache/doris/analysis/ShowTransactionStmt.java +35]
│ ├── ShowTrashDiskStmt [vim org/apache/doris/analysis/ShowTrashDiskStmt.java +33]
│ ├── ShowTrashStmt [vim org/apache/doris/analysis/ShowTrashStmt.java +36]
│ ├── ShowTriggersStmt [vim org/apache/doris/analysis/ShowTriggersStmt.java +23]
│ ├── ShowUserPropertyStmt [vim org/apache/doris/analysis/ShowUserPropertyStmt.java +42]
│ ├── ShowUserStmt [vim org/apache/doris/analysis/ShowUserStmt.java +25]
│ ├── ShowVariablesStmt [vim org/apache/doris/analysis/ShowVariablesStmt.java +29]
│ ├── ShowViewStmt [vim org/apache/doris/analysis/ShowViewStmt.java +39]
│ ├── ShowWarningStmt [vim org/apache/doris/analysis/ShowWarningStmt.java +23]
│ └── ShowWhiteListStmt [vim org/apache/doris/analysis/ShowWhiteListStmt.java +23]
├── TransactionStmt [vim org/apache/doris/analysis/TransactionStmt.java +22]
│ ├── TransactionBeginStmt [vim org/apache/doris/analysis/TransactionBeginStmt.java +24]
│ ├── TransactionCommitStmt [vim org/apache/doris/analysis/TransactionCommitStmt.java +19]
│ └── TransactionRollbackStmt [vim org/apache/doris/analysis/TransactionRollbackStmt.java +19]
├── UnsupportedStmt [vim org/apache/doris/analysis/UnsupportedStmt.java +22]
│ └── EmptyStmt [vim org/apache/doris/analysis/EmptyStmt.java +19]
└── DdlStmt [vim org/apache/doris/analysis/DdlStmt.java +19]
    ├── AdminCancelRepairTableStmt [vim org/apache/doris/analysis/AdminCancelRepairTableStmt.java +33]
    ├── AdminCheckTabletsStmt [vim org/apache/doris/analysis/AdminCheckTabletsStmt.java +33]
    ├── AdminCleanTrashStmt [vim org/apache/doris/analysis/AdminCleanTrashStmt.java +34]
    ├── AdminRepairTableStmt [vim org/apache/doris/analysis/AdminRepairTableStmt.java +33]
    ├── AdminSetConfigStmt [vim org/apache/doris/analysis/AdminSetConfigStmt.java +32]
    ├── AdminSetReplicaStatusStmt [vim org/apache/doris/analysis/AdminSetReplicaStatusStmt.java +30]
    ├── AlterClusterStmt [vim org/apache/doris/analysis/AlterClusterStmt.java +29]
    ├── AlterColumnStatsStmt [vim org/apache/doris/analysis/AlterColumnStatsStmt.java +33]
    ├── AlterDatabasePropertyStmt [vim org/apache/doris/analysis/AlterDatabasePropertyStmt.java +24]
    ├── AlterDatabaseQuotaStmt [vim org/apache/doris/analysis/AlterDatabaseQuotaStmt.java +30]
    ├── AlterDatabaseRename [vim org/apache/doris/analysis/AlterDatabaseRename.java +34]
    ├── AlterRoutineLoadStmt [vim org/apache/doris/analysis/AlterRoutineLoadStmt.java +34]
    ├── AlterSqlBlockRuleStmt [vim org/apache/doris/analysis/AlterSqlBlockRuleStmt.java +31]
    ├── AlterSystemStmt [vim org/apache/doris/analysis/AlterSystemStmt.java +28]
    ├── AlterTableStatsStmt [vim org/apache/doris/analysis/AlterTableStatsStmt.java +33]
    ├── AlterTableStmt [vim org/apache/doris/analysis/AlterTableStmt.java +37]
    ├── CancelLoadStmt [vim org/apache/doris/analysis/CancelLoadStmt.java +26]
    ├── CreateClusterStmt [vim org/apache/doris/analysis/CreateClusterStmt.java +33]
    ├── CreateDataSyncJobStmt [vim org/apache/doris/analysis/CreateDataSyncJobStmt.java +36]
    ├── CreateDbStmt [vim org/apache/doris/analysis/CreateDbStmt.java +31]
    ├── CreateEncryptKeyStmt [vim org/apache/doris/analysis/CreateEncryptKeyStmt.java +30]
    ├── CreateFileStmt [vim org/apache/doris/analysis/CreateFileStmt.java +35]
    ├── CreateFunctionStmt [vim org/apache/doris/analysis/CreateFunctionStmt.java +47]
    ├── CreateMaterializedViewStmt [vim org/apache/doris/analysis/CreateMaterializedViewStmt.java +43]
    ├── CreateRepositoryStmt [vim org/apache/doris/analysis/CreateRepositoryStmt.java +28]
    ├── CreateResourceStmt [vim org/apache/doris/analysis/CreateResourceStmt.java +32]
    ├── CreateRoleStmt [vim org/apache/doris/analysis/CreateRoleStmt.java +28]
    ├── CreateRoutineLoadStmt [vim org/apache/doris/analysis/CreateRoutineLoadStmt.java +48]
    ├── CreateSqlBlockRuleStmt [vim org/apache/doris/analysis/CreateSqlBlockRuleStmt.java +37]
    ├── CreateTableAsSelectStmt [vim org/apache/doris/analysis/CreateTableAsSelectStmt.java +26]
    ├── CreateTableLikeStmt [vim org/apache/doris/analysis/CreateTableLikeStmt.java +31]
    ├── CreateTableStmt [vim org/apache/doris/analysis/CreateTableStmt.java +56]
    ├── CreateUserStmt [vim org/apache/doris/analysis/CreateUserStmt.java +36]
    ├── DeleteStmt [vim org/apache/doris/analysis/DeleteStmt.java +35]
    ├── DropClusterStmt [vim org/apache/doris/analysis/DropClusterStmt.java +31]
    ├── DropDbStmt [vim org/apache/doris/analysis/DropDbStmt.java +30]
    ├── DropEncryptKeyStmt [vim org/apache/doris/analysis/DropEncryptKeyStmt.java +28]
    ├── DropFileStmt [vim org/apache/doris/analysis/DropFileStmt.java +34]
    ├── DropFunctionStmt [vim org/apache/doris/analysis/DropFunctionStmt.java +27]
    ├── DropMaterializedViewStmt [vim org/apache/doris/analysis/DropMaterializedViewStmt.java +29]
    ├── DropRepositoryStmt [vim org/apache/doris/analysis/DropRepositoryStmt.java +27]
    ├── DropResourceStmt [vim org/apache/doris/analysis/DropResourceStmt.java +27]
    ├── DropRoleStmt [vim org/apache/doris/analysis/DropRoleStmt.java +28]
    ├── DropSqlBlockRuleStmt [vim org/apache/doris/analysis/DropSqlBlockRuleStmt.java +30]
    ├── DropTableStmt [vim org/apache/doris/analysis/DropTableStmt.java +28]
    ├── DropUserStmt [vim org/apache/doris/analysis/DropUserStmt.java +27]
    ├── EnterStmt [vim org/apache/doris/analysis/EnterStmt.java +25]
    ├── GrantStmt [vim org/apache/doris/analysis/GrantStmt.java +39]
    ├── InsertStmt [vim org/apache/doris/analysis/InsertStmt.java +66]
    ├── InstallPluginStmt [vim org/apache/doris/analysis/InstallPluginStmt.java +31]
    ├── LinkDbStmt [vim org/apache/doris/analysis/LinkDbStmt.java +31]
    ├── LoadStmt [vim org/apache/doris/analysis/LoadStmt.java +45]
    ├── MigrateDbStmt [vim org/apache/doris/analysis/MigrateDbStmt.java +29]
    ├── PauseRoutineLoadStmt [vim org/apache/doris/analysis/PauseRoutineLoadStmt.java +26]
    ├── PauseSyncJobStmt [vim org/apache/doris/analysis/PauseSyncJobStmt.java +22]
    ├── RecoverDbStmt [vim org/apache/doris/analysis/RecoverDbStmt.java +33]
    ├── RecoverPartitionStmt [vim org/apache/doris/analysis/RecoverPartitionStmt.java +32]
    ├── RecoverTableStmt [vim org/apache/doris/analysis/RecoverTableStmt.java +32]
    ├── ResumeRoutineLoadStmt [vim org/apache/doris/analysis/ResumeRoutineLoadStmt.java +26]
    ├── ResumeSyncJobStmt [vim org/apache/doris/analysis/ResumeSyncJobStmt.java +22]
    ├── RevokeStmt [vim org/apache/doris/analysis/RevokeStmt.java +32]
    ├── SetUserPropertyStmt [vim org/apache/doris/analysis/SetUserPropertyStmt.java +31]
    ├── StopRoutineLoadStmt [vim org/apache/doris/analysis/StopRoutineLoadStmt.java +23]
    ├── StopSyncJobStmt [vim org/apache/doris/analysis/StopSyncJobStmt.java +22]
    ├── SyncStmt [vim org/apache/doris/analysis/SyncStmt.java +22]
    ├── TruncateTableStmt [vim org/apache/doris/analysis/TruncateTableStmt.java +27]
    ├── UninstallPluginStmt [vim org/apache/doris/analysis/UninstallPluginStmt.java +28]
    ├── UpdateStmt [vim org/apache/doris/analysis/UpdateStmt.java +35]
    ├── AbstractBackupStmt [vim org/apache/doris/analysis/AbstractBackupStmt.java +36]
    │ ├── BackupStmt [vim org/apache/doris/analysis/BackupStmt.java +29]
    │ └── RestoreStmt [vim org/apache/doris/analysis/RestoreStmt.java +33]
    ├── BaseViewStmt [vim org/apache/doris/analysis/BaseViewStmt.java +39]
    │ ├── AlterViewStmt [vim org/apache/doris/analysis/AlterViewStmt.java +31]
    │ └── CreateViewStmt [vim org/apache/doris/analysis/CreateViewStmt.java +33]
    └── CancelStmt [vim org/apache/doris/analysis/CancelStmt.java +19]
      ├── CancelAlterSystemStmt [vim org/apache/doris/analysis/CancelAlterSystemStmt.java +28]
      ├── CancelAlterTableStmt [vim org/apache/doris/analysis/CancelAlterTableStmt.java +31]
      └── CancelBackupStmt [vim org/apache/doris/analysis/CancelBackupStmt.java +30]
org.apache.doris.qe.StmtExecutor#execute(TUniqueId)
analyze(context.getSessionVariable().toThrift()); // semantic analysis
if (isForwardToMaster()) {
    forwardToMaster(); // forward to the Master
    if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) {
        context.setQueryId(masterOpExecutor.getQueryId());
    }
    return;
} else {
    LOG.debug("no need to transfer to Master. stmt: {}", context.getStmtId());
}
// Command execution (excerpt from a longer if/else chain)
else if (parsedStmt instanceof DdlStmt) {
    handleDdlStmt();
} else if (parsedStmt instanceof ShowStmt) {
    handleShow();
} else if (parsedStmt instanceof KillStmt) {
    handleKill();
}
Semantic analysis
This step checks that the statement is semantically correct.
@Override
public void analyze(Analyzer analyzer) throws UserException {
    super.analyze(analyzer);
    tableName.analyze(analyzer);
    checkTblPriv(ConnectContext.get(), tableName.getDb(),
            tableName.getTbl(), PrivPredicate.CREATE);
    analyzeEngineName();
    keysDesc.analyze(columnDefs);
    for (ColumnDef columnDef : columnDefs) {
        columnDef.analyze(engineName.equals("olap"));
    }
    partitionDesc.analyze(columnDefs, properties);
    distributionDesc.analyze(columnSet);
}
- Verify that the names are legal
- Verify that the privileges are sufficient
- Verify that the partitioning is legal
- Verify that the column types are legal
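Forwarding
An FE is a Master, a Follower, or an Observer.
Only the Master can modify metadata.
Every operation that modifies metadata must be forwarded to the Master for execution.
Forward types:
FORWARD_NO_SYNC
FORWARD_WITH_SYNC
NO_FORWARD
DDL uses FORWARD_WITH_SYNC.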
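The forwarding rule can be sketched as a small decision function. This is a simplified illustration, not the Doris implementation: the enum only reuses the three names listed above, and ForwardDecision with its methods is hypothetical. It also assumes that the WITH_SYNC variant means the non-Master FE waits until it has caught up with the Master's log before answering the client.

```java
/** The three forward types named above, redeclared here only for the sketch. */
enum ForwardType { FORWARD_NO_SYNC, FORWARD_WITH_SYNC, NO_FORWARD }

/** Hypothetical helper (not Doris code) that encodes the forwarding rule. */
final class ForwardDecision {
    /** A statement is forwarded only when this FE is not the Master and the statement requests it. */
    static boolean shouldForward(boolean isMaster, ForwardType type) {
        return !isMaster && type != ForwardType.NO_FORWARD;
    }

    /** Assumption: after FORWARD_WITH_SYNC the local FE waits for the Master's log to be replayed. */
    static boolean mustWaitForSync(boolean isMaster, ForwardType type) {
        return !isMaster && type == ForwardType.FORWARD_WITH_SYNC;
    }
}
```

Under these assumptions, a CREATE TABLE received by a Follower is forwarded and then waited on, while the same statement received by the Master runs locally.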
Command execution
org.apache.doris.qe.DdlExecutor#execute()
// Call the function that matches the statement type
if (ddlStmt instanceof CreateTableStmt) {
    catalog.createTable((CreateTableStmt) ddlStmt);
}
Several table types are supported. Except for olap tables, all of them are mapping tables.
if (engineName.equals("olap")) {
    createOlapTable(db, stmt);
    return;
} else if (engineName.equals("odbc")) {
    createOdbcTable(db, stmt);
    return;
} else if (engineName.equals("mysql")) {
    createMysqlTable(db, stmt);
    return;
} else if (engineName.equals("broker")) {
    createBrokerTable(db, stmt);
    return;
} else if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) {
    createEsTable(db, stmt);
    return;
} else if (engineName.equalsIgnoreCase("hive")) {
    createHiveTable(db, stmt);
    return;
}
org.apache.doris.catalog.Catalog#createOlapTable
// Convert the syntax objects into metadata objects
String tableName = stmt.getTableName();
LOG.debug("begin create olap table: {}", tableName);
// create columns
List<Column> baseSchema = stmt.getColumns();
validateColumns(baseSchema);
// create partition info
PartitionDesc partitionDesc = stmt.getPartitionDesc();
PartitionInfo partitionInfo = null;
// Create the table object
long tableId = Catalog.getCurrentCatalog().getNextId();
OlapTable olapTable = new OlapTable(tableId, tableName, baseSchema, keysType, partitionInfo,
        distributionInfo, indexes);
// Create the Partition object
if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
    // this is a 1-level partitioned table
    // use table name as partition name
    String partitionName = tableName;
    long partitionId = partitionNameToId.get(partitionName);
    // create partition (arguments omitted in this excerpt)
    Partition partition = createPartitionWithIndices();
    olapTable.addPartition(partition);
}
// Add the metadata and persist it
Pair<Boolean, Boolean> result = db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists());
org.apache.doris.catalog.Catalog#createPartitionWithIndices
A partition is the concrete entity behind a table.
// create base index first.
Preconditions.checkArgument(baseIndexId != -1);
MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
// create partition with base index
Partition partition = new Partition(partitionId, partitionName, baseIndex, distributionInfo);
// add to index map
Map<Long, MaterializedIndex> indexMap = new HashMap<>();
indexMap.put(baseIndexId, baseIndex);
// create rollup index if has
for (long indexId : indexIdToMeta.keySet()) {
    if (indexId == baseIndexId) {
        continue;
    }
    MaterializedIndex rollup = new MaterializedIndex(indexId, IndexState.NORMAL);
    indexMap.put(indexId, rollup);
}
for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
    // create tablets
    int schemaHash = indexMeta.getSchemaHash();
    TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium);
    createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version, versionHash,
            replicaAlloc, tabletMeta, tabletIdSet);
    // add create replica task for olap
    short shortKeyColumnCount = indexMeta.getShortKeyColumnCount();
    TStorageType storageType = indexMeta.getStorageType();
    List<Column> schema = indexMeta.getSchema();
    KeysType keysType = indexMeta.getKeysType();
    int totalTaskNum = index.getTablets().size() * totalReplicaNum;
    MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(totalTaskNum);
    AgentBatchTask batchTask = new AgentBatchTask();
    for (Tablet tablet : index.getTablets()) {
        long tabletId = tablet.getId();
        for (Replica replica : tablet.getReplicas()) {
            long backendId = replica.getBackendId();
            countDownLatch.addMark(backendId, tabletId);
            CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId,
                    partitionId, indexId, tabletId,
                    shortKeyColumnCount, schemaHash,
                    version, versionHash,
                    keysType,
                    storageType, storageMedium,
                    schema, bfColumns, bfFpp,
                    countDownLatch,
                    indexes,
                    isInMemory,
                    tabletType);
            task.setStorageFormat(storageFormat);
            batchTask.addTask(task);
            // add to AgentTaskQueue for handling finish report.
            // not for resending task
            AgentTaskQueue.addTask(task);
        }
    }
    AgentTaskExecutor.submit(batchTask);
}
Overall flow:
- Create the Partition object
- Create the MaterializedIndex objects
- For each MaterializedIndex object, create its Tablets
- Create the replicas and dispatch the tasks to the BEs
// estimate timeout
long timeout = Config.tablet_create_timeout_second * 1000L * totalTaskNum;
timeout = Math.min(timeout, Config.max_create_table_timeout_second * 1000);
try {
    ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
    LOG.warn("InterruptedException: ", e);
    ok = false;
}
The FE then waits for the BEs to finish executing the tasks.
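The wait-with-timeout pattern above can be tried in isolation. The sketch below is an assumption-laden stand-in: it uses the JDK's java.util.concurrent.CountDownLatch instead of Doris's MarkedCountDownLatch, short-lived threads instead of real BEs, and hypothetical names (CreateTabletWait, timeoutMillis, runAndWait) with the two config values passed in as plain parameters.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch (not Doris code) of "scale the timeout by task count, then await". */
final class CreateTabletWait {
    /** Per-task timeout multiplied by the task count, capped by an overall maximum (inputs in seconds). */
    static long timeoutMillis(long perTaskSeconds, int totalTaskNum, long maxSeconds) {
        return Math.min(perTaskSeconds * 1000L * totalTaskNum, maxSeconds * 1000L);
    }

    /** Starts one thread per simulated replica task and waits for all of them or for the timeout. */
    static boolean runAndWait(int totalTaskNum, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(totalTaskNum);
        for (int i = 0; i < totalTaskNum; i++) {
            Thread worker = new Thread(latch::countDown); // stands in for a BE finishing one task
            worker.setDaemon(true);
            worker.start();
        }
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

The cap matters for large tables: with a per-task timeout of 1 second, 10 tasks and a 5 second maximum, the computed timeout is 5000 ms rather than 10000 ms.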
org.apache.doris.catalog.Database#createTableWithLock
idToTable.put(table.getId(), table);
nameToTable.put(table.getName(), table);
lowerCaseToTableName.put(tableName.toLowerCase(), tableName);
if (!isReplay) {
    // Write edit log
    CreateTableInfo info = new CreateTableInfo(fullQualifiedName, table);
    Catalog.getCurrentCatalog().getEditLog().logCreateTable(info);
}
if (table.getType() == TableType.ELASTICSEARCH) {
    Catalog.getCurrentCatalog().getEsRepository().registerTable((EsTable) table);
}
- Add the table to the Database object
- Check whether the call is a replay
- Write the metadata log
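Flow summary:
FE-BE interaction
FE-BE interaction flow chart
- The FE sends the tasks
- The BE executes them
- The BE reports the execution results
- The FE aggregates the results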
AgentBatchTask batchTask = new AgentBatchTask();
for (Tablet tablet : index.getTablets()) {
    long tabletId = tablet.getId();
    for (Replica replica : tablet.getReplicas()) {
        long backendId = replica.getBackendId();
        countDownLatch.addMark(backendId, tabletId);
        CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId,
                partitionId, indexId, tabletId,
                shortKeyColumnCount, schemaHash,
                version, versionHash,
                keysType,
                storageType, storageMedium,
                schema, bfColumns, bfFpp,
                countDownLatch,
                indexes,
                isInMemory,
                tabletType);
        task.setStorageFormat(storageFormat);
        batchTask.addTask(task);
        // add to AgentTaskQueue for handling finish report.
        // not for resending task
        AgentTaskQueue.addTask(task);
    }
}
AgentTaskExecutor.submit(batchTask);
AgentBatchTask:
Collects tasks and groups them by BE
AgentTaskExecutor:
Sends the AgentBatchTask
AgentTaskQueue:
Handles the reports of finished tasks
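The "group by BE" idea behind AgentBatchTask can be sketched with a plain map keyed by backend id. SketchTask and SketchBatchTask are hypothetical classes written for illustration and keep only the fields needed for grouping; the real classes carry much more state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an agent task: just the target backend and the tablet. */
final class SketchTask {
    final long backendId;
    final long tabletId;

    SketchTask(long backendId, long tabletId) {
        this.backendId = backendId;
        this.tabletId = tabletId;
    }
}

/** Hypothetical batch that groups tasks per backend so each BE can receive one request. */
final class SketchBatchTask {
    private final Map<Long, List<SketchTask>> backendIdToTasks = new HashMap<>();

    /** Appends the task to the list of its target backend, creating the list on first use. */
    void addTask(SketchTask task) {
        backendIdToTasks.computeIfAbsent(task.backendId, id -> new ArrayList<>()).add(task);
    }

    /** Returns the tasks queued for one backend, or an empty list if there are none. */
    List<SketchTask> tasksFor(long backendId) {
        return backendIdToTasks.getOrDefault(backendId, List.of());
    }

    int backendCount() {
        return backendIdToTasks.size();
    }
}
```

Grouping first means the sender can iterate over the map and issue one call per backend instead of one call per replica.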
Receiving tasks on the BE
be/src/agent/agent_server.cpp
Receiving a task
// resend request when something is wrong (BE may need some logic to guarantee idempotence).
void AgentServer::submit_tasks(TAgentResult& agent_result,
                               const std::vector<TAgentTaskRequest>& tasks) {
    Status ret_st;
    // TODO check master_info here if it is the same with that of heartbeat rpc
    if (_master_info.network_address.hostname == "" || _master_info.network_address.port == 0) {
        Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
        ret_st.to_thrift(&agent_result.status);
        return;
    }
    for (auto task : tasks) {
        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
        TTaskType::type task_type = task.task_type;
        int64_t signature = task.signature;
#define HANDLE_TYPE(t_task_type, work_pool, req_member)                       \
    case t_task_type:                                                         \
        if (task.__isset.req_member) {                                        \
            work_pool->submit_task(task);                                     \
        } else {                                                              \
            ret_st = Status::InvalidArgument(strings::Substitute(             \
                    "task(signature=$0) has wrong request member", signature)); \
        }                                                                     \
        break;
        ...
    ret_st.to_thrift(&agent_result.status);
}
Worker thread
while (_is_work) {
    TAgentTaskRequest agent_task_req;
    TCreateTabletReq create_tablet_req;
    {
        lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
        while (_is_work && _tasks.empty()) {
            _worker_thread_condition_variable.wait();
        }
        if (!_is_work) {
            return;
        }
        // Take a task from the queue
        agent_task_req = _tasks.front();
        create_tablet_req = agent_task_req.create_tablet_req;
        _tasks.pop_front();
    }
    // Execute it
    OLAPStatus create_status = _env->storage_engine()->create_tablet(create_tablet_req);
    TFinishTaskRequest finish_task_request;
    finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
    finish_task_request.__set_backend(_backend);
    finish_task_request.__set_report_version(_s_report_version);
    finish_task_request.__set_task_type(agent_task_req.task_type);
    finish_task_request.__set_signature(agent_task_req.signature);
    finish_task_request.__set_task_status(task_status);
    // Report the result
    _finish_task(finish_task_request);
}
Handling task reports
org.apache.doris.service.FrontendServiceImpl#finishTask
org.apache.doris.master.MasterImpl#finishTask
The FE and the BE communicate over the Thrift protocol.
Error handling
org.apache.doris.task.AgentTaskQueue stores the tasks that are currently executing.
org.apache.doris.master.ReportHandler#handleReport
org.apache.doris.master.ReportHandler#taskReport
BE: Report tasks/olap tablet/disk state to the master server
The FE master processes the reported tasks and retries the ones that have timed out.
private static void taskReport(long backendId, Map<TTaskType, Set<Long>> runningTasks) {
    ...
    // avoid sending duplicate agent tasks to the BE
    if (task.shouldResend(taskReportTime)) {
        batchTask.addTask(task);
    }
    ...
}
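The retry decision can be pictured as a comparison between what the FE still tracks and what the BE reports as running. The sketch below is a guess at the general shape only: SketchTaskReport, its parameters, and the fixed wait threshold are hypothetical, and the actual conditions inside shouldResend are not shown in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch (not Doris code) of choosing which queued tasks to send again. */
final class SketchTaskReport {
    /**
     * Returns the signatures that the FE still tracks, that the BE did not report as running,
     * and that have been waiting longer than waitMillis. The result is sorted for determinism.
     */
    static List<Long> toResend(Map<Long, Long> queuedSignatureToCreateTime,
                               Set<Long> runningOnBackend,
                               long nowMillis,
                               long waitMillis) {
        List<Long> result = new ArrayList<>();
        for (Map.Entry<Long, Long> entry : queuedSignatureToCreateTime.entrySet()) {
            boolean missingOnBackend = !runningOnBackend.contains(entry.getKey());
            boolean waitedLongEnough = nowMillis - entry.getValue() > waitMillis;
            if (missingOnBackend && waitedLongEnough) {
                result.add(entry.getKey());
            }
        }
        Collections.sort(result);
        return result;
    }
}
```

The waiting threshold is what keeps a freshly submitted task, which the BE may simply not have picked up yet, from being sent twice.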
Metadata persistence
The edit log plays a role similar to a WAL (write-ahead log).
BDBJE provides the distributed KV storage.
Metadata persistence: org.apache.doris.catalog.Database#createTableWithLock
public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
    ...
    // Update the in-memory state
    nameToTable.put(table.getName(), table);
    // Write edit log
    // Build the metadata log entry
    CreateTableInfo info = new CreateTableInfo(fullQualifiedName, table);
    // Write the metadata log entry
    Catalog.getCurrentCatalog().getEditLog().logCreateTable(info);
    ...
}
Metadata replay
Metadata replay happens when the FE leader synchronizes metadata to the other FE nodes.
The log entries are replayed one by one.
The metadata is rebuilt in memory.
org.apache.doris.catalog.Catalog#replayCreateTable
public void replayCreateTable(String dbName, Table table) {
    Database db = this.fullNameToDb.get(dbName);
    db.createTableWithLock(table, true, false);
    ...
}
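The write path and the replay path share one method and differ only in the isReplay flag. A minimal sketch of that pattern follows, under loud assumptions: SketchDatabase is hypothetical, a list of strings stands in for the BDBJE-backed edit log, and a table is reduced to a name and an id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch (not Doris code) of "update memory, log unless replaying". */
final class SketchDatabase {
    private final Map<String, Long> nameToTableId = new HashMap<>();
    private final List<String> editLog; // stands in for the persistent journal

    SketchDatabase(List<String> editLog) {
        this.editLog = editLog;
    }

    /** Applies the change in memory; writes a log entry only on the original (non-replay) call. */
    boolean createTable(String name, long id, boolean isReplay) {
        if (nameToTableId.containsKey(name)) {
            return false;
        }
        nameToTableId.put(name, id);
        if (!isReplay) {
            editLog.add(name + "=" + id);
        }
        return true;
    }

    /** Rebuilds the in-memory state on another node by replaying the entries one by one. */
    static SketchDatabase replay(List<String> journal) {
        SketchDatabase db = new SketchDatabase(new ArrayList<>());
        for (String entry : journal) {
            String[] parts = entry.split("=", 2);
            db.createTable(parts[0], Long.parseLong(parts[1]), true);
        }
        return db;
    }

    Map<String, Long> tables() {
        return Map.copyOf(nameToTableId);
    }

    int journalSize() {
        return editLog.size();
    }
}
```

Because replay goes through the same createTable method with isReplay set to true, the follower ends up with the same in-memory state without writing the entries a second time.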
How to implement a new statement
fe/fe-core/src/main/cup/sql_parser.cup is the grammar file.
KW_CREATE opt_external:isExternal KW_TABLE opt_if_not_exists:ifNotExists table_name:name
        LPAREN column_definition_list:columns COMMA index_definition_list:indexes RPAREN opt_engine:engineName
        opt_keys:keys
        opt_comment:tableComment
        opt_partition:partition
        opt_distribution:distribution
        opt_rollup:index
        opt_properties:tblProperties
        opt_ext_properties:extProperties
{:
    RESULT = new CreateTableStmt(ifNotExists, isExternal, name, columns, indexes, engineName, keys, partition,
            distribution, tblProperties, extProperties, tableComment, index);
:}
fe/fe-core/src/main/jflex/sql_scanner.flex is the lexer file.
keywordMap.put("create", new Integer(SqlParserSymbols.KW_CREATE));
keywordMap.put("cross", new Integer(SqlParserSymbols.KW_CROSS));
keywordMap.put("cube", new Integer(SqlParserSymbols.KW_CUBE));
keywordMap.put("current", new Integer(SqlParserSymbols.KW_CURRENT));
keywordMap.put("current_user", new Integer(SqlParserSymbols.KW_CURRENT_USER));
keywordMap.put("data", new Integer(SqlParserSymbols.KW_DATA));
keywordMap.put("database", new Integer(SqlParserSymbols.KW_DATABASE));
Generating the lexer and parser code:
cd fe/ && mvn clean install -DskipTests
• SqlScanner.java
• SqlParser.java
• SqlParserSymbols.java
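Summary of the steps for implementing a new statement:
- Define the lexer and grammar rules
- Implement the corresponding statement class, for example CreateTableStmt
- Implement the method that modifies the metadata, for example Catalog.createTable()
- Define the metadata log class for the operation, for example CreateTableInfo
- Implement the writing of the metadata log
- Implement the corresponding replay method, for example Catalog.replayCreateTable()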
Please credit the source when reposting: https://daima100.com/5700.html