Новый Python: Асинхронное всё

отступление 1: Сопрограммы (coroutines)

(PEP 342)

Сопрограмма - это такая сущность, которой можно передавать и получать назад управление и данные и которая (в отличие от процедуры) хранит внутреннее состояние. Можно заметить, что классические генераторы это уже почти сопрограммы, но не совсем:

  • generators cannot yield control while other functions are executing, unless those functions are themselves expressed as generators,
  • генератор только отдает значения, но необходимо еще иметь возможность засовывать данные внутрь.

И так действительно можно:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
>>> def flipflop(initial=0):
...     x = initial
...     while True:
...             x = yield x
... 
>>> f = flipflop()
>>> f.send(2)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: can't send non-None value to a just-started generator
>>> f.send(None)
0
>>> f.send(1)
1
>>> f.send(4)
4
>>> f.send(None)
>>> f.send(123)
123
>>> f.close()

еще можно закидывать исключения в итератор: it.exception()

в результате получаем полноценную сопрограмму: можем передавать и управление, и значения в обе стороны.

отступление 2: yield from

(PEP 380)

если разбивать генератор на мелкие кусочки, `'нельзя просто так взять и засунуть `yield` на более глубокий уровень`'

последовательность значений из итератора (в одну сторону) можно вернуть как-то так:

#!python
for v in g:
    yield v

но (см. выше) yield же может принимать значения снаружи поэтому yield from <expr>

как это работает:

когда выполнение натыкается на yield from subg(), все приходящие через send()/next() запросы проксируются в subg, а yield'имые значения соответственно передаются сразу наружу. Так происходит до тех пор, пока subg не кончится (т.е. не выбросит StopIteration), после этого выполнение внешнего генератора продолжается.

пример (вкуривать пока не станет понятно):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def g1(a):
    print('  g1 enter')
    for n in range(a):
        print('  g1 n=',n)
        batman = yield "batman",n 
        print('  g1 batman=',batman)
        yield from g2(n)

def g2(n):
    print('    g2 enter')
    q=0
    for i in range(n):
        print('    g2 i=',i)
        q=yield(i+q*10)
        print('    g2 q=',q)


val=0
g = g1(4)
print('create g')
print('send none', g.send(None))
while True:
    try:
        print('sending ',val)
        result = g.send(val)
        print('got', result)
        val += 10
    except StopIteration:
        print('StopIteration')
        break

а работает он вот так:

create g
  g1 enter
  g1 n= 0
send none ('batman', 0)
sending  0
  g1 batman= 0
    g2 enter
  g1 n= 1
got ('batman', 1)
sending  10
  g1 batman= 10
    g2 enter
    g2 i= 0
got 0
sending  20
    g2 q= 20
  g1 n= 2
got ('batman', 2)
sending  30
  g1 batman= 30
    g2 enter
    g2 i= 0
got 0
sending  40
    g2 q= 40
    g2 i= 1
got 401
sending  50
    g2 q= 50
  g1 n= 3
got ('batman', 3)
sending  60
  g1 batman= 60
    g2 enter
    g2 i= 0
got 0
sending  70
    g2 q= 70
    g2 i= 1
got 701
sending  80
    g2 q= 80
    g2 i= 2
got 802
sending  90
    g2 q= 90
StopIteration

asyncio

(PEP 3156)

с чего всё началось: транспорт и протокол

  • протокол осуществляет доставку байтов,
  • транспорт предоставляет/получает эти байты;
  • они общаются подергиванием неблокирующих методов друг друга

event loop

Такой объект со специфицированным интерфейсом, который выполняет функции планировщика. Собственно, в него передаётся управление, когда асинхронная сопрограмма залипает на блокирующей операции.

основные методы:

