atlas开源_apache atlas docker

atlas开源_apache atlas dockerApache Atlas 架构设计及源代码分析, 以Hive建库为例,分析元数据采集的主体流程

[Apache Atlas] Atlas 架构设计及源代码简单分析

Apache Atlas 架构图

image

Atlas 支持多数据源接入:Hive、HBase、Storm等

Type System

Type

Atlas 中定义了一些元数据类型

── AtlasBaseTypeDef
  │   ├── AtlasEnumDef
  │   └── AtlasStructDef
  │       ├── AtlasBusinessMetadataDef
  │       ├── AtlasClassificationDef
  │       ├── AtlasEntityDef
  │       └── AtlasRelationshipDef
  ├── AtlasStructType
  │   ├── AtlasBusinessMetadataType
  │   ├── AtlasClassificationType
  │   ├── AtlasRelationshipType
  │   └── AtlasEntityType
  │       └── AtlasRootEntityType
  ├── AtlasType
  │   ├── AtlasArrayType
  │   ├── AtlasBigDecimalType
  │   ├── AtlasBigIntegerType
  │   ├── AtlasByteType
  │   ├── AtlasDateType
  │   ├── AtlasDoubleType
  │   ├── AtlasEnumType
  │   ├── AtlasFloatType
  │   ├── AtlasIntType
  │   ├── AtlasLongType
  │   ├── AtlasMapType
  │   ├── AtlasObjectIdType
  │   ├── AtlasShortType
  │   ├── AtlasStringType
  │   └── AtlasStructType
  │       ├── AtlasBusinessMetadataType
  │       ├── AtlasClassificationType
  │       ├── AtlasEntityType
  │       └── AtlasRelationshipType
  ├── AtlasTypeDefStore
  │   └── AtlasTypeDefGraphStore
  │       └── AtlasTypeDefGraphStoreV2
  └── StructTypeDefinition
      └── HierarchicalTypeDefinition
          ├── ClassTypeDefinition
          └── TraitTypeDefinition

Entity

Entity 是基于类型的具体实现

AtlasEntity
  ├── AtlasEntityExtInfo
  │   ├── AtlasEntitiesWithExtInfo
  │   └── AtlasEntityWithExtInfo
  ├── AtlasEntityStore
  │   └── AtlasEntityStoreV2
  ├── AtlasEntityStream
  │   └── AtlasEntityStreamForImport
  ├── AtlasEntityType
  │   └── AtlasRootEntityType
  └── IAtlasEntityChangeNotifier
      ├── AtlasEntityChangeNotifier
      └── EntityChangeNotifierNop

Attributes

针对模型定义属性

AtlasAttributeDef
      └── AtlasRelationshipAttributeDef

AtlasAttributeDef 属性字段:

private String                   name;
private String                   typeName;
private boolean                  isOptional;
private Cardinality              cardinality;
private int                      valuesMinCount;
private int                      valuesMaxCount;
private boolean                  isUnique;
private boolean                  isIndexable;
private boolean                  includeInNotification;
private String                   defaultValue;
private String                   description;
private int                      searchWeight = DEFAULT_SEARCHWEIGHT;
private IndexType                indexType    = null;
private List<AtlasConstraintDef> constraints;
private Map<String, String>      options;
private String                   displayName;

具体实现:

db:
    "name":        "db",
    "typeName":    "hive_db",
    "isOptional":  false,
    "isIndexable": true,
    "isUnique":    false,
    "cardinality": "SINGLE"
    
    
columns:
    "name":        "columns",
    "typeName":    "array<hive_column>",
    "isOptional":  optional,
    "isIndexable": true,
    “isUnique":    false,
    "constraints": [ { "type": "ownedRef" } ]  
  • isComposite – 是否复合
  • isIndexable – 是否索引
  • isUnique – 是否唯一
  • multiplicity – 指示此属性是(必需的/可选的/还是可以是多值)的

System specific types and their significance

Referenceable

This type represents all entities that can be searched for using a unique attribute called qualifiedName.

  ├── Referenceable
  ├── ReferenceableDeserializer
  ├── ReferenceableSerializer
  └── V1SearchReferenceableSerializer

