WorkFlow¶
- class deepfos.element.workflow.WorkFlow(element_name: str, folder_id: str = None, path: str = None, server_name: str = None)¶
工作流元素
提供流程启动、消息发送、实例和任务查询等常见工作流操作。
快速开始
from deepfos.element.workflow import WorkFlow, StatusType wf = WorkFlow('my_workflow') # 启动流程 wf.launch(launch_params={"year": "2024", "period": "3"}) # 广播消息 wf.broadcast(msg_code='MSG_001', msg_body={"key": "value"}) # 查询进行中的实例 processes = wf.list_process(status=StatusType.in_progress) # 查询当前用户的待办任务 tasks = wf.list_claim_task()
- 参数
element_name – 元素名
folder_id – 元素所在的文件夹id
path – 元素所在的文件夹绝对路径
提示
如果不提供folder_id和path,将会使用元素名和元素类型进行全局搜索。 如果找到 唯一匹配 的元素,那么一切正常,否则将会报错。
方法
async_check_exist(ele_name[, ele_type, …])异步查询元素是否存在
batch_launch_process([params])批量启动流程
broadcast(msg_code, msg_body)广播发送消息至所有订阅者
check_exist(ele_name[, ele_type, folder, …])查询元素是否存在
complete_task_by_id(task_id[, comment, …])完成任务实例
get_process_by_business_key(business_key[, …])通过业务键查询流程实例
get_process_by_param(param[, version])通过启动参数查询流程实例
get_task_by_business_key(business_key[, …])通过业务键查询任务实例
get_task_by_param(param[, version])通过启动参数查询任务实例
launch_process([param, file_path, comment])启动流程
list_claim_task([as_dataframe])查看待认领任务
list_my_task([status, as_dataframe])查看我的任务
list_process([status, as_dataframe])查看流程
send_msg_to_processes(msg_code, msg_body, …)发送消息至流程实例列表
wait_for(attr)异步等待成员变量
属性
所有版本信息
同步 API 对象
异步 API 对象
元素信息
元素类型
全局变量
启动参数
消息列表