What’s new¶
v1.1 相对于 v1.0 最大的更新,即为对异步编程的全面支持,具体体现在:
所有涉及网络I/O的函数都有异步的实现
同步实现基于异步实现完成
元素类中常用的元数据的加载,将在其实例化过程中作为后台异步任务执行,进一步减少元数据加载过程的耗时, 详见 future_property介绍
异步化的常用类¶
异步函数同步化¶
v1.1的deepfos项目提供了十分简易的异步函数同步化改造方案,即:
使用synchronize装饰器装饰实现
简单示例¶
例如如下异步函数:
import asyncio
async def do_task(task_id):
print(f'Do task {task_id}...')
await asyncio.sleep(1)
print(f'Task {task_id} done')
本身需用 await
语法或被放入事件循环中才能被执行,然而在被 synchronize 装饰器装饰后,可以像同步函数一样被调用
import asyncio
from deepfos.element.base import synchronize
@synchronize
async def do_task(task_id):
print(f'Do task {task_id}...')
await asyncio.sleep(1)
print(f'Task {task_id} done')
for i in range(5):
do_task(i)
将打印如下:
Do task 0...
Task 0 done
Do task 1...
Task 1 done
Do task 2...
Task 2 done
Do task 3...
Task 3 done
Do task 4...
Task 4 done
在 for 循环中,其中的异步函数将顺次执行,且无并发
在如上例子中,经过 synchronize 装饰后的效果与直接在循环中用 asyncio.run
几乎没有差异,
而这显然没有体现出异步编程的优势,因此deepfos v1.1提供了并发编程的支持,
可以仅通过一个配置项启动后,在不改变如上写法的前提下使异步函数可以并发执行
配置项:
OPTION.general.parallel_mode: 是否开启并发模式
使用示例:
import asyncio
from deepfos.element.base import synchronize
from deepfos import OPTION
OPTION.general.parallel_mode = True
@synchronize
async def do_task(task_id):
print(f'Do task {task_id}...')
await asyncio.sleep(1)
print(f'Task {task_id} done')
for i in range(5):
do_task(i)
将打印如下:
Do task 0...
Do task 1...
Do task 2...
Do task 3...
Do task 4...
Task 0 done
Task 2 done
Task 4 done
Task 1 done
Task 3 done
从打印内容可以看出,异步函数的执行是并发的
基于此特性,可以编写对耗时要求较高但对执行顺序无要求的功能
例如通过如下代码实现并发升级元素:
from deepfos import OPTION
from deepfos.api.app import AppAPI
from deepfos.api.models.app import ElementUpgradeInfoDto
from deepfos.element.base import synchronize
api = AppAPI(sync=False)
OPTION.general.parallel_mode = True
@synchronize
async def upgrade_once(upgrade_dto: ElementUpgradeInfoDto):
await api.element.element_upgrade([upgrade_dto])
elements = [
ElementUpgradeInfoDto(...),
ElementUpgradeInfoDto(...),
ElementUpgradeInfoDto(...),
ElementUpgradeInfoDto(...),
ElementUpgradeInfoDto(...),
ElementUpgradeInfoDto(...)
]
for ele in elements:
upgrade_once(ele)
synchronize 装饰器在并发和非并发模式下的逻辑差异概况如下:
在并发模式下,对异步函数的调用将返回可对其结果进行访问的 ParallelProxy
在非并发模式中,对异步函数的调用将变为将其放入
evloop
并同步等待其执行完毕
关于并发模式的更多说明,可见 并发特性介绍
项目示例¶
v1.1的deepfos项目中现有异步类的同步化,皆为基于对 synchronize 装饰器的进一步包装来实现,并由此提供了一个较为通用的模式:
使用SyncMeta元类来实现类内异步函数的同步化改造
具体来说,就是经过 SyncMeta 元类创建,并包含了 synchronize
成员的类,
可通过在 synchronize
元组中指定异步函数名的方式,使类内的异步函数可以被同步调用
例如异步的变量元素类定义如下:
class AsyncVariable(ElementBase):
api_class = VariableAPI
api: VariableAPI
......
async def save(self):
payload = UpdateVariavlesDTO.construct_from(
self.meta, description=self._description,
globalVariables=list(self._gv_memo.values()),
userVariables=list(self._uv_memo.values()),
moduleId=self.api.module_id
)
return await self._update_impl(payload=payload)
async def _update_impl(self, payload: UpdateVariavlesDTO):
# async_api成员是异步的API类对象
# 因此其中的update接口函数也为异步函数
await self.async_api.variable.update(payload)
...
其同步的元素类定义如下:
class Variable(AsyncVariable, metaclass=SyncMeta):
synchronize = ('save', )
if TYPE_CHECKING:
def save(self):
...
在异步变量元素对外暴露的函数中,只有 save 函数是异步函数,因此只需继承 AsyncVariable 后,
将该函数名加入 synchronize
成员,表明在创建该子类的过程中,需同步化 save 函数
这样以后,在使用时,可以直接以同步调用的语法调用 save 函数
from deepfos.element.variable import Variable
variable = Variable(element_name='test_val', path='/test')
variable.save()
实现原理¶
SyncMeta 元类中,会在 base
和 namespace
中寻找同步化信息,即 synchronize
成员,
然后在 base
中,将最后一次出现的相应异步函数类成员以 synchronize 装饰器装饰,替换回当前类中
class SyncMeta(_AppendDoc):
def __new__(mcs, name, bases, namespace, **kwargs):
base = bases[0]
methods = None
if len(bases) > 1:
for parent in bases:
if hasattr(parent, "synchronize"):
methods = parent.synchronize
break
if methods is None:
methods = namespace.pop('synchronize', [])
for attr in methods:
namespace[attr] = synchronize(mcs._get_from_bases(base, attr))
cls = super().__new__(mcs, name, bases, namespace, **kwargs)
return cls
这样以后,生成的类中的异步函数就有了同步实现,且可以使用到 synchronize 装饰器自带的特性
Future Property¶
除了独立的异步函数,在项目开发的功能类使用场景中,还有一种较常见的I/O开销大的情况,即包含了接口请求的元数据的准备
元数据一般涉及一个或多个接口请求,用于组织一个可能被多次使用的类成员,出于性能和复用性考虑,
这一般会是一个 cached_property
,然而, cached_property
只能用于装饰同步函数,
且这种使用方式无法体现出异步I/O的优势
针对这种使用场景,v1.1 版本的deepfos项目提供了一种专用于如上场景的装饰器类—— future_property
前提¶
future_property 装饰器类需要配合 FuturePropertyMeta 元类使用, 后者负责重写类的__init__,使得 future_property 装饰过的方法将在__init__过程中被 异步调用, 并注册得到结果时的回调
如上过程可以保证 future_property 装饰器类的使用体感在涉及异步函数的情况下与 cached_property
相似
使用示例¶
例如一个简单的使用样例:
import asyncio
from deepfos.lib.asynchronous import FuturePropertyMeta, future_property
class Example(metaclass=FuturePropertyMeta):
async def get_a(self):
await asyncio.sleep(1)
return 1
async def get_b(self):
await asyncio.sleep(2)
return 2
@future_property
async def get_all(self):
a = await self.get_a()
b = await self.get_b()
return a, b
e = Example()
print(e.get_all)
将打印:
(1, 2)
在如上样例中,使用 get_all 处无需使用 await
语法, 且用法和 cached_property
一样,都是访问成员变量的语法
注意¶
由于异步函数间在 await
切换间的执行先后顺序是不确定的,如需在 future_property 中,
使用其他被 future_property 装饰过的成员变量,有发生死锁的可能性
对此, future_property 提供了 wait_for 方法, 用以安全有序地在 future_property 所装饰的异步函数内访问其他 future_property 属性
写法示例:
import asyncio
from deepfos.lib.asynchronous import FuturePropertyMeta, future_property
class Example(metaclass=FuturePropertyMeta):
async def get_a(self):
await asyncio.sleep(1)
return 1
async def get_b(self):
await asyncio.sleep(2)
return 2
@future_property
async def concat_a(self):
# 注意此处使用的是type(self)
# 则效果为访问future_property(self.get_all)
# 如为self.get_all
# 则效果为访问future_property(self.get_all).__get__(self)
# 无法调用到future_property.wait_for
_all = await type(self).get_all.wait_for(self)
a = await self.get_a()
return a, _all
@future_property
async def get_all(self):
a = await self.get_a()
b = await self.get_b()
return a, b
e = Example()
print(e.concat_a)
将打印:
(1, (1, 2))
补充¶
如需使 future_property 装饰的元数据失效,可使用 future_property 的 reset 方法, 传入当前类实例后,与该实例绑定的元数据结果将被清空,再次调用时则会重新获取
例如如下代码:
import asyncio
from deepfos.lib.asynchronous import FuturePropertyMeta, future_property
class Example(metaclass=FuturePropertyMeta):
async def get_a(self):
await asyncio.sleep(2)
print('Getting a...')
return 1
async def get_b(self):
await asyncio.sleep(1)
print('Getting b...')
return 2
@future_property
async def get_all(self):
a = await self.get_a()
b = await self.get_b()
return a, b
使用如下方式使用:
e = Example()
print(e.get_all)
print('Get again')
print(e.get_all)
type(e).get_all.reset(e)
print('After reset, get again')
print(e.get_all)
则打印结果如下:
Getting a...
Getting b...
(1, 2)
Get again
(1, 2)
After reset, get again
Getting a...
Getting b...
(1, 2)