DeepPipeline元素

前言

DeepPipeline 是一个基于 Dagster 的数据流处理框架,提供了完整的异步和同步执行能力。本文档详细介绍了 DeepPipeline 类的公共接口。

运行数据流

构建了一个test_02的数据流(生成模拟订单数据,整理成数据集形式输出每个订单号和金额)

e3a3c095599947ac9bbc12feda2d6096

1.获取run_id(run_async)和(run_batch)

参考接口文档中的异步启动数据流和批量异步启动数据流获取执行id或者执行id列表。

以异步启动数据流(run_async)为例进行接口使用说明。

参数说明: - parameter (Any) – 执行参数,默认为空对象 - in_process (bool) – 是否在同一个进程执行,默认为 False

返回值: - 执行ID

使用示例:

[6]:
from deepfos.element.deep_pipeline import DeepPipeline

# 创建DeepPipeline实例
pipeline = DeepPipeline("test_02")

# 异步启动数据流,获取run_id
run_id = pipeline.run_async(
    parameter={"key": "value"},
    in_process=False
)
run_id
[6]:
'a89f3e9d-d1bf-4baf-a53a-8bf0c6259e55'

通过接口调用test_02之后返回的run_id与监控中生成的运行实例ID一致

79978b82244b4807837829d7d508c172 1ba8d4a546f84d0988685de5a90c5ec6

2.通过run_id获取执行结果(result)

参考接口文档中的获取异步执行结果,可以通过run_id来获取执行结果。

获取异步执行结果(result)接口使用说明。

参数说明: - run_id (str) – 执行ID(由 run_async 或 run_batch 返回) - timeout (Optional[int]) – 超时时间(秒),可选

返回值: - 执行结果

使用示例:

[7]:
from deepfos.element.deep_pipeline import DeepPipeline

# 创建DeepPipeline实例
pipeline = DeepPipeline("test_02")

# 输入获取的run_id
run_id = '1b8fbcfe-12d6-49db-bed7-3f58a4e3683f'

# 通过run_id获取异步执行结果
result = pipeline.result(
    run_id=run_id,           # 执行ID
    # 可以根据实际数据流的复杂程度调整timeout值,建议从60秒开始测试,逐步调整到合适数值。
    # 若是简单的数据流可以不需要此参数
    # timeout=60               # 超时时间(秒)
)
print(f"执行结果: {result}")
执行结果: {'状态': '执行成功', '数据量': 5, '结果集': [{'订单号': 'ORD001', '金额': 908}, {'订单号': 'ORD002', '金额': 546}, {'订单号': 'ORD003', '金额': 590}, {'订单号': 'ORD004', '金额': 960}, {'订单号': 'ORD005', '金额': 172}]}

通过run_async接口获取的id或者在监控中任意选择一运行实例id,通过result接口获取的执行结果和在监控中查看的执行结果一致。

45b3bda547364a849744f8d17c37136d f0a1c3f5c92a4b9996a59a526b32f517

3.同步直接获取执行结果

参考接口文档中的同步启动数据流,可以直接运行数据流获取执行结果。

同步启动数据流(run)接口使用说明。

参数说明: - parameter (Any) – 执行参数,默认为空对象 - timeout (Optional[int]) – 超时时间(秒),可选 - in_process (bool) – 是否在同一个进程执行,默认为 True

返回值: - 执行结果

使用示例:

[5]:
from deepfos.element.deep_pipeline import DeepPipeline

# 创建DeepPipeline实例
pipeline = DeepPipeline("test_02")

# 同步启动数据流
result = pipeline.run(
    parameter={"key": "value"},  # 执行参数
    in_process=True              # 是否在同一个进程执行
    # 可以根据实际数据流的复杂程度调整timeout值,建议从60秒开始测试,逐步调整到合适数值。
    # 若是简单的数据流可以不需要此参数
    # timeout=60               # 超时时间(秒)
)
print(f"执行结果: {result}")
执行结果: {'状态': '执行成功', '数据量': 5, '结果集': [{'订单号': 'ORD001', '金额': 900}, {'订单号': 'ORD002', '金额': 402}, {'订单号': 'ORD003', '金额': 613}, {'订单号': 'ORD004', '金额': 188}, {'订单号': 'ORD005', '金额': 905}]}