import asyncio
import functools
def sync_to_async(func):
    """Wrap a blocking callable so it can be awaited.

    The returned coroutine function runs ``func`` in the running event
    loop's default executor (``run_in_executor(None, ...)``) and resolves
    to whatever ``func`` returns. Exceptions raised by ``func`` propagate
    to the awaiting caller.

    Args:
        func: A synchronous callable.

    Returns:
        A coroutine function accepting the same positional and keyword
        arguments as ``func``.
    """

    # Preserve the wrapped callable's name/docstring on the wrapper.
    @functools.wraps(func)
    async def wrap_async(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # run_in_executor only forwards positional arguments, so bind
        # everything up front. The partial returns func's result, which the
        # previous closure silently dropped (callers always got None).
        call = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(None, call)

    return wrap_async
@sync_to_async
def async_read(file_name, mode):
    """Read an entire file; wrapped by ``sync_to_async`` for use with ``await``.

    The synchronous body opens ``file_name`` with ``mode``, reads it to the
    end, and returns the data. The ``sync_to_async`` decorator turns a call
    to ``async_read`` into a coroutine that runs this body through the
    event loop's default executor.

    Args:
        file_name: Path passed straight to ``open()``.
        mode: Mode string passed straight to ``open()`` (e.g. ``"r"`` or
            ``"rb"``); it determines whether text or bytes are read.
    """
    with open(file_name, mode) as handle:
        contents = handle.read()
    return contents
def run_async(coroutine):
    """Run ``coroutine`` to completion from synchronous code.

    Uses the current thread's event loop, creating and installing a new one
    when none exists, and blocks until ``coroutine`` finishes.

    Args:
        coroutine: The awaitable to drive with ``run_until_complete``.

    Returns:
        The result produced by ``coroutine``.

    Raises:
        RuntimeError: If the event loop is already running, i.e. this helper
            was called from inside async code. ``RuntimeError`` is a subclass
            of ``Exception``, so existing ``except Exception`` handlers still
            catch it.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current loop for this thread (non-main threads, and newer Python
        # versions that no longer create one implicitly): make one and keep
        # it installed so later calls reuse it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    if loop.is_running():
        raise RuntimeError(
            "Asyncio event loop cannot be running in order to use run_async helper function."
        )
    return loop.run_until_complete(coroutine)