What's new ================================ v1.1 相对于 v1.0 最大的更新,即为对异步编程的全面支持,具体体现在: * 所有涉及网络I/O的函数都有异步的实现 * 同步实现基于异步实现完成 * 元素类中常用的元数据的加载,将在其实例化过程中作为后台异步任务执行,进一步减少元数据加载过程的耗时, 详见 future_property介绍_ 异步化的常用类 ------------------------------------ 现有的异步类全集详见 :ref:`异步元素类` 和 :ref:`数据库类` 异步函数同步化 ------------------------------------ v1.1的deepfos项目提供了十分简易的异步函数同步化改造方案,即: **使用synchronize装饰器装饰实现** 简单示例 ~~~~~~~~~~~~~~ 例如如下异步函数: .. code-block:: python import asyncio async def do_task(task_id): print(f'Do task {task_id}...') await asyncio.sleep(1) print(f'Task {task_id} done') 本身需用 ``await`` 语法或被放入事件循环中才能被执行,然而在被 *synchronize* 装饰器装饰后,可以像同步函数一样被调用 .. code-block:: python import asyncio from deepfos.element.base import synchronize @synchronize async def do_task(task_id): print(f'Do task {task_id}...') await asyncio.sleep(1) print(f'Task {task_id} done') for i in range(5): do_task(i) 将打印如下: .. code-block:: python Do task 0... Task 0 done Do task 1... Task 1 done Do task 2... Task 2 done Do task 3... Task 3 done Do task 4... Task 4 done 在 for 循环中,其中的异步函数将顺次执行,且无并发 在如上例子中,经过 *synchronize* 装饰后的效果与直接在循环中用 ``asyncio.run`` 几乎没有差异, 而这显然没有体现出异步编程的优势,因此deepfos v1.1提供了并发编程的支持, 可以仅通过一个配置项启动后,在不改变如上写法的前提下使异步函数可以并发执行 配置项: :: OPTION.general.parallel_mode: 是否开启并发模式 使用示例: .. code-block:: python :emphasize-lines: 6 import asyncio from deepfos.element.base import synchronize from deepfos import OPTION OPTION.general.parallel_mode = True @synchronize async def do_task(task_id): print(f'Do task {task_id}...') await asyncio.sleep(1) print(f'Task {task_id} done') for i in range(5): do_task(i) 将打印如下: .. code-block:: python Do task 0... Do task 1... Do task 2... Do task 3... Do task 4... Task 0 done Task 2 done Task 4 done Task 1 done Task 3 done 从打印内容可以看出,异步函数的执行是并发的 基于此特性,可以编写对耗时要求较高但对执行顺序无要求的功能 例如通过如下代码实现并发升级元素: .. code-block:: python :emphasize-lines: 8, 11, 26 from deepfos import OPTION from deepfos.api.app import AppAPI from deepfos.api.models.app import ElementUpgradeInfoDto from deepfos.element.base import synchronize api = AppAPI(sync=False) OPTION.general.parallel_mode = True @synchronize async def upgrade_once(upgrade_dto: ElementUpgradeInfoDto): await api.element.element_upgrade([upgrade_dto]) elements = [ ElementUpgradeInfoDto(...), ElementUpgradeInfoDto(...), ElementUpgradeInfoDto(...), ElementUpgradeInfoDto(...), ElementUpgradeInfoDto(...), ElementUpgradeInfoDto(...) ] for ele in elements: upgrade_once(ele) *synchronize* 装饰器在并发和非并发模式下的逻辑差异概况如下: * 在并发模式下,对异步函数的调用将返回可对其结果进行访问的 **ParallelProxy** * 在非并发模式中,对异步函数的调用将变为将其放入 ``evloop`` 并同步等待其执行完毕 关于并发模式的更多说明,可见 :ref:`并发特性介绍<并发特性>` 项目示例 ~~~~~~~~~~~~~~ v1.1的deepfos项目中现有异步类的同步化,皆为基于对 *synchronize* 装饰器的进一步包装来实现,并由此提供了一个较为通用的模式: **使用SyncMeta元类来实现类内异步函数的同步化改造** 具体来说,就是经过 **SyncMeta** 元类创建,并包含了 ``synchronize`` 成员的类, 可通过在 ``synchronize`` 元组中指定异步函数名的方式,使类内的异步函数可以被同步调用 例如异步的变量元素类定义如下: .. code-block:: python :emphasize-lines: 1,7,22,23,24 class AsyncVariable(ElementBase): api_class = VariableAPI api: VariableAPI ...... async def save(self): payload = UpdateVariavlesDTO.construct_from( self.meta, description=self._description, globalVariables=list(self._gv_memo.values()), userVariables=list(self._uv_memo.values()), moduleId=self.api.module_id ) return await self._update_impl(payload=payload) async def _update_impl(self, payload: UpdateVariavlesDTO): # async_api成员是异步的API类对象 # 因此其中的update接口函数也为异步函数 await self.async_api.variable.update(payload) ... 其同步的元素类定义如下: .. code-block:: python :emphasize-lines: 1,2 class Variable(AsyncVariable, metaclass=SyncMeta): synchronize = ('save', ) if TYPE_CHECKING: def save(self): ... 在异步变量元素对外暴露的函数中,只有 *save* 函数是异步函数,因此只需继承 **AsyncVariable** 后, 将该函数名加入 ``synchronize`` 成员,表明在创建该子类的过程中,需同步化 *save* 函数 这样以后,在使用时,可以直接以同步调用的语法调用 *save* 函数 .. code-block:: python from deepfos.element.variable import Variable variable = Variable(element_name='test_val', path='/test') variable.save() 实现原理 >>>>>>>>>>>> **SyncMeta** 元类中,会在 ``base`` 和 ``namespace`` 中寻找同步化信息,即 ``synchronize`` 成员, 然后在 ``base`` 中,将最后一次出现的相应异步函数类成员以 *synchronize* 装饰器装饰,替换回当前类中 .. code-block:: python :emphasize-lines: 16 class SyncMeta(_AppendDoc): def __new__(mcs, name, bases, namespace, **kwargs): base = bases[0] methods = None if len(bases) > 1: for parent in bases: if hasattr(parent, "synchronize"): methods = parent.synchronize break if methods is None: methods = namespace.pop('synchronize', []) for attr in methods: namespace[attr] = synchronize(mcs._get_from_bases(base, attr)) cls = super().__new__(mcs, name, bases, namespace, **kwargs) return cls 这样以后,生成的类中的异步函数就有了同步实现,且可以使用到 *synchronize* 装饰器自带的特性 .. _future_property介绍: Future Property ------------------------------------ 除了独立的异步函数,在项目开发的功能类使用场景中,还有一种较常见的I/O开销大的情况,即包含了接口请求的元数据的准备 元数据一般涉及一个或多个接口请求,用于组织一个可能被多次使用的类成员,出于性能和复用性考虑, 这一般会是一个 ``cached_property`` ,然而, ``cached_property`` 只能用于装饰同步函数, 且这种使用方式无法体现出异步I/O的优势 针对这种使用场景,v1.1 版本的deepfos项目提供了一种专用于如上场景的装饰器类—— *future_property* 前提 ~~~~~~~~~~~~~~ *future_property* 装饰器类需要配合 **FuturePropertyMeta** 元类使用, 后者负责重写类的__init__,使得 *future_property* 装饰过的方法将在__init__过程中被 **异步调用**, 并注册得到结果时的回调 如上过程可以保证 *future_property* 装饰器类的使用体感在涉及异步函数的情况下与 ``cached_property`` 相似 使用示例 ~~~~~~~~~~~~~~ 例如一个简单的使用样例: .. code-block:: python :emphasize-lines: 14 import asyncio from deepfos.lib.asynchronous import FuturePropertyMeta, future_property class Example(metaclass=FuturePropertyMeta): async def get_a(self): await asyncio.sleep(1) return 1 async def get_b(self): await asyncio.sleep(2) return 2 @future_property async def get_all(self): a = await self.get_a() b = await self.get_b() return a, b e = Example() print(e.get_all) 将打印: .. code-block:: python (1, 2) 在如上样例中,使用 *get_all* 处无需使用 ``await`` 语法, 且用法和 ``cached_property`` 一样,都是访问成员变量的语法 **注意** >>>>>>>>>>>> 由于异步函数间在 ``await`` 切换间的执行先后顺序是不确定的,如需在 *future_property* 中, 使用其他被 *future_property* 装饰过的成员变量,有发生死锁的可能性 对此, *future_property* 提供了 *wait_for* 方法, 用以安全有序地在 *future_property* 所装饰的异步函数内访问其他 *future_property* 属性 写法示例: .. code-block:: python :emphasize-lines: 16, 17, 18, 19, 20, 21 import asyncio from deepfos.lib.asynchronous import FuturePropertyMeta, future_property class Example(metaclass=FuturePropertyMeta): async def get_a(self): await asyncio.sleep(1) return 1 async def get_b(self): await asyncio.sleep(2) return 2 @future_property async def concat_a(self): # 注意此处使用的是type(self) # 则效果为访问future_property(self.get_all) # 如为self.get_all # 则效果为访问future_property(self.get_all).__get__(self) # 无法调用到future_property.wait_for _all = await type(self).get_all.wait_for(self) a = await self.get_a() return a, _all @future_property async def get_all(self): a = await self.get_a() b = await self.get_b() return a, b e = Example() print(e.concat_a) 将打印: .. code-block:: python (1, (1, 2)) 补充 ~~~~~~~~~~~~~~ 如需使 *future_property* 装饰的元数据失效,可使用 *future_property* 的 *reset* 方法, 传入当前类实例后,与该实例绑定的元数据结果将被清空,再次调用时则会重新获取 例如如下代码: .. code-block:: python import asyncio from deepfos.lib.asynchronous import FuturePropertyMeta, future_property class Example(metaclass=FuturePropertyMeta): async def get_a(self): await asyncio.sleep(2) print('Getting a...') return 1 async def get_b(self): await asyncio.sleep(1) print('Getting b...') return 2 @future_property async def get_all(self): a = await self.get_a() b = await self.get_b() return a, b 使用如下方式使用: .. code-block:: python e = Example() print(e.get_all) print('Get again') print(e.get_all) type(e).get_all.reset(e) print('After reset, get again') print(e.get_all) 则打印结果如下: .. code-block:: python Getting a... Getting b... (1, 2) Get again (1, 2) After reset, get again Getting a... Getting b... (1, 2)