并发编程¶
在 v1.1 中,本项目引入了若干新特性,将使调用 element 和 db 内异步方法的并发编程变得更为容易。
本文将以 element 内方法的调用为例,就如下三种写法介绍在本项目中的并发编程:
基于多线程的并发编程
基于协程的并发编程
元素类提供的协程并发特性
基于多线程的并发编程¶
deepfos项目初始(v1.0.0)即在 deepfos.lib.concurrency
中提供了线程池类 ThreadCtxExecutor
可以通过如下写法将有 I/O 开销的 element 内同步方法放入线程池中,达到并发的效果:
from deepfos.lib.concurrency import ThreadCtxExecutor
from deepfos.element.datatable import DataTableMySQL
test_db = DataTableMySQL('test')
with ThreadCtxExecutor(max_workers=3) as exc:
futures = [
exc.submit(test_db.delete, where=(test_db.table.id == 1)),
exc.submit(test_db.delete, where=(test_db.table.id == 2)),
exc.submit(test_db.delete, where=(test_db.table.id == 3)),
]
如需获得返回,可通过调用 futures
中任务的 result
来获取:
from deepfos.lib.concurrency import ThreadCtxExecutor
from deepfos.element.datatable import DataTableMySQL
test_db = DataTableMySQL('test')
results = []
with ThreadCtxExecutor(max_workers=3) as exc:
futures = [
exc.submit(test_db.delete, where=(test_db.table.id == 1)),
exc.submit(test_db.delete, where=(test_db.table.id == 2)),
exc.submit(test_db.delete, where=(test_db.table.id == 3)),
]
for f in as_completed(futures):
results.append(f.result())
ThreadCtxExecutor
类本身继承自 ThreadPoolExecutor
,用法与其一致,具体用法可参考:
https://docs.python.org/3/library/concurrent.futures.html?concurrent.futures.ThreadPoolExecutor#threadpoolexecutor
基于协程的并发编程¶
deepfos项目中所有 deepfos.api
内封装的接口方法,皆为通过发送http异步请求的方式实现。
协程相较于线程,开销更小,切换更快,执行顺序明确,因此在本项目的中并发编程中,更为推荐使用。
基于协程的并发调用 element 内方法示例:
import asyncio
from deepfos.element.datatable import AsyncDataTableMySQL
async def batch_delete():
test_db = AsyncDataTableMySQL('test')
results = await asyncio.gather(
test_db.delete(where=(test_db.table.id == 1)),
test_db.delete(where=(test_db.table.id == 2)),
test_db.delete(where=(test_db.table.id == 3)),
)
return results
asyncio.run(batch_delete())
所有 element 和 db 内都有异步实现,名称以 Async
为开头与同步类做区分
其中异步方法的定义,可参考示例如下:
async def delete(
self,
where: Union[str, Term, EmptyCriterion],
) -> CustomSqlRespDTO:
在异步类中,涉及接口请求的方法,皆为异步方法,因此需要以异步编程的方式调用,即使用 async/await
语法。
关于异步编程,具体可参考: https://docs.python.org/3/library/asyncio.html
元素类提供的协程并发特性¶
使用异步编程固然可以得到较高的并发收益,但是由于异步方法编写具有的传染性,对于已有项目的异步重构可能是成本巨大的。
对此,在 v1.1 中,deepfos提供了如下配置项:
OPTION.general.parallel_mode: 是否开启并发模式
该配置项默认关闭,开启后,可以在调用 element 和 db 模块中有异步实现的同步方法时, 将调用的行为封装为一种代理类对象,在该代理类对象中, 对方法的同步调用,将变为把对其的异步调用放入协程中, 只有在使用方法的返回或程序退出前,会获取异步方法的返回结果或等待其执行结束。
理想情况下,启用该配置项后,多处对有异步实现的同步方法的调用在结果被使用或程序退出前, 其异步方法是并发执行的,从而达到更高的执行效率,且该过程在编程中是无感的,适用于对已有代码进行并发化改造。
基于该配置项的并发编程示例:
from deepfos.options import OPTION
from deepfos.element.datatable import DataTableMySQL
OPTION.general.parallel_mode = True
test_db = DataTableMySQL(element_name='test')
test_db.delete(where=(test_db.table.id == 1))
test_db.delete(where=(test_db.table.id == 2))
test_db.delete(where=(test_db.table.id == 3))
在程序退出前,3次对 delete
方法的调用将在协程内并发执行。
警告
并发执行意味着执行顺序是不固定的,如果有对执行顺序的要求,可参考 compute的用法
对于需要使用返回值进一步操作的情况,可参考如下:
import pandas as pd
from deepfos.options import OPTION
from deepfos.element.datatable import DataTableMySQL
OPTION.general.parallel_mode = True
test_db = DataTableMySQL(element_name='test')
left = test_db.select(where=(test_db.table.id == '1'))
right = test_db.select(where=(test_db.table.id == '2'))
# Some other operations without using left or right ...
result = pd.concat([left, right])
在 pd.concat
操作前,两次对 select
方法的调用将并发执行。
注意事项¶
如前文所述,并发特性的实现本质,是将对有异步实现的同步方法的调用封装为代理类对象,因此会有一些情况无法触发到对实际值的获取操作。
这一般发生在对结果值的直接右操作,或在C-extension中编写了绕过Python内置方法直接操作该值时,在这种情况下,会导致 TypeError
等报错发生。
对此,deepfos提供了一个保底方法: compute
,可以保证调用该方法后得到的值为代理类对象的实际值,而非代理类对象本身。
使用示例:
from deepfos.options import OPTION
from deepfos.element.datatable import DataTableMySQL
from deepfos.lib.concurrency import compute
OPTION.general.parallel_mode = True
test_db = DataTableMySQL(element_name='test')
c = test_db.count(where=(test_db.table.id == '1'))
print(compute(c))
目前已知无法支持的用法:
对于代理类对象的实际值为 字符串 ,且代理类对象出现在字符串
in
操作左边:Proxy_value in "abcde"
补充¶
compute
可用在对方法触发的相对顺序有需求的逻辑中,可保证在 compute
执行完毕后,其背后的异步方法必然已被执行完毕。
使用示例:
from deepfos.options import OPTION
from deepfos.element.datatable import DataTableMySQL
from deepfos.lib.concurrency import compute
OPTION.general.parallel_mode = True
test_db = DataTableMySQL(element_name='test')
do_delete = test_db.delete(where=(test_db.table.id == 1))
# Some other operations ...
compute(do_delete)
print(test_db.select(limit=5))
附录:并发特性效率报告¶
数据准备¶
本次并发效率比较以查询测试数据表元素的操作作为样例,关注于不同请求数下切换为并发模式后可以带来的效率提升。
测试环境¶
此次查询的数据表为alpha环境ehcdge空间下ehcdge006应用内的global_icp_add元素,该数据表内有106815条数据。
通过初始化数据表元素并待其meta信息加载完毕后,重复不同次数的相同的select操作来统计不同请求数下的耗时,此处,单次select操作将触发一次请求。
初始化代码:
from deepfos.element.datatable import DataTableMySQL
test_db = DataTableMySQL(element_name='global_icp_add')
# 保证select中用到的meta已经被加载完毕
print(test_db.table)
单次执行的内容:
test_db.select(where=(test_db.table._id == 1))
耗时情况¶
请求数 |
并发执行耗时(s) |
串行执行耗时(s) |
---|---|---|
1 |
0.15821815 |
0.15047904 |
10 |
0.17522854 |
1.45420659 |
50 |
0.3274132 |
7.28559049 |
100 |
0.48932746 |
14.71017488 |
200 |
0.89331368 |
29.6673264 |
500 |
2.4170979 |
76.1682548 |
其中,为了统计数据的代表性,请求数为1/10/50的耗时数据,经过了10次统计取平均,100/200的经过了5次统计取平均,500的为2次。
耗时情况柱形图¶
提速率¶
总结¶
在 可并发 的请求数达到一定规模(大于50)时,使用Deepfos v1.1 提供的并发特性可以得到超过20倍的提速。