запуск и остановка: * run_forever() * run_until_complete(future) (про futures ниже) * stop(): stops the event loop as soon as it is convenient, потом можно рестартить * close(): освобождает все занятые ресурсы This should not be called while the event loop is running

  • call_soon(callback, *args). This schedules a callback to be called as soon as possible. Returns a Handle (see below) representing the callback, whose cancel() method can be used to cancel the callback. Сохраняет порядок выполнения.
  • call_later(delay, callback, *args) через n секунд, НЕ сохраняет порядок
  • call_at(when, callback, *args) через это реализуется call_later
  • time() чтобы знать, что отдавать call_at
  • <еще куча всякого тредсейфового и специфичного(сокеты всякие там), см. доки>

event loop либо глобален (и добывается через event loop policy, такой специальный объект), либо явным образом передается во все функции, которые с ним работают (грепать в пепе: event loop policy; Passing an Event Loop Around Explicitly)

в некоторых случаях эвентлуп предоставляется системой и уже запущен, и его может быть нельзя остановить или закрыть внутри программы.

asyncio предоставляет две реализации: SelectorEventLoop и ProactorEventLoop, последний работает только на винде и около.

Хэндлы нужны, чтоб отменить задание через cancel(). Других применений и публичных методов у них нет.

отступление 3: Futures

(PEP 3148)

изначально идея зародилась в concurrent (многотредовость), а потом идею зохавал asyncio. Собственно, в concurrent это выглядит следующим образом: Есть два базовых класса: Executor и Future Executor'у даются заявки на выполнение, он в ответ возвращает Future, который является представителем задания и в который можно тыкать и спрашивать, как оно поживает.

Futures в asyncio работают примерно так же.

Основное: Future можно использовать для отмены задачи, как хэндл, и можно ему отдавать коллбэки, которые он дёрнет по завершении задачи. Если Future на момент прикручивания коллбэка уже выполнен, то последний дёргается через call_soon() Каждый такой объект при создании ассоциируется с эвентлупом, который занимается заданием и дергает за все эти ниточки. * asyncio'шный Future можн использовать в правой стороне yield from (т.е. yield from footure), что имеет эффект "дождись окончания исполнения и отдай результат наружу".

есть волшебные методы:

asyncio.async(arg): делает Future из аргумента (в качестве аргумента допустимо всё что можно писать справа от yield from, т.е. сопрограмма либо future)

asyncio.wrap_future(future): обертка-адаптер для concurrent.futures.Future, чтоб оно работало с asyncio

<там есть много про протоколыi и транспорты. курите сами.>

Это всё было про коллбэки.

Теперь интересное: можно коллбэками не пользоваться, а пользоваться сопрограммами.

Things a coroutine can do: result = yield from future-- suspends the coroutine until the future is done, then returns the future's result result = yield from coroutine()-- wait for another coroutine to produce a result return expression raise exception

pep 492: async/await syntax

PEP 0492

понятие native coroutine (т.е. не из генератора)

определяется так:

1
2
async def read_data(db):
    pass
  • It is a SyntaxError to have yield or yield from expressions in an async function теперь можно (см. следующую главу)
  • Regular generators, when called, return a generator object ; similarly, coroutines return a coroutine object
  • StopIteration exceptions are not propagated out of coroutines, and are replaced with a RuntimeError . For regular generators such behavior requires a future import (see PEP 479).

await

ждем ответа от чего-либо, тем временем передаём управление в event loop

1
2
3
4
async def read_data(db):
    ...
    data = await db.fetch('SELECT ...')
    ...

с точки зрения данной сопрограммы это просто блочит выполнение, пока awaitable не завершится

т.е. db.fetch() возвращает специальный awaitable объект, от которого далее можно ждать ответа

в случае asyncio awaitable это Future собственно, awaitable бывают трех видов: * A native (async def) coroutine object
* A generator-based coroutine object (NB: генератор надо декорировать при помощи types.coroutine()) * An object with an __await__ method returning an iterator. (такое называется Future-like objects)

когда мы говорим await foo, мы отдаем управление эвентлупу до тех пор пока foo не закончит (+эпсилон)

