The Event Loop as a Single Source of Truth

Most event-driven systems scatter state across queues, caches, and threads. “The Event Loop as a Single Source of Truth” argues for one clear authority instead.

Code Cracking

We’re examining how Home Assistant’s core runtime treats the asyncio event loop as the single source of truth for everything that happens in the system. Home Assistant is an open‑source home automation platform where thousands of integrations, entities, and automations share one process and one event loop. At the center of that process is core.py, which behaves less like a bag of classes and more like a small operating system for the platform. I’m Mahmoud Zalt, an AI solutions architect, and we’ll use this module as a practical guide to designing resilient, event‑driven systems that stay healthy under load.

We’ll follow one thread: how every key abstraction—jobs, events, state, services, and shutdown—exists to protect and organize the event loop instead of fighting it. By the end, you should be able to look at your own event‑driven code and reshape it around a single, explicit concurrency boundary.

The Core Runtime as a Mini OS

To see how the event loop becomes the single source of truth, start with the structure of core.py. Instead of isolated utilities, you get a coordinated set of subsystems built around one asyncio loop.

homeassistant/
  core.py  (this file: central runtime)

Main object relationships:

  +---------------------+
  |     HomeAssistant   |
  |  - loop             |
  |  - _tasks           |
  |  - _background      |
  |  - state (CoreState)|
  +----------+----------+
             | owns
   +---------+---------+----------------+
   |                   |                |
+--v---------+   +-----v-------+   +----v-----------+
|  EventBus  |   | StateMachine|   | ServiceRegistry|
+------------+   +-------------+   +----------------+
      |                   |                |
      | fires Events      | manages States | executes ServiceCalls
      |                   |                |
   listeners         entity_id -> State   domain.service -> Service
One event loop, three main subsystems, one concurrency boundary.

The HomeAssistant object plays the kernel role. It owns the event loop, tracks foreground and background tasks, and coordinates startup and shutdown. Around it:

  • EventBus is the publish/subscribe backbone for everything that happens.
  • StateMachine stores entity state and emits semantic state events.
  • ServiceRegistry exposes operations that other parts of the system can call.
  • Context, Event, and State carry data and traceability through that loop.

Once you see this as a mini operating system, the design constraint becomes clear: every feature either keeps the event loop predictable or risks stalling the entire process.

HassJob: Classifying Work for the Loop

If the loop is the source of truth, you can’t treat scheduled work as an opaque callable. The loop needs to know what kind of work it’s about to run. That’s the role of HassJob.

HassJob wraps a callable and pre‑classifies it as one of three types: coroutine function, callback (safe to run directly on the loop), or executor job (must go to a thread pool). The type is computed once and cached instead of recomputed at every dispatch.

@final
class HassJob[**_P, _R_co]:
    """Represent a job to be run later."""

    __slots__ = ("_cache", "_cancel_on_shutdown", "name", "target")

    def __init__(
        self,
        target: Callable[_P, _R_co],
        name: str | None = None,
        *,
        cancel_on_shutdown: bool | None = None,
        job_type: HassJobType | None = None,
    ) -> None:
        self.target: Final = target
        self.name = name
        self._cancel_on_shutdown = cancel_on_shutdown
        self._cache: dict[str, Any] = {}
        if job_type:
            self._cache["job_type"] = job_type

    @under_cached_property
    def job_type(self) -> HassJobType:
        return get_hassjob_callable_job_type(self.target)

This small abstraction buys a lot of control over the loop:

  • Fast hot paths: The event bus and service registry don’t waste time re‑inspecting callables on every dispatch.
  • Deterministic routing: The runtime knows whether to await a coroutine, invoke a synchronous callback on the loop, or send work to an executor.
  • Lifecycle hooks: The cancel_on_shutdown flag lets shutdown orchestrate which scheduled jobs to cancel and which to let complete.

Events and State: Flow vs Truth

With jobs defined, the next task is moving information through the system without compromising the loop. Home Assistant does this with a defensive event bus and a disciplined state machine that clearly separate “what flowed” from “what is true.”

The Event Bus: Containing Fan‑Out and Failure

The EventBus acts like a radio station: components listen to event types (channels), and the bus broadcasts events to all relevant listeners. One method—async_fire_internal—handles the dispatch loop:

@callback
def async_fire_internal(
    self,
    event_type: EventType[_DataT] | str,
    event_data: _DataT | None = None,
    origin: EventOrigin = EventOrigin.local,
    context: Context | None = None,
    time_fired: float | None = None,
) -> None:
    listeners = self._listeners.get(event_type, EMPTY_LIST)
    if event_type not in EVENTS_EXCLUDED_FROM_MATCH_ALL:
        match_all_listeners = self._match_all_listeners
    else:
        match_all_listeners = EMPTY_LIST

    event: Event[_DataT] | None = None
    for job, event_filter in listeners + match_all_listeners:
        if event_filter is not None:
            try:
                if event_data is None or not event_filter(event_data):
                    continue
            except Exception:
                _LOGGER.exception("Error in event filter")
                continue

        if not event:
            event = Event(
                event_type,
                event_data,
                origin,
                time_fired,
                context,
            )

        try:
            self._hass.async_run_hass_job(job, event)
        except Exception:
            _LOGGER.exception("Error running job: %s", job)

The loop remains the source of truth because dispatch is structured around a few rules:

  • Lazy event construction: The Event object is only created if at least one listener will use it. No listeners, no allocation.
  • Filter isolation: Listener filters can fail without poisoning the bus. Exceptions are logged and skipped so one bad integration doesn’t stall the global event path.
  • Controlled fan‑out: Some high‑volume event types are excluded from the MATCH_ALL wildcard subscription, so accidental “listen to everything” subscribers cannot overwhelm the loop.

Thread boundaries are explicit. Synchronous callers use fire(), which hands the work to the loop via call_soon_threadsafe. Async callers use async_fire(), which checks that you’re already on the loop thread and then calls async_fire_internal(). All mutation of bus internals happens on the loop, never across threads.
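The thread boundary can be sketched the same way. In this illustration, `LoopBoundBus` (a hypothetical name) records the loop thread at construction time, so it assumes it is created on that thread. `fire()` hands work to the loop with the standard `loop.call_soon_threadsafe`, while `async_fire()` simply raises if it is called from any other thread.

```python
import asyncio
import threading
from collections.abc import Callable
from typing import Any


class LoopBoundBus:
    """All listener registration and dispatch happen on one loop thread."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        # Assumption: the bus is constructed on the loop's own thread.
        self._loop_thread_id = threading.get_ident()
        self._listeners: dict[str, list[Callable[[Any], None]]] = {}

    def _require_loop_thread(self, method: str) -> None:
        if threading.get_ident() != self._loop_thread_id:
            raise RuntimeError(f"{method} must be called from the event loop thread")

    def async_listen(self, event_type: str, listener: Callable[[Any], None]) -> None:
        self._require_loop_thread("async_listen")
        self._listeners.setdefault(event_type, []).append(listener)

    def async_fire(self, event_type: str, data: Any = None) -> None:
        # Loop-side API: fail loudly instead of racing with the loop.
        self._require_loop_thread("async_fire")
        self._async_fire_internal(event_type, data)

    def fire(self, event_type: str, data: Any = None) -> None:
        # Thread-side API: never touches bus state, only schedules onto the loop.
        self._loop.call_soon_threadsafe(self._async_fire_internal, event_type, data)

    def _async_fire_internal(self, event_type: str, data: Any) -> None:
        for listener in list(self._listeners.get(event_type, ())):
            listener(data)
```

Whichever thread fires, listeners always run on the loop thread. That is what makes the loop the single place where bus state is read and written.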

The State Machine: Change vs Report

Events describe what happened; the state machine describes what is. Home Assistant’s key design choice here is to distinguish a real state change from a repeated state report.

A change means the value or attributes genuinely differ. A report means “I’m still the same” and updates monitoring metadata without changing the semantic state. This distinction becomes critical for automations, history, and performance.

The core logic lives in async_set_internal:

@callback
def async_set_internal(
    self,
    entity_id: str,
    new_state: str,
    attributes: Mapping[str, Any] | None,
    force_update: bool,
    context: Context | None,
    state_info: StateInfo | None,
    timestamp: float,
) -> None:
    # ... compute same_state / same_attr vs old_state ...
    now = dt_util.utc_from_timestamp(timestamp)

    if context is None:
        context = Context(id=ulid_at_time(timestamp))

    if same_state and same_attr:
        old_last_reported = old_state.last_reported  # type: ignore[union-attr]
        old_state.last_reported = now  # type: ignore[union-attr]
        old_state._cache["last_reported_timestamp"] = timestamp  # type: ignore[union-attr]
        self._bus.async_fire_internal(
            EVENT_STATE_REPORTED,
            {
                "entity_id": entity_id,
                "last_reported": now,
                "old_last_reported": old_last_reported,
                "new_state": old_state,
            },
            context=context,
            time_fired=timestamp,
        )
        return

    if same_attr:
        attributes = old_state.attributes

    if not same_state and len(new_state) > MAX_LENGTH_STATE_STATE:
        _LOGGER.error(
            "State %s for %s is longer than %s, falling back to %s",
            new_state,
            entity_id,
            MAX_LENGTH_STATE_STATE,
            STATE_UNKNOWN,
        )
        new_state = STATE_UNKNOWN

    state = State(
        entity_id,
        new_state,
        attributes,
        last_changed,
        now,
        now,
        context,
        old_state is None,
        state_info,
        timestamp,
    )
    if old_state is not None:
        old_state.expire()
    self._states[entity_id] = state
    self._bus.async_fire_internal(
        EVENT_STATE_CHANGED,
        {
            "entity_id": entity_id,
            "old_state": old_state,
            "new_state": state,
        },
        context=context,
        time_fired=timestamp,
    )

The loop remains authoritative because:

  • Semantic events: EVENT_STATE_CHANGED and EVENT_STATE_REPORTED encode intent. Consumers can cheaply ignore reports when they only care about changes.
  • Disciplined mutation: For reports, the existing State is updated in place for timing data only. For changes, a new State replaces the old one and the old object is explicitly expired.
  • Input constraints at the boundary: Over‑long state strings are logged and coerced to STATE_UNKNOWN instead of being stored and broadcast as-is.

The read path is optimized as well: a States container maintains a domain index (domain -> entity_id -> State), and expensive conversions like timestamps and JSON fragments are cached. The loop remains the single source of truth, but everything around it is tuned for “many readers, frequent writes.”
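The domain index is an ordinary secondary index kept in step with the primary mapping. Here is a minimal sketch, assuming entity IDs always take the `domain.object_id` form. `IndexedStates` and `domain_items` are illustrative names, not Home Assistant's States class or its methods.

```python
from collections.abc import Iterator, MutableMapping
from typing import Any


def split_domain(entity_id: str) -> str:
    return entity_id.split(".", 1)[0]


class IndexedStates(MutableMapping[str, Any]):
    """entity_id -> state mapping that also maintains domain -> {entity_id: state}."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._by_domain: dict[str, dict[str, Any]] = {}

    def __getitem__(self, entity_id: str) -> Any:
        return self._data[entity_id]

    def __setitem__(self, entity_id: str, state: Any) -> None:
        # Every write updates both the primary map and the domain index.
        self._data[entity_id] = state
        self._by_domain.setdefault(split_domain(entity_id), {})[entity_id] = state

    def __delitem__(self, entity_id: str) -> None:
        del self._data[entity_id]  # raises KeyError before touching the index
        domain = split_domain(entity_id)
        bucket = self._by_domain[domain]
        del bucket[entity_id]
        if not bucket:
            del self._by_domain[domain]  # drop empty domains

    def __iter__(self) -> Iterator[str]:
        return iter(self._data)

    def __len__(self) -> int:
        return len(self._data)

    def domain_items(self, domain: str) -> list[tuple[str, Any]]:
        # Cost scales with entities in this domain, not with every entity.
        return list(self._by_domain.get(domain, {}).items())
```

The write path pays a small constant cost so that frequent per-domain reads avoid a full scan.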

Services and Shutdown on One Loop

So far we have structure for what flows (events) and what’s true (state). Two more system‑level concerns must still respect the same event loop boundary: how commands execute, and how the whole process shuts down.

Services as Commands With Contracts

The ServiceRegistry acts like a phone book of commands: each domain.service maps to a handler with validation rules and response semantics. The async_call method is where those semantics are enforced around the loop.

async def async_call(
    self,
    domain: str,
    service: str,
    service_data: dict[str, Any] | None = None,
    blocking: bool = False,
    context: Context | None = None,
    target: dict[str, Any] | None = None,
    return_response: bool = False,
) -> ServiceResponse:
    context = context or Context()
    service_data = service_data or {}

    try:
        handler = self._services[domain][service]
    except KeyError:
        domain = domain.lower()
        service = service.lower()
        try:
            handler = self._services[domain][service]
        except KeyError:
            raise ServiceNotFound(domain, service) from None

    if return_response:
        if not blocking:
            raise ServiceValidationError(
                translation_domain=DOMAIN,
                translation_key="service_should_be_blocking",
                translation_placeholders={
                    "return_response": "return_response=True",
                    "non_blocking_argument": "blocking=False",
                },
            )
        if handler.supports_response is SupportsResponse.NONE:
            raise ServiceValidationError(
                translation_domain=DOMAIN,
                translation_key="service_does_not_support_response",
                translation_placeholders={
                    "return_response": "return_response=True"
                },
            )
    elif handler.supports_response is SupportsResponse.ONLY:
        raise ServiceValidationError(
            translation_domain=DOMAIN,
            translation_key="service_lacks_response_request",
            translation_placeholders={"return_response": "return_response=True"},
        )

    # ... schema validation, fire EVENT_CALL_SERVICE ...

    coro = self._execute_service(handler, service_call)
    if not blocking:
        self._hass.async_create_task_internal(
            self._run_service_call_catch_exceptions(coro, service_call),
            f"service call background {service_call.domain}.{service_call.service}",
            eager_start=True,
        )
        return None

    response_data = await coro
    if not return_response:
        return None
    if not isinstance(response_data, dict):
        raise HomeAssistantError(
            translation_domain=DOMAIN,
            translation_key="service_reponse_invalid",
            translation_placeholders={
                "response_data_type": str(type(response_data))
            },
        )
    return response_data

The SupportsResponse enum encodes the contract:

  • NONE: the service returns no data; callers must not ask for a response, though they may still block until the call completes.
  • OPTIONAL: callers may ask for a response, trading latency for information.
  • ONLY: callers must ask for a response; the service is essentially a read operation.

Requests that violate the contract, such as asking for a response from a non-blocking call, raise ServiceValidationError early, before the service logic runs. Combined with voluptuous schemas for service_data, the registry turns loosely typed service calls into validated commands before any handler is scheduled on the loop.
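The response contract reduces to a small decision table that runs before any handler is touched. The sketch below mirrors the checks in async_call above. Local `SupportsResponse` and `ServiceValidationError` definitions stand in for Home Assistant's own types, and plain messages replace the translation keys.

```python
import enum


class SupportsResponse(enum.Enum):
    NONE = "none"
    OPTIONAL = "optional"
    ONLY = "only"


class ServiceValidationError(Exception):
    """Local stand-in for Home Assistant's exception of the same name."""


def check_response_contract(
    supports: SupportsResponse, *, blocking: bool, return_response: bool
) -> None:
    """Reject calls that break the contract before any handler runs."""
    if return_response:
        if not blocking:
            # A response only exists once the call finishes, so the caller must wait.
            raise ServiceValidationError("return_response=True requires blocking=True")
        if supports is SupportsResponse.NONE:
            raise ServiceValidationError("service does not support responses")
    elif supports is SupportsResponse.ONLY:
        raise ServiceValidationError("service must be called with return_response=True")
```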

Execution itself feeds back into the job machinery: coroutine handlers are awaited directly, callbacks run on the loop, and blocking work is pushed into an executor through async_add_executor_job. Errors in background service calls are caught and logged without disrupting other tasks on the loop.

Shutdown as a First‑Class Workflow

Finally, shutdown. Many daemons treat it as an afterthought; HomeAssistant.async_stop does the opposite. Shutdown is a staged workflow with explicit events, timeouts, and coordination across jobs and services.

The method orchestrates four main stages:

  1. Run shutdown jobs: Execute registered HassJob shutdown hooks within a bounded timeout.
  2. Stop integrations: Fire EVENT_HOMEASSISTANT_STOP, cancel background tasks, and wait for foreground tasks to finish within another timeout.
  3. Final write: Fire EVENT_HOMEASSISTANT_FINAL_WRITE so recorders and integrations can flush data.
  4. Close: Fire EVENT_HOMEASSISTANT_CLOSE, drain callbacks, shut down executors, and finally mark the core state as stopped.

Each stage uses helpers to log slow or stuck tasks and wraps its waits in TimeoutManager.async_timeout, so shutdown keeps progressing even when integrations misbehave. Just before the final close, the code calls shutdown_run_callback_threadsafe(self.loop) to stop new cross‑thread callbacks from being scheduled onto a loop that is effectively finished. Their futures would never resolve, and threads waiting on them could deadlock executor shutdown.
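The staging idea carries over to any asyncio service. The sketch below is a simplified model, not HomeAssistant.async_stop. It uses the standard `asyncio.wait(..., timeout=...)` instead of Home Assistant's TimeoutManager, and it records stuck stages in a list rather than logging task details. It also uses a hypothetical `accepting_threadsafe` flag to model what shutdown_run_callback_threadsafe achieves. `MiniRuntime` and its methods are illustrative names.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable, Coroutine
from typing import Any

_LOGGER = logging.getLogger(__name__)
STAGES = ("shutdown_jobs", "stop", "final_write", "close")


class MiniRuntime:
    def __init__(self) -> None:
        self.state = "running"
        self.accepting_threadsafe = True
        self.stuck: list[str] = []
        self._hooks: dict[str, list[Callable[[], Awaitable[None]]]] = {s: [] for s in STAGES}
        self._foreground: set[asyncio.Task[Any]] = set()
        self._background: set[asyncio.Task[Any]] = set()

    def add_hook(self, stage: str, hook: Callable[[], Awaitable[None]]) -> None:
        self._hooks[stage].append(hook)

    def create_task(
        self, coro: Coroutine[Any, Any, Any], *, background: bool = False
    ) -> asyncio.Task[Any]:
        task = asyncio.create_task(coro)
        tasks = self._background if background else self._foreground
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        return task

    def call_soon_threadsafe(
        self, loop: asyncio.AbstractEventLoop, fn: Callable[..., Any], *args: Any
    ) -> None:
        if not self.accepting_threadsafe:
            raise RuntimeError("closing: refusing new cross-thread callbacks")
        loop.call_soon_threadsafe(fn, *args)

    async def _bounded(self, stage: str, extra: list[asyncio.Task[Any]], timeout: float) -> None:
        # Run this stage's hooks and wait for them (plus extra tasks), but never forever.
        waits = [asyncio.ensure_future(hook()) for hook in self._hooks[stage]] + extra
        if not waits:
            return
        _done, pending = await asyncio.wait(waits, timeout=timeout)
        if pending:
            _LOGGER.warning("Stage %s: %d task(s) still running after %ss", stage, len(pending), timeout)
            self.stuck.append(stage)

    async def async_stop(self, timeout: float = 1.0) -> None:
        self.state = "stopping"
        await self._bounded("shutdown_jobs", [], timeout)  # 1. shutdown jobs
        for task in list(self._background):
            task.cancel()  # 2. background work is disposable...
        await self._bounded("stop", [*self._foreground, *self._background], timeout)
        self.state = "final_write"
        await self._bounded("final_write", [], timeout)  # 3. flush data
        self.state = "not_running"
        self.accepting_threadsafe = False  # point of no return for other threads
        await self._bounded("close", [], timeout)  # 4. close
        self.state = "stopped"
```

The order matters. Cancelling disposable background work before waiting on foreground tasks keeps stage 2 from waiting on tasks that would never finish on their own, and a hung hook only costs its stage's timeout.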

Design Patterns You Can Reuse

The through‑line in this module is simple but strict: the asyncio event loop is the single source of truth, and every abstraction exists to protect, structure, or observe it. Jobs classify work for the loop, the event bus contains fan‑out and failure, the state machine separates change from report, services encode contracts, and shutdown is a controlled sequence on that same loop.

Here are concrete patterns you can apply in your own event‑driven systems:

  1. Add a job layer between your APIs and the event loop. Wrap callables in an object that pre‑classifies them (coroutine, callback, executor) and carries metadata like names and shutdown behavior. This keeps scheduling logic simple and gives you a single place to manage lifecycle.
  2. Design your event bus defensively. Lazily construct event objects, isolate listener failures, and explicitly control which events can reach global “listen to everything” subscribers. The goal is to keep the dispatch loop fast and robust, regardless of integration quality.
  3. Model state with semantics, not just blobs. Emit distinct events for meaningful changes vs repeated reports, and treat your in‑memory state store as a constrained database. Consumers and performance both benefit from that additional structure.
  4. Treat services as commands with contracts. Use schemas and enums to encode what a service accepts and whether it supports responses, and enforce those rules up front. That discipline prevents poorly designed services from quietly harming your loop.
  5. Make shutdown a first‑class workflow. Break it into stages, define events for each, set explicit timeouts, and lock out new cross‑thread callbacks once you’re past the point of no return. This is how you keep a complex runtime from getting stuck in “almost stopped.”

When you look at your own system, ask one question: where is the real source of truth for concurrency and ordering? Once you’ve named that boundary—often an event loop—shape your jobs, events, state, services, and shutdown around it the way Home Assistant does. The payoff is a platform that remains both flexible and predictable, even as more features and integrations pile on.

Full Source Code

Direct source from the upstream repository:

homeassistant/core.py

home-assistant/core • dev

Thanks for reading! I hope this was useful. If you have questions or thoughts, feel free to reach out.

Content Creation Process: This article was generated via a semi-automated workflow using AI tools. I prepared the strategic framework, including specific prompts and data sources. From there, the automation system conducted the research, analysis, and writing. The content passed through automated verification steps before being finalized and published without manual intervention.

Mahmoud Zalt

About the Author

I’m Zalt, a technologist with 16+ years of experience, passionate about designing and building AI systems that move us closer to a world where machines handle everything and humans reclaim wonder.

Let's connect if you're working on interesting AI projects, looking for technical advice or want to discuss anything.
