Source code for responder.background

import asyncio
import concurrent.futures
import inspect
import multiprocessing
import traceback

from starlette.concurrency import run_in_threadpool

__all__ = ["BackgroundQueue"]


[docs] class BackgroundQueue: """A queue for running tasks in background threads. Uses a ``ThreadPoolExecutor`` sized to the number of CPUs. Access it via ``api.background``. Usage:: # As a decorator — fire and forget @api.background.task def send_email(to, subject): ... send_email("user@example.com", "Hello") # Direct submission future = api.background.run(send_email, "user@example.com", "Hello") # As a callable (supports async functions) await api.background(send_email, "user@example.com", "Hello") """ def __init__(self, n=None): """Create a new background queue. :param n: Number of worker threads. Defaults to CPU count. """ if n is None: n = multiprocessing.cpu_count() self.n = n self.pool = concurrent.futures.ThreadPoolExecutor(max_workers=n) self.results = []
[docs] def run(self, f, *args, **kwargs): """Submit a function to run in a background thread. :param f: The function to run. :returns: A ``concurrent.futures.Future`` for the result. """ f = self.pool.submit(f, *args, **kwargs) self.results.append(f) return f
[docs] def task(self, f): """Decorator that wraps a function to run in the background thread pool. The decorated function returns a ``Future`` instead of blocking. Exceptions are printed to stderr via traceback. :param f: The function to wrap. """ def on_future_done(fs): try: fs.result() except Exception: traceback.print_exc() def do_task(*args, **kwargs): result = self.run(f, *args, **kwargs) result.add_done_callback(on_future_done) return result return do_task
async def __call__(self, func, *args, **kwargs) -> None: if inspect.iscoroutinefunction(func): return await asyncio.create_task(func(*args, **kwargs)) return await run_in_threadpool(func, *args, **kwargs)