Hooks

以Hive元信息采集为例分析采集过程:

全量导入

import-hive.sh

"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" 
org.apache.atlas.hive.bridge.HiveMetaStoreBridge $IMPORT_ARGS
importTables
  └── importDatabases        [addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +295]
      └── importHiveMetadata [addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +289]

上面是调用过程:

importTables -> importTable –> registerInstances

AtlasEntitiesWithExtInfo ret = null;
EntityMutationResponse   response        = atlasClientV2.createEntities(entities);
List<AtlasEntityHeader>  createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);

if (CollectionUtils.isNotEmpty(createdEntities)) {
    ret = new AtlasEntitiesWithExtInfo();

    for (AtlasEntityHeader createdEntity : createdEntities) {
        AtlasEntityWithExtInfo entity = atlasClientV2.getEntityByGuid(createdEntity.getGuid());

        ret.addEntity(entity.getEntity());

        if (MapUtils.isNotEmpty(entity.getReferredEntities())) {
            for (Map.Entry<String, AtlasEntity> entry : entity.getReferredEntities().entrySet()) {
                ret.addReferredEntity(entry.getKey(), entry.getValue());
            }
        }

        LOG.info("Created {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid());
    }
}

通过Http Post 的请求将库表数据更新至Atlas

atlasClientV2有很多Http接口

Atlas HTTP 客户端API:

image

实时监听

HiveHook implements ExecuteWithHookContext

ExecuteWithHookContext is a new interface that the Pre/Post Execute Hook can run with the HookContext.

实现run()方法来对Hive 相关事件做处理

Hive相关事件:

BaseHiveEvent
      ├── AlterTableRename
      ├── CreateHiveProcess
      ├── DropDatabase
      ├── DropTable
      ├── CreateDatabase
      │   └── AlterDatabase
      └── CreateTable
          └── AlterTable
              └── AlterTableRenameCol

以create database 为例分析流程:

//处理Hook 上下文信息
AtlasHiveHookContext context = 
new AtlasHiveHookContext(this, oper, hookContext, getKnownObjects(), isSkipTempTables());

//建库事件处理,提取相关库信息
event = new CreateDatabase(context);

if (event != null) {
    final UserGroupInformation ugi = hookContext.getUgi() == null ? Utils.getUGI() : hookContext.getUgi();
    super.notifyEntities(ActiveEntityFilter.apply(event.getNotificationMessages()), ugi);
}


public enum HookNotificationType {
    TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE,
    ENTITY_CREATE_V2, ENTITY_PARTIAL_UPDATE_V2, ENTITY_FULL_UPDATE_V2, ENTITY_DELETE_V2
}

//操作用户获取
if (context.isMetastoreHook()) {
    try {
        ugi = SecurityUtils.getUGI();
    } catch (Exception e) {
        //do nothing
    }
} else {
    ret = getHiveUserName();

    if (StringUtils.isEmpty(ret)) {
        ugi = getUgi();
    }
}

if (ugi != null) {
    ret = ugi.getShortUserName();
}

if (StringUtils.isEmpty(ret)) {
    try {
        ret = UserGroupInformation.getCurrentUser().getShortUserName();
    } catch (IOException e) {
        LOG.warn("Failed for UserGroupInformation.getCurrentUser() ", e);

        ret = System.getProperty("user.name");
    }
}

主要:

获取实体信息, 传递Hook message的类型、操作用户

notifyEntities 可以看出其他组件HBase、impala也会调用该方法进行消息的发送

image

public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries) {
    if (executor == null) { // send synchronously
        notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
    } else {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
            }
        });
    }
}

消息通知框架:

NotificationInterface
      ├── AtlasFileSpool
      └── AbstractNotification
          ├── KafkaNotification
          └── Spooler

数据写入Kafka中:

@Override
public void sendInternal(NotificationType notificationType, List<String> messages) throws NotificationException {
    KafkaProducer producer = getOrCreateProducer(notificationType);

    sendInternalToProducer(producer, notificationType, messages);
}

根据NotificationType写入指定topic 中:

