高性能图计算系统 Plato 在 Nebula Graph 中的实践

高性能图计算系统 Plato 在 Nebula Graph 中的实践本文首发于 Nebula Graph Community 公众号 1.图计算介绍 1.1 图数据库 vs 图计算 图数据库是面向 OLTP 场景,强调增删改查,并且一个查询往往只涉及到全图中的少量数据

高性能图计算系统 Plato 在 Nebula Graph 中的实践

本文首发于 Nebula Graph Community 公众号

高性能图计算系统 Plato 在 Nebula Graph 中的实践

1.图计算介绍

1.1 图数据库 vs 图计算

图数据库是面向 OLTP 场景,强调增删改查,并且一个查询往往只涉及到全图中的少量数据,而图计算是面向 OLAP 场景,往往是针对全图数据进行分析计算。

1.2 图计算系统分布架构

按照分布架构,图计算系统分为单机和分布式。

单机图计算系统优势在于模型简单,无需考虑分布式通讯,也无需进行图切分,但受制于单机系统资源,无法进行更大规模的图数据分析。

分布式图计算平台将图数据划分到多个机器上,从而处理更大规模的图数据,但不可避免的引入了分布式通讯的开销问题。

1.3 图的划分

图划分主要有两种方式边切割(Edge- Cut)和点切割(Vertex-Cut)。

高性能图计算系统 Plato 在 Nebula Graph 中的实践

边分割:每个点的数据只会存储在一台机器上,但有的边会被打断分到多台机器上。
如图(a)所示,点 A 的数据只存放在机器 1 上,点B 的数据只存放在机器 2 上。对于边 AB 而言,会存储在机器 1 和机器 2 上。由于点 A 和点 B 分布在不同的机器上,在迭代计算过程中,会带来通讯上的开销。

点分割:每条边只会存储在一台机器上,但有的点有可能分割,分配在多台机器上。
如图(b)所示, 边 AB 存储在机器 1 上,边 BC 存储在机器 2 上,边 CD 存储在机器 3 上,而点 B 被分配到了 1, 2 两台机器上,点 C 被分配到了 2,3 两台机器上。由于点被存储在多台机器上,维护顶点数据的一致性同样也会带来通讯上的开销。

1.4 计算模型

编程模型是针对图计算应用开发者,可分为以节点为中心的编程模型、以边或路径为中心的编程模型、以子图为中心的编程模型。

计算模型是图计算系统开发者面临的问题,主要有同步执行模型和异步执行模型。比较常见的有 BSP 模型(Bulk Synchronous Parallel Computing Model)和 GAS 模型。

BSP 模型:BSP 模型的计算过程是由一系列的迭代步组成,每个迭代步被称为超步。采用 BSP 模型的系统主要有 Pregel、Hama、Giraph 等。
BSP 模型具有水平和垂直两个方面的结构。垂直上看,BSP 模型有一系列串行的超步组成。水平上看(如图所示),一个超步又分三个阶段:

  • 本地计算阶段,每个处理器只对存储本地内存中的数据进行计算。
  • 全局通信阶段,机器节点之间相互交换数据。
  • 栅栏同步阶段,等待所有通信行为的结束。

高性能图计算系统 Plato 在 Nebula Graph 中的实践

GAS 模型:GAS 模型是在 PowerGraph 系统提出,分为信息收集阶段(Gather)、应用阶段(Apply)和分发阶段(Scatter)。

  • Gather 阶段,负责从邻居顶点收集信息。
  • Apply 阶段,负责将收集的信息在本地处理,更新到顶点上。
  • Scatter 阶段,负责发送新的信息给邻居顶点。

2. Gemini 图计算系统介绍

Gemini 在工业界较有影响力,它的主要技术点包括:CSR/CSC、push/pull、master 和 mirror、稀疏和稠密图、通信与计算协同工作、chunk-based 式分区、NUMA 感知的子分区等。

Gemini 采用边切割方式将图数据按照 chunk-based 的方式分区,并支持 Numa 结构。分区后的数据,用 CSR 存储出边信息,用 CSC 存储入边信息。在迭代计算过程中,对稀疏图采用 push 的方式更新其出边邻居,对稠密图采用 pull 的方式拉取入边邻居的信息。

如果一条边被切割,边的一端顶点为 master,另一端顶点则为 mirror。mirror 被称为占位符(placeholder) ,在 pull 的计算过程中,各个机器上的 mirror 顶点会拉取其入边邻居 master 顶点的信息进行一次计算,在 BSP 的计算模型下通过网络同步给其 master 顶点。在 push 的计算过程中,各个机器的 master 顶点会将其信息先同步给它的 mirror 顶点,再由 mirror 更新其出边邻居。