async with

в случае с обычным with контекст глобальный. Когда мы можем передавать управление в сильно другие места (где может быть другой with это приводит к волшебным багам, поэтому нужен специальный asynchronous context manager, который умеет обрабатывать такие переключения

(Two new magic methods are added: __aenter__ and __aexit__).

async for

работает только внутри нативной сопрограммы (async def) нужен специальный asynchronous iterable __aiter__ -> __anext__ -> StopAsyncIteration

Пример:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class AsyncIterable:
    def __aiter__(self):
        return self

    async def __anext__(self):
        data = await self.fetch_data()
        if data:
            return data
        else:
            raise StopAsyncIteration

    async def fetch_data(self):
        ...

собственно async for:

A new statement for iterating through asynchronous iterators is proposed:

1
2
3
4
async for TARGET in ITER:
    BLOCK
else:
    BLOCK2

which is semantically equivalent to:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
iter = (ITER)
iter = type(iter).__aiter__(iter)
running = True
while running:
    try:
        TARGET = await type(iter).__anext__(iter)
    except StopAsyncIteration:
        running = False
    else:
        BLOCK
else:
    BLOCK2

asynchronous comprehensions

(PEP 530)

Конечно же, для асинхронного for можно использовать короткий синтаксис генераторов:

  • set comprehension: {i async for i in agen()} ;
  • list comprehension: [i async for i in agen()] ;
  • dict comprehension: {i: i ** 2 async for i in agen()} ;
  • generator expression: (i ** 2 async for i in agen()) .

еще теперь можно в них использовать await:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
result = [await fun() for fun in funcs]
result = {await fun() for fun in funcs}
result = {fun: await fun() for fun in funcs}

result = [await fun() for fun in funcs if await smth]
result = {await fun() for fun in funcs if await smth}
result = {fun: await fun() for fun in funcs if await smth}

result = [await fun() async for fun in funcs]
result = {await fun() async for fun in funcs}
result = {fun: await fun() async for fun in funcs}

result = [await fun() async for fun in funcs if await smth]
result = {await fun() async for fun in funcs if await smth}
result = {fun: await fun() async for fun in funcs if await smth}

This is only valid in async def function body.

асинхронные генераторы

(PEP 525)

мы умеем легко писать генераторы:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
>>> def gen(n):
...     q=n
...     while q>0:
...             yield q
...             q = q//2
... 
>>> for y in gen(100500):
...     print(y)
... 
100500
50250
25125
12562
6281
3140
1570
785
392
196
98
49
24
12
6
3
1

несмотря на то, что на самом деле интерфейс генератора упорот и магическ: __iter__, __next__, вот это всё.

Ломающие новости! теперь асинхронные генераторы тоже можно так писать, не реализуя руками __aiter__() и остальные кишки.

(а еще они работают в 2 раза быстрее "ручной" реализации)

пример из пепа:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
async def ticker(delay, to):
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

async def run():
    async for i in ticker(1, 10):
        print(i)

import asyncio
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(run())
finally:
    loop.close()

доп.чтение: PEP 533

два счетчика одновременно (а также вообще полностью дописанный пример асинхронного кода, который можно уже скопипастить в интерпретатор и посмотреть, как он работает, что настоятельно рекомендуется):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
async def ticker(delay, to):
    for i in range(to):
        yield (i,delay)
        await asyncio.sleep(delay)
async def run(k):
    async for i in ticker(k, 10):
        print(i)
import asyncio

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.gather(run(1),run(2)))
finally:
    loop.close()

результат:

(0, 2)
(0, 1)
(1, 1)
(1, 2)
(2, 1)
(3, 1)
(2, 2)
(4, 1)
(5, 1)
(3, 2)
(6, 1)
(7, 1)
(4, 2)
(8, 1)
(9, 1)
(5, 2)
(6, 2)
(7, 2)
(8, 2)
(9, 2)