invoke version: >= 1.1
Code 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# filename=tasks.py
import asyncio
from inspect import isgeneratorfunction , iscoroutinefunction
from invoke import Task
from invoke.tasks import task
class InvokeWrapper ( Task ):
def __call__ ( self , * args , ** kwargs ):
io_loop = asyncio . get_event_loop ()
if isgeneratorfunction ( self . body ):
result = io_loop . run_until_complete (
asyncio . coroutine ( self . body )( * args , ** kwargs )
)
elif iscoroutinefunction ( self . body ):
result = io_loop . run_until_complete ( self . body ( * args , ** kwargs ))
else :
result = self . body ( * args , ** kwargs )
self . times_called += 1
return result
@task ( klass = InvokeWrapper )
def foo ( ctx ):
"""sync task"""
print ( "foo" )
@task ( klass = InvokeWrapper )
async def bar ( ctx ):
"""async/await style async task"""
await asyncio . sleep ( 0.1 )
print ( "bar" )
@task ( klass = InvokeWrapper )
def baz ( ctx ):
"""yield from(< py3.6) style async task"""
yield from asyncio . sleep ( 0.1 )
print ( "baz" )
Test 1
2
3
4
5
6
7
8
9
10
11
~ inv -l
Available tasks:
bar async/await style async task
baz yield from( < py3.6) style async task
foo sync task
~ inv bar| foo| baz
bar
baz
foo
# NOTE: I am not responsible for any expired content.
create@2021-11-13T12:26:24+08:00
update@2021-11-13T12:27:13+08:00
comment@https://github.com/ferstar/blog/issues/49