提交celery任务功能

在 v1.0.42 后,本项目提供了 提交celery任务 功能,可以绕过python server直接向celery提交python元素执行任务,避免在调用接口过程中因网络等原因造成的失败,并减少对python server服务端(gunicorn)的资源占用

警告

该功能需python server版本在2.1.0.44以上

使用样例

异步执行

例如需要在业务代码中,执行 同一应用内 其他Python元素的作业,则可通过如下代码实现:

from deepfos.element.pyscript import PythonScript

# 提供需执行的Python元素信息
script = PythonScript(element_name='test_task', path='/')

# 发送执行任务的请求并获得任务实例
task = script.run_async(parameter={'a': '1'})

此时,celery worker会收到一个执行 test_task 元素脚本的任务请求,脚本入参为 {‘a’: ‘1’}

对于 run_async 返回的任务实例,可以使用 get_result 方法得到执行结果

print(task.get_result())

get_result 方法可以反复执行,且仅涉及当前任务实例结果的获取,不会产生新的任务

已有的非结束状态的任务实例可以使用 terminate 方法结束

task.terminate()

同步执行

如需同步阻塞地一次性执行Python元素作业,亦可使用 run 方法,该方法将发送任务并等待执行结果,返回值即为任务的执行结果

from deepfos.element.pyscript import PythonScript

# 提供需执行的Python元素信息
script = PythonScript(element_name='test_task', path='/')

# 发送执行任务的请求并获得任务返回
print(script.run(parameter={'a': '1'}))

执行情况

在获取执行结果( runget_result )时,以如下列举情况处理:

情况描述

效果

执行正常结束

返回脚本的return值

执行期间被terminate

抛出 PyTaskRevokedError, 错误信息:python脚本被中断执行

执行期间发生错误(包括worker发生OOM时被系统kill)

抛出 PyTaskRunTimeError, 错误信息:python脚本执行时发生错误,错误信息: ….

调用获取执行结果时提供了timeout,且等待执行超时

抛出 ResultTimeOutError, 错误信息:等待python执行结果超时

在celery中无法找到任务信息

抛出 PyTaskInvalidError, 错误信息:无效的python任务ID

线上执行与本地执行的区别

线上执行与本地执行在方法使用上没有差别,在 element/pyscript.py 内会自行判断执行的环境是否支持直接向celery发送任务信息

如不支持,例如在本地执行时,则会改为:

  • 调用python server的 /script/run 接口实现任务的发送

  • 在同步调用或获取结果时,使用 /script/result 接口

  • 在取消任务时使用 /script/terminate 接口

PythonScript配置项

PythonScript 类用于配置python脚本,支持如下配置项

参数名称

参数说明

是否必须

数据类型

element_name

元素名

string

folder_id

元素所在的文件夹id

与path二选一

string

path

元素所在的文件夹绝对路径

与folder_id二选一

string

task_name

任务名称

string

should_log

仅在线上执行时有效, 脚本是否记录执行日志, 线下执行时,该值与脚本元素 配置项“记录执行日志”保持一致

bool