summaryrefslogtreecommitdiff
path: root/dev-python/txaio/files/util.py
blob: 254df70a5aac340643b5c08e61dcf5a781851f49 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
try:
    import asyncio
    from asyncio.test_utils import run_once as _run_once

    def run_once():
        return _run_once(asyncio.get_event_loop())

except ImportError as e:
    try:
        import trollius as asyncio
    except ImportError:
        asyncio = None

    def run_once():
        '''
        copied from asyncio.testutils because trollius has no
        "testutils"
        '''
        # in Twisted, this method is a no-op
        if asyncio is None:
            return

        # just like modern asyncio.testutils.run_once does it...
        loop = asyncio.get_event_loop()
        loop.stop()
        loop.run_forever()
        asyncio.gather(*asyncio.Task.all_tasks())


try:
    # XXX fixme hack better way to detect twisted
    # (has to work on py3 where asyncio exists always, though)
    import twisted  # noqa

    def await(_):
        return

except ImportError:
    def await(future):
        asyncio.get_event_loop().run_until_complete(future)