并发编程

在 v1.1 中,本项目引入了若干新特性,将使调用 elementdb 内异步方法的并发编程变得更为容易。

本文将以 element 内方法的调用为例,就如下三种写法介绍在本项目中的并发编程:

  • 基于多线程的并发编程

  • 基于协程的并发编程

  • 元素类提供的协程并发特性

基于多线程的并发编程

deepfos项目初始(v1.0.0)即在 deepfos.lib.concurrency 中提供了线程池类 ThreadCtxExecutor

可以通过如下写法将有 I/O 开销的 element 内同步方法放入线程池中,达到并发的效果:

from deepfos.lib.concurrency import ThreadCtxExecutor
from deepfos.element.datatable import DataTableMySQL

test_db = DataTableMySQL('test')

with ThreadCtxExecutor(max_workers=3) as exc:
    futures = [
        exc.submit(test_db.delete, where=(test_db.table.id == 1)),
        exc.submit(test_db.delete, where=(test_db.table.id == 2)),
        exc.submit(test_db.delete, where=(test_db.table.id == 3)),
    ]

如需获得返回,可通过调用 futures 中任务的 result 来获取:

from deepfos.lib.concurrency import ThreadCtxExecutor
from deepfos.element.datatable import DataTableMySQL

test_db = DataTableMySQL('test')

results = []
with ThreadCtxExecutor(max_workers=3) as exc:
    futures = [
        exc.submit(test_db.delete, where=(test_db.table.id == 1)),
        exc.submit(test_db.delete, where=(test_db.table.id == 2)),
        exc.submit(test_db.delete, where=(test_db.table.id == 3)),
    ]

    for f in as_completed(futures):
        results.append(f.result())

ThreadCtxExecutor 类本身继承自 ThreadPoolExecutor ,用法与其一致,具体用法可参考: https://docs.python.org/3/library/concurrent.futures.html?concurrent.futures.ThreadPoolExecutor#threadpoolexecutor

基于协程的并发编程

deepfos项目中所有 deepfos.api 内封装的接口方法,皆为通过发送http异步请求的方式实现。

协程相较于线程,开销更小,切换更快,执行顺序明确,因此在本项目的中并发编程中,更为推荐使用。

基于协程的并发调用 element 内方法示例:

import asyncio
from deepfos.element.datatable import AsyncDataTableMySQL

async def batch_delete():
    test_db = AsyncDataTableMySQL('test')

    results = await asyncio.gather(
        test_db.delete(where=(test_db.table.id == 1)),
        test_db.delete(where=(test_db.table.id == 2)),
        test_db.delete(where=(test_db.table.id == 3)),
    )

    return results

asyncio.run(batch_delete())

所有 elementdb 内都有异步实现,名称以 Async 为开头与同步类做区分

其中异步方法的定义,可参考示例如下:

async def delete(
    self,
    where: Union[str, Term, EmptyCriterion],
) -> CustomSqlRespDTO:

在异步类中,涉及接口请求的方法,皆为异步方法,因此需要以异步编程的方式调用,即使用 async/await 语法。

关于异步编程,具体可参考: https://docs.python.org/3/library/asyncio.html

元素类提供的协程并发特性

使用异步编程固然可以得到较高的并发收益,但是由于异步方法编写具有的传染性,对于已有项目的异步重构可能是成本巨大的。

对此,在 v1.1 中,deepfos提供了如下配置项:

OPTION.general.parallel_mode: 是否开启并发模式

该配置项默认关闭,开启后,可以在调用 elementdb 模块中有异步实现的同步方法时, 将调用的行为封装为一种代理类对象,在该代理类对象中, 对方法的同步调用,将变为把对其的异步调用放入协程中, 只有在使用方法的返回或程序退出前,会获取异步方法的返回结果或等待其执行结束。

理想情况下,启用该配置项后,多处对有异步实现的同步方法的调用在结果被使用或程序退出前, 其异步方法是并发执行的,从而达到更高的执行效率,且该过程在编程中是无感的,适用于对已有代码进行并发化改造。