在 BSP 的通信阶段,每台机器 Node_i 发送给它的下一个机器 Node_i+1,最后一个机器会发送给第一个机器。在每台机器发送的同时也会收到 Node_i-1 的信息,收到信息后会立即执行本地计算。通讯和计算的重叠可以隐藏通信时间,提升整体的效率。

更多细节可以参考论文《Gemini: A Computation-Centric Distributed Graph Processing System》。

3. Plato 图计算系统与 Nebula Graph 的集成

3.1 Plato 图计算系统介绍

Plato 是腾讯开源的基于 Gemni 论文实现的工业级图计算系统。Plato 可运行在通用的 x86 集群,如 Kubernetes 集群、Yarn 集群等。在文件系统层面,Plato 提供了多种接口支持主流的文件系统,如 HDFS、Ceph 等等。

高性能图计算系统 Plato 在 Nebula Graph 中的实践

3.2 与 Nebula Graph 的集成

我们基于 Plato 做了二次开发,以接入 Nebula Graph 数据源。

3.2.1 Nebula Graph 作为输入和输出数据源

增加 Plato 的数据源,支持将 Nebula Graph 作为输入和输出数据源,直接从 Nebula Graph 中读取数据进行图计算,并将计算结果直接写回到 Nebula Graph 中。

Nebula Graph 的存储层提供了针对 partition 的 scan 接口,很容易通过该接口批量扫出顶点和边数据:

ScanEdgeIter scanEdgeWithPart(std::string spaceName,
                                  int32_t partID,
                                  std::string edgeName,
                                  std::vector<std::string> propNames,
                                  int64_t limit = DEFAULT_LIMIT,
                                  int64_t startTime = DEFAULT_START_TIME,
                                  int64_t endTime = DEFAULT_END_TIME,
                                  std::string filter = "",
                                  bool onlyLatestVersion = false,
                                  bool enableReadFromFollower = true);

ScanVertexIter scanVertexWithPart(std::string spaceName,
                                      int32_t partId,
                                      std::string tagName,
                                      std::vector<std::string> propNames,
                                      int64_t limit = DEFAULT_LIMIT,
                                      int64_t startTime = DEFAULT_START_TIME,
                                      int64_t endTime = DEFAULT_END_TIME,
                                      std::string filter = "",
                                      bool onlyLatestVersion = false,
                                      bool enableReadFromFollower = true);

实践中,我们首先获取指定 space 下的 partition 分布情况,并将每个 partition 的 scan 任务分别分配给 Plato 集群的各个节点上,每个节点再进一步将 partition 的 scan 任务分配给运行在该节点的各个线程上,以达到并行快速的读取数据。图计算完成之后,将计算结果通过 Nebula client 并行写入 Nebula Graph。

3.2.2 分布式 ID 编码器

Gemini 和 Plato 的要求顶点 ID 从 0 开始连续递增,但绝大多数的真实数据顶点 ID 并不满足这个需求,尤其是 Nebula Graph 从 2.0 版本开始支持 string 类型 ID。

因此,在计算之前,我们需要将原始的 ID 从 int 或 string 类型转换为从 0 开始连续递增的 int。Plato 内部实现了一个单机版的 ID 编码器,即 Plato 集群的每台机器均冗余存储所有 ID 的映射关系。当点的数量比较多时,每台机器仅 ID 映射表的存储就需上百 GB 的内存,因为我们需要实现分布式的 ID 映射器,将 ID 映射关系切成多份,分开存储。

我们通过哈希将原始 ID 打散在不同的机器,并行地分配全局从 0 开始连续递增的 ID。生成 ID 映射关系后,每台机器都会存有 ID 映射表的一部分。随后再将边数据分别按起点和终点哈希,发送到对应的机器进行编码,最终得到的数据即为可用于计算的数据。当计算运行结束后,需要数据需要映射回业务 ID,其过程和上述也是类似的。

3.2.3 补充算法

我们在 Plato 的基础上增加了 sssp、apsp、jaccard similarity、三角计数等算法,并为每个算法增加了输入和输出到 Nebula Graph 数据源的支持。目前支持的算法有:

