import asyncio
import atexit
import concurrent.futures
import threading
from collections.abc import Awaitable, Coroutine
from typing import Any, Literal, TypeVar

T = TypeVar("T")


class LoopManager:
    """Pick, lazily create, and tear down the event loop used to run awaitables.

    Three strategies are supported:

    * ``"current"``: the running loop, or else the calling thread's current loop.
    * ``"isolated"``: a single private loop owned by this manager.
    * ``"background_thread"``: a loop running forever in a daemon thread.
    """

    def __init__(self) -> None:
        """Initialize the loop manager."""
        self._loop_strategy: Literal["current", "isolated", "background_thread"] = "current"
        self._shutting_down = False
        # Guards lazy loop creation, strategy changes and shutdown.
        self._lock = threading.Lock()
        self._isolated_loop: asyncio.AbstractEventLoop | None = None
        self._background_loop: asyncio.AbstractEventLoop | None = None
        self._background_loop_thread: threading.Thread | None = None
        # Seconds to block per wait on a background-loop result, and how many
        # such waits are attempted before giving up.
        self._background_loop_result_timeout = 30.0
        self._background_loop_result_max_retries = 5

    def _get_background_loop(self) -> asyncio.AbstractEventLoop:
        """Get the dedicated background loop, starting it if necessary."""
        existing = self._background_loop
        if existing is not None:
            return existing

        with self._lock:
            # Re-check under the lock: another thread may have created the loop
            # between the unlocked check above and acquiring the lock. Without
            # this, two racing callers would each spawn a loop and a thread.
            if self._background_loop is None:
                self._background_loop = asyncio.new_event_loop()
                self._background_loop_thread = threading.Thread(
                    target=self._run_background_loop,
                    daemon=True,
                    name="fastapi-injectable-daemon-thread",
                )
                self._background_loop_thread.start()
            return self._background_loop

    def _run_background_loop(self) -> None:  # pragma: no cover
        """Run the background loop forever."""
        assert self._background_loop is not None  # noqa: S101
        asyncio.set_event_loop(self._background_loop)
        try:
            self._background_loop.run_forever()
        finally:
            # During shutdown the loop is closed by ``shutdown()`` itself.
            if not self._shutting_down:
                self._background_loop.close()

    def _get_isolated_loop(self) -> asyncio.AbstractEventLoop:
        """Get the isolated loop, creating it if necessary."""
        existing = self._isolated_loop
        if existing is not None:
            return existing

        with self._lock:
            # Re-check under the lock so concurrent callers share one loop.
            if self._isolated_loop is None:
                self._isolated_loop = asyncio.get_event_loop_policy().new_event_loop()
            return self._isolated_loop

    def _get_or_create_current_loop(self) -> asyncio.AbstractEventLoop:
        """Get or create event loop for current thread.

        Compatible with Python 3.12+ and 3.14+. Attempts to get loop via policy,
        falls back to creating new loop if RuntimeError is raised. A newly
        created loop is installed as the thread's current loop so that later
        calls reuse it instead of creating (and leaking) a fresh loop each time.

        Returns:
            Event loop instance.
        """
        policy = asyncio.get_event_loop_policy()
        try:
            return policy.get_event_loop()
        except RuntimeError:
            loop = policy.new_event_loop()
            asyncio.set_event_loop(loop)
            return loop

    @property
    def loop_strategy(self) -> Literal["current", "isolated", "background_thread"]:
        """Get the currently configured loop strategy."""
        return self._loop_strategy

    def set_loop_strategy(self, strategy: Literal["current", "isolated", "background_thread"]) -> None:
        """Set the loop strategy used by subsequent calls."""
        with self._lock:
            self._loop_strategy = strategy

    def get_loop(self) -> asyncio.AbstractEventLoop:
        """Get the appropriate event loop based on the configured strategy.

        Returns:
            The running loop (or the thread's current loop) for ``"current"``,
            the private loop for ``"isolated"``, otherwise the background loop.
        """
        strategy = self._loop_strategy
        if strategy == "current":
            try:
                return asyncio.get_running_loop()
            except RuntimeError:
                return self._get_or_create_current_loop()
        if strategy == "isolated":
            return self._get_isolated_loop()
        return self._get_background_loop()

    def _wait_with_retries(self, future: concurrent.futures.Future[T]) -> T:
        """Block on ``future``, waiting up to the configured number of timeouts.

        Args:
            future: Future produced by scheduling work on the background loop.

        Returns:
            The future's result.

        Raises:
            TimeoutError: If every wait timed out; the future is cancelled first.
        """
        max_retries = self._background_loop_result_max_retries
        timeout = self._background_loop_result_timeout

        for _ in range(max_retries):
            try:
                return future.result(timeout=timeout)
            except concurrent.futures.TimeoutError:  # noqa: PERF203
                continue

        future.cancel()
        msg = f"Operation timed out after {max_retries} attempts " f"(total {max_retries * timeout} seconds)"
        raise TimeoutError(msg)

    def in_loop(self) -> bool:
        """Return whether the caller is running inside the strategy's loop."""
        # Check for a running loop first: without one the answer is always
        # False, and resolving the strategy's loop could needlessly create one.
        try:
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            return False
        return running_loop is self.get_loop()

    def run_in_loop(self, awaitable: Awaitable[T]) -> T:
        """Run coroutine in the appropriate loop.

        Args:
            awaitable: The awaitable to execute.

        Returns:
            The result of the awaitable execution.
        """
        loop = self.get_loop()
        if self._loop_strategy in {"current", "isolated"}:
            return loop.run_until_complete(awaitable)

        if asyncio.iscoroutine(awaitable):
            coro: Coroutine[Any, Any, T] = awaitable  # pragma: no cover
        else:
            # Prepare coroutine for run_coroutine_threadsafe
            async def _wrapper() -> T:
                return await awaitable  # pragma: no cover

            coro = _wrapper()
        return self._wait_with_retries(asyncio.run_coroutine_threadsafe(coro, loop))

    def shutdown(self) -> None:
        """Stop and close every loop this manager created.

        Safe to call when no loop was ever created and safe to call more than
        once, which matters because it is also registered with ``atexit``.
        """
        with self._lock:
            self._shutting_down = True

            background_loop = self._background_loop
            if background_loop is not None and not background_loop.is_closed():
                background_loop.call_soon_threadsafe(background_loop.stop)
                thread = self._background_loop_thread
                if thread is not None:
                    thread.join(timeout=1)
                # If the join timed out the loop is still running and closing
                # it would raise; leave it to die with the daemon thread.
                if not background_loop.is_running():
                    background_loop.close()

            isolated_loop = self._isolated_loop
            if isolated_loop is not None and not isolated_loop.is_closed() and not isolated_loop.is_running():
                isolated_loop.close()


loop_manager = LoopManager()
atexit.register(loop_manager.shutdown)
def run_coroutine_sync(coro: Coroutine[Any, Any, T]) -> T:
    """Execute ``coro`` to completion from synchronous code.

    The work is delegated to the module-level ``loop_manager``, which chooses
    the event loop to run on.

    Args:
        coro: The coroutine to execute.

    Returns:
        Whatever the coroutine returns.

    Raises:
        Any exception raised by the coroutine or during execution.
    """
    # NOTE(Jasper Sui): This can run not only coroutine, but also Future and Task.
    outcome: T = loop_manager.run_in_loop(coro)
    return outcome