What’s new

v1.1 相对于 v1.0 最大的更新,即为对异步编程的全面支持,具体体现在:

  • 所有涉及网络I/O的函数都有异步的实现

  • 同步实现基于异步实现完成

  • 元素类中常用的元数据的加载,将在其实例化过程中作为后台异步任务执行,进一步减少元数据加载过程的耗时, 详见 future_property介绍

异步化的常用类

现有的异步类全集详见 异步元素类数据库类

异步函数同步化

v1.1的deepfos项目提供了十分简易的异步函数同步化改造方案,即:

使用synchronize装饰器装饰实现

简单示例

例如如下异步函数:

import asyncio

async def do_task(task_id):
    print(f'Do task {task_id}...')
    await asyncio.sleep(1)
    print(f'Task {task_id} done')

本身需用 await 语法或被放入事件循环中才能被执行,然而在被 synchronize 装饰器装饰后,可以像同步函数一样被调用

import asyncio
from deepfos.element.base import synchronize

@synchronize
async def do_task(task_id):
    print(f'Do task {task_id}...')
    await asyncio.sleep(1)
    print(f'Task {task_id} done')


for i in range(5):
    do_task(i)

将打印如下:

Do task 0...
Task 0 done
Do task 1...
Task 1 done
Do task 2...
Task 2 done
Do task 3...
Task 3 done
Do task 4...
Task 4 done

在 for 循环中,其中的异步函数将顺次执行,且无并发

在如上例子中,经过 synchronize 装饰后的效果与直接在循环中用 asyncio.run 几乎没有差异, 而这显然没有体现出异步编程的优势,因此deepfos v1.1提供了并发编程的支持, 可以仅通过一个配置项启动后,在不改变如上写法的前提下使异步函数可以并发执行

配置项:

OPTION.general.parallel_mode: 是否开启并发模式

使用示例:

import asyncio
from deepfos.element.base import synchronize
from deepfos import OPTION


OPTION.general.parallel_mode = True


@synchronize
async def do_task(task_id):
    print(f'Do task {task_id}...')
    await asyncio.sleep(1)
    print(f'Task {task_id} done')


for i in range(5):
    do_task(i)

将打印如下:

Do task 0...
Do task 1...
Do task 2...
Do task 3...
Do task 4...
Task 0 done
Task 2 done
Task 4 done
Task 1 done
Task 3 done

从打印内容可以看出,异步函数的执行是并发的

基于此特性,可以编写对耗时要求较高但对执行顺序无要求的功能

例如通过如下代码实现并发升级元素:

from deepfos import OPTION
from deepfos.api.app import AppAPI
from deepfos.api.models.app import ElementUpgradeInfoDto
from deepfos.element.base import synchronize

api = AppAPI(sync=False)

OPTION.general.parallel_mode = True


@synchronize
async def upgrade_once(upgrade_dto: ElementUpgradeInfoDto):
    await api.element.element_upgrade([upgrade_dto])


elements = [
    ElementUpgradeInfoDto(...),
    ElementUpgradeInfoDto(...),
    ElementUpgradeInfoDto(...),
    ElementUpgradeInfoDto(...),
    ElementUpgradeInfoDto(...),
    ElementUpgradeInfoDto(...)
]

for ele in elements:
    upgrade_once(ele)

synchronize 装饰器在并发和非并发模式下的逻辑差异概况如下:

  • 在并发模式下,对异步函数的调用将返回可对其结果进行访问的 ParallelProxy

  • 在非并发模式中,对异步函数的调用将变为将其放入 evloop 并同步等待其执行完毕

关于并发模式的更多说明,可见 并发特性介绍

项目示例

v1.1的deepfos项目中现有异步类的同步化,皆为基于对 synchronize 装饰器的进一步包装来实现,并由此提供了一个较为通用的模式:

使用SyncMeta元类来实现类内异步函数的同步化改造

具体来说,就是经过 SyncMeta 元类创建,并包含了 synchronize 成员的类, 可通过在 synchronize 元组中指定异步函数名的方式,使类内的异步函数可以被同步调用

例如异步的变量元素类定义如下:

class AsyncVariable(ElementBase):
    api_class = VariableAPI
    api: VariableAPI

    ......

    async def save(self):
        payload = UpdateVariavlesDTO.construct_from(
            self.meta, description=self._description,
            globalVariables=list(self._gv_memo.values()),
            userVariables=list(self._uv_memo.values()),
            moduleId=self.api.module_id
        )
        return await self._update_impl(payload=payload)

    async def _update_impl(self, payload: UpdateVariavlesDTO):
        # async_api成员是异步的API类对象
        # 因此其中的update接口函数也为异步函数
        await self.async_api.variable.update(payload)
        ...

其同步的元素类定义如下:

class Variable(AsyncVariable, metaclass=SyncMeta):
    synchronize = ('save', )

    if TYPE_CHECKING:
        def save(self):
            ...

