blob: 254df70a5aac340643b5c08e61dcf5a781851f49 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
try:
import asyncio
from asyncio.test_utils import run_once as _run_once
def run_once():
return _run_once(asyncio.get_event_loop())
except ImportError as e:
try:
import trollius as asyncio
except ImportError:
asyncio = None
def run_once():
'''
copied from asyncio.testutils because trollius has no
"testutils"
'''
# in Twisted, this method is a no-op
if asyncio is None:
return
# just like modern asyncio.testutils.run_once does it...
loop = asyncio.get_event_loop()
loop.stop()
loop.run_forever()
asyncio.gather(*asyncio.Task.all_tasks())
try:
# XXX fixme hack better way to detect twisted
# (has to work on py3 where asyncio exists always, though)
import twisted # noqa
def await(_):
return
except ImportError:
def await(future):
asyncio.get_event_loop().run_until_complete(future)
|