基于该配置项的并发编程示例:

from deepfos.options import OPTION
from deepfos.element.datatable import DataTableMySQL

OPTION.general.parallel_mode = True

test_db = DataTableMySQL(element_name='test')

test_db.delete(where=(test_db.table.id == 1))
test_db.delete(where=(test_db.table.id == 2))
test_db.delete(where=(test_db.table.id == 3))

在程序退出前,3次对 delete 方法的调用将在协程内并发执行。

警告

并发执行意味着执行顺序是不固定的,如果有对执行顺序的要求,可参考 compute的用法

对于需要使用返回值进一步操作的情况,可参考如下:

import pandas as pd
from deepfos.options import OPTION
from deepfos.element.datatable import DataTableMySQL

OPTION.general.parallel_mode = True

test_db = DataTableMySQL(element_name='test')

left = test_db.select(where=(test_db.table.id == '1'))
right = test_db.select(where=(test_db.table.id == '2'))

# Some other operations without using left or right ...

result = pd.concat([left, right])

pd.concat 操作前,两次对 select 方法的调用将并发执行。

注意事项

如前文所述,并发特性的实现本质,是将对有异步实现的同步方法的调用封装为代理类对象,因此会有一些情况无法触发到对实际值的获取操作。

这一般发生在对结果值的直接右操作,或在C-extension中编写了绕过Python内置方法直接操作该值时,在这种情况下,会导致 TypeError 等报错发生。

对此,deepfos提供了一个保底方法: compute ,可以保证调用该方法后得到的值为代理类对象的实际值,而非代理类对象本身。

使用示例:

from deepfos.options import OPTION
from deepfos.element.datatable import DataTableMySQL
from deepfos.lib.concurrency import compute

OPTION.general.parallel_mode = True

test_db = DataTableMySQL(element_name='test')

c = test_db.count(where=(test_db.table.id == '1'))

print(compute(c))

目前已知无法支持的用法:

  • 对于代理类对象的实际值为 字符串 ,且代理类对象出现在字符串 in 操作左边:

Proxy_value in "abcde"

补充

compute 可用在对方法触发的相对顺序有需求的逻辑中,可保证在 compute 执行完毕后,其背后的异步方法必然已被执行完毕。

使用示例:

from deepfos.options import OPTION
from deepfos.element.datatable import DataTableMySQL
from deepfos.lib.concurrency import compute

OPTION.general.parallel_mode = True

test_db = DataTableMySQL(element_name='test')

do_delete = test_db.delete(where=(test_db.table.id == 1))

# Some other operations ...

compute(do_delete)

print(test_db.select(limit=5))

附录:并发特性效率报告

数据准备

本次并发效率比较以查询测试数据表元素的操作作为样例,关注于不同请求数下切换为并发模式后可以带来的效率提升。

测试环境

此次查询的数据表为alpha环境ehcdge空间下ehcdge006应用内的global_icp_add元素,该数据表内有106815条数据。

通过初始化数据表元素并待其meta信息加载完毕后,重复不同次数的相同的select操作来统计不同请求数下的耗时,此处,单次select操作将触发一次请求。

初始化代码:

from deepfos.element.datatable import DataTableMySQL

test_db = DataTableMySQL(element_name='global_icp_add')

# 保证select中用到的meta已经被加载完毕
print(test_db.table)

单次执行的内容:

test_db.select(where=(test_db.table._id == 1))

耗时情况

请求数

并发执行耗时(s)

串行执行耗时(s)

1

0.15821815

0.15047904

10

0.17522854

1.45420659

50

0.3274132

7.28559049

100

0.48932746

14.71017488

200

0.89331368

29.6673264

500

2.4170979

76.1682548

其中,为了统计数据的代表性,请求数为1/10/50的耗时数据,经过了10次统计取平均,100/200的经过了5次统计取平均,500的为2次。

耗时情况柱形图

../_images/duration.png

提速率

../_images/speedup.png

总结

可并发 的请求数达到一定规模(大于50)时,使用Deepfos v1.1 提供的并发特性可以得到超过20倍的提速。