private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<NotificationType, String>() {
    {
        put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
        put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
    }
};

NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"),
NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"),

数据主要写入两个Topic中: ATLAS_ENTITIES、ATLAS_HOOK

ATLAS_HOOK是写入Hook事件消息, 创建库的事件元数据信息会写入该Topic中

如何唯一确定一个库:

public String getQualifiedName(Database db) {
    return getDatabaseName(db) + QNAME_SEP_METADATA_NAMESPACE + getMetadataNamespace();
}

dbName@clusterName 确定唯一性

外延应用

一个基于Hive hook 实现Impala 元数据刷新的用例:
AutoRefreshImpala:https://github.com/Observe-secretly/AutoRefreshImpala

参考

[1] Apache Atlas – Data Governance and Metadata framework for Hadoop
[2] Apache Atlas 源码

本文作者: chaplinthink, 关注领域:大数据、基础架构、系统设计, 一个热爱学习、分享的大数据工程师

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/5846.html

(0)
上一篇 2023-04-22
下一篇 2023-04-22

相关推荐

  • Python函数调用的使用方法

    Python函数调用的使用方法Python是一种高级编程语言,它让程序员可以更快地编写代码,并通过函数调用来实现代码的复用。函数是Python中一组语句的集合,可以接受参数,并且可以返回一个结果。在本文中,我们将介绍如何使用Python函数调用。

    2024-07-12
    44
  • Python列表查找

    Python列表查找Python是一种广泛使用的高级编程语言,它广泛应用于Web开发、数据分析、机器学习等领域。Python内置了很多有用的数据类型,其中列表(List)是使用最广泛的一种。Python列表具有动态性和灵活性,可以存储不同类型的数据,并且可以进行复杂的操作。列表查找是Python编程中非常重要的一部分,本文将详细介绍Python中列表查找的各种方法。

    2024-06-17
    42
  • 做一个平台,让对手发来一个很烂的sql,然后系统返回一个优化好的sql(一)「建议收藏」

    做一个平台,让对手发来一个很烂的sql,然后系统返回一个优化好的sql(一)「建议收藏」
    我们要做的事情大概分3步: 1、需求的描述(主要把问题点抛出来) 2、解决如何优化sql性能的问题(hive关于一条sql的生命周期源码的分析),也就是如何…

    2023-04-06
    156
  • 数据库自学笔记(2)-[通俗易懂]

    数据库自学笔记(2)-[通俗易懂]1.HAVING和WHERE: WHERE 和 HAVING 的作用对象不一样。WHERE作用于基本表或视图,挑出满足条件的元组。HAVING作用于组(group),一般配合GROUP BY 使用。

    2023-01-23
    149
  • MySQL8.0 新特性 Hash Join「建议收藏」

    MySQL8.0 新特性 Hash Join「建议收藏」概述&背景 MySQL一直被人诟病没有实现HashJoin,最新发布的8.0.18已经带上了这个功能,令人欣喜。有时候在想,MySQL为什么一直不支持HashJoin呢?我想可能是因为MySQ

    2022-12-23
    176
  • Python中eval函数的作用

    Python中eval函数的作用Python中的eval函数是一个很常用的内置函数,其主要作用是将字符串str当做一个表达式来运行,并返回表达式所代表的值。eval函数的语法如下:

    2024-05-26
    70
  • springboot2集成cas 单点登录_java properties

    springboot2集成cas 单点登录_java propertiesCouchbase 虽然是MongoDB最有潜力的竞争对手,但是其在数据库方面的开源程度还远远不够,很多高级特性都没有开放出来。目前虽然被SpringBoot加持有了一些光环,但是其文档和代码注释方…

    2023-03-24
    162
  • Sql Server 2008 【存储过程】 死锁 查询和杀死[通俗易懂]

    Sql Server 2008 【存储过程】 死锁 查询和杀死[通俗易懂]1 . 使用数据库中,可能出现死锁, 导致程序 无法正常使用. Create procedure [dbo].[sp_who_lock] ( @bKillPID Bit=0 — 0: 查询 1: 结

    2023-01-22
    146

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注