文件名 算法名称 分类
apsp.cc 全对最短路径 路径
sssp.cc 单源最短路径 路径
tree_stat.cc 树深度/宽度 图特征
nstepdegrees.cc n阶度 图特征
hyperanf.cc 图平均距离估算 图特征
triangle_count.cc 三角计数 图特征
kcore.cc 节点中心性
pagerank.cc Pagerank 节点中心性
bnc.cc Betweenness 节点中心性
cnc.cc 接近中心性(Closeness Centrality) 节点中心性
cgm.cc 连通分量计算 社区发现
lpa.cc 标签传播 社区发现
hanp.cc HANP 社区发现
metapath_randomwalk.cc 图表示学习
node2vec_randomwalk.cc 图表示学习
fast_unfolding.cc louvain 聚类
infomap_simple.cc 聚类
jaccard_similarity.cc 相似度
mutual.cc 其他
torch.cc 其他
bfs.cc 广度优先遍历 其他

4. Plato 部署安装与运行

4.1 集群部署

Plato 采用 MPI 进行进程间通信,在集群上部署 Plato 时,需要将 Plato 安装在相同的目录下,或者使用 NFS。操作方法见:https://mpitutorial.com/tutorials/running-an-mpi-cluster-within-a-lan/

4.2 运行算法的脚本及配置文件

scripts/run_pagerank_local.sh

#!/bin/bash

PROJECT="$(cd "$(dirname "$0")" && pwd)/.."

MAIN="./bazel-bin/example/pagerank" # process name

WNUM=3
WCORES=8

#INPUT=${INPUT:="$PROJECT/data/graph/v100_e2150_ua_c3.csv"}
INPUT=${INPUT:="nebula:${PROJECT}/scripts/nebula.conf"}
#OUTPUT=${OUTPUT:="hdfs://192.168.8.149:9000/_test/output"}
OUTPUT=${OUTPUT:="nebula:$PROJECT/scripts/nebula.conf"}
IS_DIRECTED=${IS_DIRECTED:=true}  # let plato auto add reversed edge or not
NEED_ENCODE=${NEED_ENCODE:=true}
VTYPE=${VTYPE:=uint32}

ALPHA=-1
PART_BY_IN=false

EPS=${EPS:=0.0001}
DAMPING=${DAMPING:=0.8}
ITERATIONS=${ITERATIONS:=5}

export MPIRUN_CMD=${MPIRUN_CMD:="${PROJECT}/3rd/mpich-3.2.1/bin/mpiexec.hydra"}

PARAMS+=" --threads ${WCORES}"
PARAMS+=" --input ${INPUT} --output ${OUTPUT} --is_directed=${IS_DIRECTED} --need_encode=${NEED_ENCODE} --vtype=${VTYPE}"
PARAMS+=" --iterations ${ITERATIONS} --eps ${EPS} --damping ${DAMPING}"

# env for JAVA && HADOOP
export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/amd64/server:${LD_LIBRARY_PATH}

# env for hadoop
export CLASSPATH=${HADOOP_HOME}/etc/hadoop:`find ${HADOOP_HOME}/share/hadoop/ | awk "{path=path":"$0}END{print path}"`
export LD_LIBRARY_PATH="${HADOOP_HOME}/lib/native":${LD_LIBRARY_PATH}

chmod 777 ./${MAIN}
${MPIRUN_CMD} -n ${WNUM} -f ${PROJECT}/scripts/cluster ./${MAIN} ${PARAMS}
exit $?

参数说明

  • INPUT 参数和 OUPUT 参数分别指定算法的输入数据源和输出数据源,目前支持本地 csv 文件、HDFS文件、 Nebula Graph。当输入输出数据源为 Nebula Graph 时,INPUTOUPUT 形式为 nebula:/path/to/nebula.conf
  • WNUM 为集群所有机器所运行的进程数之和,推荐每台机器运行为 1 或者 NUMA node 数个进程,WCORE 为每个进程的线程数,推荐最大设置为机器的硬件线程数。

scripts/nebula.conf

## read/write
--retry=3 # 连接 Nebula Graph 时的重试次数
--space=sf30 # 要读取或写入的 space 名称

## read from nebula
--meta_server_addrs=192.168.8.94:9559 # Nebula Graph 的 metad 服务地址
--edge=LIKES # 要读取的边的名称
#--edge_data_field # 要读取的作为边的权重属性的名称
--read_batch_size=10000 # 每次 scan 时的 batch 的大小

## write to nebula
--graph_server_addrs=192.168.8.94:9669 # Nebula Graph 的 graphd 服务地址
--user=root # graphd 服务的登陆用户名
--password=nebula # graphd 服务的登陆密码
# insert or update
--mode=insert # 写回 Nebula Graph 时采用的模式: insert/update
--tag=pagerank # 写回到 Nebula Graph 的 tag 名称
--prop=pr # 写回到 Nebula Graph 的 tag 对应的属性名称
--type=double # 写回到 Nebula Graph 的 tag 对应的属性的类型
--write_batch_size=1000 # 写回时的 batch 大小
--err_file=/home/plato/err.txt # 写回失败的数据所存储的文件

