others linux服务器运维 django3 监控 k8s golang 数据库 大数据 前端 devops 理论基础 java oracle 运维日志

sqllineage解析sql列级血缘并提交到datahub

访问量:2335 创建时间:2023-04-14

文档引用:https://blog.csdn.net/LCriska/article/details/129329492

from sqllineage.runner import LineageRunner

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)
import sys

'''
    解析目标sql文件的HiveSQL生成列级血缘,提交到datahub
    sql文件路径作为命令行参数传入脚本
    提交到datahub的platform = hive
'''


# 库名设置
def datasetUrn(tableName):
    return builder.make_dataset_urn("hive", tableName)  # platform = hive


# 表、列级信息设置
def fieldUrn(tableName, fieldName):
    return builder.make_schema_field_urn(datasetUrn(tableName), fieldName)


# 目标sql文件路径
sqlFilePath = sys.argv[1]

sqlFile = open(sqlFilePath, mode='r', encoding='utf-8')

sql = sqlFile.read().__str__()

# 获取sql血缘
result = LineageRunner(sql)

# 获取sql中的下游表名
targetTableName = result.target_tables[0].__str__()

print(result)

print('===============')

# 打印列级血缘结果
result.print_column_lineage()

print('===============')

# 获取列级血缘
lineage = result.get_column_lineage

# 字段级血缘list
fineGrainedLineageList = []

# 用于冲突检查的上游list
upStreamsList = []

# 遍历列级血缘
for columnTuples in lineage():
    # 上游list
    upStreamStrList = []

    # 下游list
    downStreamStrList = []

    # 逐个字段遍历
    for column in columnTuples:

        # 元组中最后一个元素为下游表名与字段名,其他元素为上游表名与字段名

        # 遍历到最后一个元素,为下游表名与字段名
        if columnTuples.index(column) == len(columnTuples) - 1:
            downStreamFieldName = column.raw_name.__str__()
            downStreamTableName = column.__str__().replace('.' + downStreamFieldName, '').__str__()

            # print('下游表名:' + downStreamTableName)
            # print('下游字段名:' + downStreamFieldName)

            downStreamStrList.append(fieldUrn(downStreamTableName, downStreamFieldName))
        else:
            upStreamFieldName = column.raw_name.__str__()
            upStreamTableName = column.__str__().replace('.' + upStreamFieldName, '').__str__()

            # print('上游表名:' + upStreamTableName)
            # print('上游字段名:' + upStreamFieldName)

            upStreamStrList.append(fieldUrn(upStreamTableName, upStreamFieldName))

            # 用于检查上游血缘是否冲突
            upStreamsList.append(Upstream(dataset=datasetUrn(upStreamTableName), type=DatasetLineageType.TRANSFORMED))

    fineGrainedLineage = FineGrainedLineage(upstreamType=FineGrainedLineageUpstreamType.DATASET,
                                            upstreams=upStreamStrList,
                                            downstreamType=FineGrainedLineageDownstreamType.FIELD_SET,
                                            downstreams=downStreamStrList)

    fineGrainedLineageList.append(fineGrainedLineage)

fieldLineages = UpstreamLineage(
    upstreams=upStreamsList, fineGrainedLineages=fineGrainedLineageList
)

lineageMcp = MetadataChangeProposalWrapper(
    entityUrn=datasetUrn(targetTableName),  # 下游表名
    aspect=fieldLineages
)

# 调用datahub REST API
emitter = DatahubRestEmitter('http://datahub-gms:8080') # datahub gms server

# Emit metadata!
emitter.emit_mcp(lineageMcp)

执行脚本,然后查看datahub 的ui

python3 sql-lineage-to-datahub.py target.sql
登陆评论: 使用GITHUB登陆