When changing concurrency/background execution, ensure (a) state is isolated per thread/consumer/instance, and (b) shutdown/cleanup and exception signaling are coordinated with explicit synchronization.
Standards 1) Isolate per-thread/per-consumer resources
threading.local() (or thread-specific state) as a module/class attribute; prefer per-Celery/backend instance or initialize thread-local fields during start().2) Coordinate shutdown with Events and correct lifecycle ownership
threading.Event()/similar primitives to signal stop/shutdown.3) Make exception propagation race-safe
finally, then read the exception after the event).4) Always close/reconnect thread-bound resources
Example pattern
import threading
class BackgroundWorker:
def __init__(self):
self._stop = threading.Event()
self._shutdown = threading.Event()
self._exc = None
self._thread_local = threading.local() # instance-scoped
def start(self):
if getattr(self._thread_local, "started", False):
return
self._thread_local.started = True
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def _run(self):
try:
while not self._stop.is_set():
# do work...
pass
except Exception as e:
self._exc = e
finally:
# single point to publish shutdown state
self._shutdown.set()
# close thread-bound resources here
def stop(self):
self._stop.set()
self._shutdown.wait(timeout=10)
if self._exc is not None:
raise self._exc
Code-review checklist
threading.local()? (Fix to per-instance/per-thread.)Events, and is shared state read only after synchronization?Enter the URL of a public GitHub repository