scripts/cluster

cluster 文件指定要运行该算法所在的集群机器的 IP

192.168.15.3
192.168.15.5
192.168.15.6

以上为 Plato 在 Nebula Graph 中的应用,目前该功能集成在 Nebula Graph 企业版中,如果你使用的是开源版本的 Nebula Graph,需按照自己的需求自己对接 Plato。


交流图数据库技术?加入 Nebula 交流群请先填写下你的 Nebula 名片,Nebula 小助手会拉你进群~~

关注公众号

Nebula Graph:一个开源的分布式图数据库

原文地址:https://www.cnblogs.com/nebulagraph/archive/2022/03/02/15953546.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/5476.html

(0)
上一篇 2023-05-07
下一篇 2023-05-07

相关推荐

  • Python String长度函数:计算字符串长度

    Python String长度函数:计算字符串长度在Python中,字符串是一种很常用的数据类型,它是由一系列字符组成的序列。Python给我们提供了一个内置函数len(),用于计算字符串的长度。根据官方文档的介绍:len(s)函数返回对象(字符、字节、列表、字典等)的长度或元素个数。在本文中,我们聚焦在字符串上。

    2024-04-12
    82
  • python不存在并堆栈溢出(python会不会溢出)

    python不存在并堆栈溢出(python会不会溢出)stack overflow是堆栈溢出。堆栈溢出的产生是由于过多的函数调用,导致调用堆栈无法容纳这些调用的返回地址,一般在递归中产生。堆栈溢出很可能由无限递归(Infinite recursion)产生,但也可能仅仅是过多的堆栈层级。请对应检查一下。

    2023-12-03
    125
  • Python安装tar.gz教程

    Python安装tar.gz教程对于一些在Linux系统下进行Python开发的人来说,安装Python tar.gz是一个非常常见的任务。Python tar.gz是源代码压缩包,通常有一些优点,比如自由度更高、安装更灵活、可以自己手动编译。在这篇文章中,我将提供一些简单且易于跟随的步骤,以便让读者可以轻松地完成Python tar.gz的安装。

    2024-06-28
    58
  • SSL加密_ssl安全错误 sql

    SSL加密_ssl安全错误 sqlMsSQL使用加密连接SSL/TLS 说明 应用程序通过未加密的通道与数据库服务器通信, 这可能会造成重大的安全风险。在这种情况下, 攻击者可以修改用户输入的数据, 甚至对数据库服务器执行任意 SQL

    2022-12-26
    151
  • Python Coursepoint Plus: 为你的编程技能升级提供一站式解决方案

    Python Coursepoint Plus: 为你的编程技能升级提供一站式解决方案随着信息技术的发展,编程已经成为一个非常重要的技能。编程不仅在IT行业中得到广泛应用,而且在其他行业中也已经变得非常重要。学习编程不仅可以提升个人技能,而且可以帮助人们更好地理解和掌握计算机科学,这对未来的职业和事业发展都是非常有帮助的。

    2023-12-05
    107
  • Python CGI程序:动态生成网页内容

    Python CGI程序:动态生成网页内容
    CGI的全称是Common Gateway Interface,也就是通用网关接口。它是连接Web服务器和CGI脚本(或程序)的接口标准。通过CGI,Web服务器可以将请求连接到后台程序,并且将程序的输出返回到客户端浏览器,实现了Dynamically Generating Web Pages的效果。目前,CGI已经被一些新的更优秀的解决方案所替代,但是CGI仍然是编写交互式Web应用程序的基础技术。

    2024-03-05
    77
  • centos7+ mysql5.7 升级到mysql8+「终于解决」

    centos7+ mysql5.7 升级到mysql8+「终于解决」参考网址吧。 按如下网址提供的实操步骤,顺利完成了mysql5.6的卸载和mysql8.20的安装。 注意: 1、在初始化指令步骤,会自动生成一个数据库root密码。注意先保存下来。防止无法登录。 …

    2023-02-26
    151
  • mysql怎么做主从复制_MySQL主从同步

    mysql怎么做主从复制_MySQL主从同步主从复制原理 Mysql 中有一个binlog 二进制日志,这个日志会记录下所有修改了的SQL 语句,从服务器把主服务器上的binlog二进制日志在指定的位置开始复制主服务器所进行修改的语句到从服务器

    2023-02-17
    143

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注