提交celery任务功能¶
在 v1.0.42 后,本项目提供了 提交celery任务 功能,可以绕过python server直接向celery提交python元素执行任务,避免在调用接口过程中因网络等原因造成的失败,并减少对python server服务端(gunicorn)的资源占用
警告
该功能需python server版本在2.1.0.44以上
使用样例¶
异步执行¶
例如需要在业务代码中,执行 同一应用内 其他Python元素的作业,则可通过如下代码实现:
from deepfos.element.pyscript import PythonScript
# 提供需执行的Python元素信息
script = PythonScript(element_name='test_task', path='/')
# 发送执行任务的请求并获得任务实例
task = script.run_async(parameter={'a': '1'})
此时,celery worker会收到一个执行 test_task 元素脚本的任务请求,脚本入参为 {‘a’: ‘1’}
对于 run_async 返回的任务实例,可以使用 get_result 方法得到执行结果
print(task.get_result())
get_result 方法可以反复执行,且仅涉及当前任务实例结果的获取,不会产生新的任务
已有的非结束状态的任务实例可以使用 terminate 方法结束
task.terminate()
同步执行¶
如需同步阻塞地一次性执行Python元素作业,亦可使用 run 方法,该方法将发送任务并等待执行结果,返回值即为任务的执行结果
from deepfos.element.pyscript import PythonScript
# 提供需执行的Python元素信息
script = PythonScript(element_name='test_task', path='/')
# 发送执行任务的请求并获得任务返回
print(script.run(parameter={'a': '1'}))
执行情况¶
在获取执行结果( run 或 get_result )时,以如下列举情况处理:
情况描述 |
效果 |
---|---|
执行正常结束 |
返回脚本的return值 |
执行期间被terminate |
抛出 PyTaskRevokedError, 错误信息:python脚本被中断执行 |
执行期间发生错误(包括worker发生OOM时被系统kill) |
抛出 PyTaskRunTimeError, 错误信息:python脚本执行时发生错误,错误信息: …. |
调用获取执行结果时提供了timeout,且等待执行超时 |
抛出 ResultTimeOutError, 错误信息:等待python执行结果超时 |
在celery中无法找到任务信息 |
抛出 PyTaskInvalidError, 错误信息:无效的python任务ID |
线上执行与本地执行的区别¶
线上执行与本地执行在方法使用上没有差别,在 element/pyscript.py 内会自行判断执行的环境是否支持直接向celery发送任务信息
如不支持,例如在本地执行时,则会改为:
调用python server的 /script/run 接口实现任务的发送
在同步调用或获取结果时,使用 /script/result 接口
在取消任务时使用 /script/terminate 接口
PythonScript配置项¶
PythonScript 类用于配置python脚本,支持如下配置项
参数名称 |
参数说明 |
是否必须 |
数据类型 |
---|---|---|---|
element_name |
元素名 |
是 |
string |
folder_id |
元素所在的文件夹id |
与path二选一 |
string |
path |
元素所在的文件夹绝对路径 |
与folder_id二选一 |
string |
task_name |
任务名称 |
否 |
string |
should_log |
仅在线上执行时有效, 脚本是否记录执行日志, 线下执行时,该值与脚本元素 配置项“记录执行日志”保持一致 |
否 |
bool |