并发编程 ====================== 在 v1.1 中,本项目引入了若干新特性,将使调用 **element** 和 **db** 内异步方法的并发编程变得更为容易。 本文将以 **element** 内方法的调用为例,就如下三种写法介绍在本项目中的并发编程: - 基于多线程的并发编程 - 基于协程的并发编程 - 元素类提供的协程并发特性 基于多线程的并发编程 ----------------------- deepfos项目初始(v1.0.0)即在 ``deepfos.lib.concurrency`` 中提供了线程池类 ``ThreadCtxExecutor`` 可以通过如下写法将有 I/O 开销的 **element** 内同步方法放入线程池中,达到并发的效果: .. code-block:: python :emphasize-lines: 1,6 from deepfos.lib.concurrency import ThreadCtxExecutor from deepfos.element.datatable import DataTableMySQL test_db = DataTableMySQL('test') with ThreadCtxExecutor(max_workers=3) as exc: futures = [ exc.submit(test_db.delete, where=(test_db.table.id == 1)), exc.submit(test_db.delete, where=(test_db.table.id == 2)), exc.submit(test_db.delete, where=(test_db.table.id == 3)), ] 如需获得返回,可通过调用 ``futures`` 中任务的 ``result`` 来获取: .. code-block:: python :emphasize-lines: 14,15 from deepfos.lib.concurrency import ThreadCtxExecutor from deepfos.element.datatable import DataTableMySQL test_db = DataTableMySQL('test') results = [] with ThreadCtxExecutor(max_workers=3) as exc: futures = [ exc.submit(test_db.delete, where=(test_db.table.id == 1)), exc.submit(test_db.delete, where=(test_db.table.id == 2)), exc.submit(test_db.delete, where=(test_db.table.id == 3)), ] for f in as_completed(futures): results.append(f.result()) ``ThreadCtxExecutor`` 类本身继承自 ``ThreadPoolExecutor`` ,用法与其一致,具体用法可参考: https://docs.python.org/3/library/concurrent.futures.html?concurrent.futures.ThreadPoolExecutor#threadpoolexecutor 基于协程的并发编程 -------------------- deepfos项目中所有 ``deepfos.api`` 内封装的接口方法,皆为通过发送http异步请求的方式实现。 协程相较于线程,开销更小,切换更快,执行顺序明确,因此在本项目的中并发编程中,更为推荐使用。 基于协程的并发调用 **element** 内方法示例: .. code-block:: python :emphasize-lines: 2,5 import asyncio from deepfos.element.datatable import AsyncDataTableMySQL async def batch_delete(): test_db = AsyncDataTableMySQL('test') results = await asyncio.gather( test_db.delete(where=(test_db.table.id == 1)), test_db.delete(where=(test_db.table.id == 2)), test_db.delete(where=(test_db.table.id == 3)), ) return results asyncio.run(batch_delete()) 所有 **element** 和 **db** 内都有异步实现,名称以 ``Async`` 为开头与同步类做区分 其中异步方法的定义,可参考示例如下: .. code-block:: python :emphasize-lines: 1 async def delete( self, where: Union[str, Term, EmptyCriterion], ) -> CustomSqlRespDTO: 在异步类中,涉及接口请求的方法,皆为异步方法,因此需要以异步编程的方式调用,即使用 ``async/await`` 语法。 关于异步编程,具体可参考: https://docs.python.org/3/library/asyncio.html .. _并发特性: 元素类提供的协程并发特性 ------------------------- 使用异步编程固然可以得到较高的并发收益,但是由于异步方法编写具有的传染性,对于已有项目的异步重构可能是成本巨大的。 对此,在 v1.1 中,deepfos提供了如下配置项: :: OPTION.general.parallel_mode: 是否开启并发模式 该配置项默认关闭,开启后,可以在调用 **element** 和 **db** 模块中有异步实现的同步方法时, 将调用的行为封装为一种代理类对象,在该代理类对象中, 对方法的同步调用,将变为把对其的异步调用放入协程中, 只有在使用方法的返回或程序退出前,会获取异步方法的返回结果或等待其执行结束。 理想情况下,启用该配置项后,多处对有异步实现的同步方法的调用在结果被使用或程序退出前, 其异步方法是并发执行的,从而达到更高的执行效率,且该过程在编程中是无感的,适用于对已有代码进行并发化改造。 基于该配置项的并发编程示例: .. code-block:: python :emphasize-lines: 1,4 from deepfos.options import OPTION from deepfos.element.datatable import DataTableMySQL OPTION.general.parallel_mode = True test_db = DataTableMySQL(element_name='test') test_db.delete(where=(test_db.table.id == 1)) test_db.delete(where=(test_db.table.id == 2)) test_db.delete(where=(test_db.table.id == 3)) 在程序退出前,3次对 ``delete`` 方法的调用将在协程内并发执行。 .. warning:: 并发执行意味着执行顺序是不固定的,如果有对执行顺序的要求,可参考 compute的用法_ 对于需要使用返回值进一步操作的情况,可参考如下: .. code-block:: python :emphasize-lines: 1,5 import pandas as pd from deepfos.options import OPTION from deepfos.element.datatable import DataTableMySQL OPTION.general.parallel_mode = True test_db = DataTableMySQL(element_name='test') left = test_db.select(where=(test_db.table.id == '1')) right = test_db.select(where=(test_db.table.id == '2')) # Some other operations without using left or right ... result = pd.concat([left, right]) 在 ``pd.concat`` 操作前,两次对 ``select`` 方法的调用将并发执行。 **注意事项** ~~~~~~~~~~~~~~ 如前文所述,并发特性的实现本质,是将对有异步实现的同步方法的调用封装为代理类对象,因此会有一些情况无法触发到对实际值的获取操作。 这一般发生在对结果值的直接右操作,或在C-extension中编写了绕过Python内置方法直接操作该值时,在这种情况下,会导致 ``TypeError`` 等报错发生。 对此,deepfos提供了一个保底方法: ``compute`` ,可以保证调用该方法后得到的值为代理类对象的实际值,而非代理类对象本身。 使用示例: .. code-block:: python :emphasize-lines: 3,11 from deepfos.options import OPTION from deepfos.element.datatable import DataTableMySQL from deepfos.lib.concurrency import compute OPTION.general.parallel_mode = True test_db = DataTableMySQL(element_name='test') c = test_db.count(where=(test_db.table.id == '1')) print(compute(c)) 目前已知无法支持的用法: - 对于代理类对象的实际值为 **字符串** ,且代理类对象出现在字符串 ``in`` 操作左边: .. code-block:: python Proxy_value in "abcde" .. _compute的用法: **补充** >>>>>>>>>>>> ``compute`` 可用在对方法触发的相对顺序有需求的逻辑中,可保证在 ``compute`` 执行完毕后,其背后的异步方法必然已被执行完毕。 使用示例: .. code-block:: python :emphasize-lines: 13 from deepfos.options import OPTION from deepfos.element.datatable import DataTableMySQL from deepfos.lib.concurrency import compute OPTION.general.parallel_mode = True test_db = DataTableMySQL(element_name='test') do_delete = test_db.delete(where=(test_db.table.id == 1)) # Some other operations ... compute(do_delete) print(test_db.select(limit=5)) 附录:并发特性效率报告 ---------------------- 数据准备 ~~~~~~~~ 本次并发效率比较以查询测试数据表元素的操作作为样例,关注于不同请求数下切换为并发模式后可以带来的效率提升。 测试环境 ~~~~~~~~~~~ 此次查询的数据表为alpha环境ehcdge空间下ehcdge006应用内的global_icp_add元素,该数据表内有106815条数据。 通过初始化数据表元素并待其meta信息加载完毕后,重复不同次数的相同的select操作来统计不同请求数下的耗时,此处,单次select操作将触发一次请求。 初始化代码: .. code:: python from deepfos.element.datatable import DataTableMySQL test_db = DataTableMySQL(element_name='global_icp_add') # 保证select中用到的meta已经被加载完毕 print(test_db.table) 单次执行的内容: :: test_db.select(where=(test_db.table._id == 1)) 耗时情况 ~~~~~~~~ ====== =============== =============== 请求数 并发执行耗时(s) 串行执行耗时(s) ====== =============== =============== 1 0.15821815 0.15047904 10 0.17522854 1.45420659 50 0.3274132 7.28559049 100 0.48932746 14.71017488 200 0.89331368 29.6673264 500 2.4170979 76.1682548 ====== =============== =============== 其中,为了统计数据的代表性,请求数为1/10/50的耗时数据,经过了10次统计取平均,100/200的经过了5次统计取平均,500的为2次。 耗时情况柱形图 >>>>>>>>>>>>>>>>>>>>>>>> .. panels:: :container: container pb-1 img-auto-width :column: col-lg-12 p-0 :body: p-0 .. image:: ../images/duration.png 提速率 >>>>>>>>>>>> .. panels:: :container: container pb-1 img-auto-width :column: col-lg-12 p-0 :body: p-0 .. image:: ../images/speedup.png 总结 ~~~~ 在 **可并发** 的请求数达到一定规模(大于50)时,使用Deepfos v1.1 提供的并发特性可以得到超过20倍的提速。