在异步变量元素对外暴露的函数中,只有 save 函数是异步函数,因此只需继承 AsyncVariable 后, 将该函数名加入 synchronize 成员,表明在创建该子类的过程中,需同步化 save 函数

这样以后,在使用时,可以直接以同步调用的语法调用 save 函数

from deepfos.element.variable import Variable

variable = Variable(element_name='test_val', path='/test')

variable.save()

实现原理

SyncMeta 元类中,会在 basenamespace 中寻找同步化信息,即 synchronize 成员, 然后在 base 中,将最后一次出现的相应异步函数类成员以 synchronize 装饰器装饰,替换回当前类中

class SyncMeta(_AppendDoc):
    def __new__(mcs, name, bases, namespace, **kwargs):
        base = bases[0]
        methods = None

        if len(bases) > 1:
            for parent in bases:
                if hasattr(parent, "synchronize"):
                    methods = parent.synchronize
                    break

        if methods is None:
            methods = namespace.pop('synchronize', [])

        for attr in methods:
            namespace[attr] = synchronize(mcs._get_from_bases(base, attr))

        cls = super().__new__(mcs, name, bases, namespace, **kwargs)
        return cls

这样以后,生成的类中的异步函数就有了同步实现,且可以使用到 synchronize 装饰器自带的特性

Future Property

除了独立的异步函数,在项目开发的功能类使用场景中,还有一种较常见的I/O开销大的情况,即包含了接口请求的元数据的准备

元数据一般涉及一个或多个接口请求,用于组织一个可能被多次使用的类成员,出于性能和复用性考虑, 这一般会是一个 cached_property ,然而, cached_property 只能用于装饰同步函数, 且这种使用方式无法体现出异步I/O的优势

针对这种使用场景,v1.1 版本的deepfos项目提供了一种专用于如上场景的装饰器类—— future_property

前提

future_property 装饰器类需要配合 FuturePropertyMeta 元类使用, 后者负责重写类的__init__,使得 future_property 装饰过的方法将在__init__过程中被 异步调用, 并注册得到结果时的回调

如上过程可以保证 future_property 装饰器类的使用体感在涉及异步函数的情况下与 cached_property 相似

使用示例

例如一个简单的使用样例:

import asyncio
from deepfos.lib.asynchronous import FuturePropertyMeta, future_property


class Example(metaclass=FuturePropertyMeta):
    async def get_a(self):
        await asyncio.sleep(1)
        return 1

    async def get_b(self):
        await asyncio.sleep(2)
        return 2

    @future_property
    async def get_all(self):
        a = await self.get_a()
        b = await self.get_b()

        return a, b


e = Example()
print(e.get_all)

将打印:

(1, 2)

在如上样例中,使用 get_all 处无需使用 await 语法, 且用法和 cached_property 一样,都是访问成员变量的语法

注意

由于异步函数间在 await 切换间的执行先后顺序是不确定的,如需在 future_property 中, 使用其他被 future_property 装饰过的成员变量,有发生死锁的可能性

对此, future_property 提供了 wait_for 方法, 用以安全有序地在 future_property 所装饰的异步函数内访问其他 future_property 属性

写法示例:

import asyncio
from deepfos.lib.asynchronous import FuturePropertyMeta, future_property


class Example(metaclass=FuturePropertyMeta):
    async def get_a(self):
        await asyncio.sleep(1)
        return 1

    async def get_b(self):
        await asyncio.sleep(2)
        return 2

    @future_property
    async def concat_a(self):
        # 注意此处使用的是type(self)
        # 则效果为访问future_property(self.get_all)
        # 如为self.get_all
        # 则效果为访问future_property(self.get_all).__get__(self)
        # 无法调用到future_property.wait_for
        _all = await type(self).get_all.wait_for(self)
        a = await self.get_a()
        return a, _all

    @future_property
    async def get_all(self):
        a = await self.get_a()
        b = await self.get_b()

        return a, b


e = Example()
print(e.concat_a)

将打印:

(1, (1, 2))

补充

如需使 future_property 装饰的元数据失效,可使用 future_propertyreset 方法, 传入当前类实例后,与该实例绑定的元数据结果将被清空,再次调用时则会重新获取

例如如下代码:

import asyncio
from deepfos.lib.asynchronous import FuturePropertyMeta, future_property


class Example(metaclass=FuturePropertyMeta):
    async def get_a(self):
        await asyncio.sleep(2)
        print('Getting a...')
        return 1

    async def get_b(self):
        await asyncio.sleep(1)
        print('Getting b...')
        return 2

    @future_property
    async def get_all(self):
        a = await self.get_a()
        b = await self.get_b()

        return a, b

使用如下方式使用:

e = Example()
print(e.get_all)
print('Get again')
print(e.get_all)
type(e).get_all.reset(e)
print('After reset, get again')
print(e.get_all)

则打印结果如下:

Getting a...
Getting b...
(1, 2)
Get again
(1, 2)
After reset, get again
Getting a...
Getting b...
(1, 2)