We’re examining how Home Assistant’s core runtime treats the asyncio event loop as the single source of truth for everything that happens in the system. Home Assistant is an open‑source home automation platform where thousands of integrations, entities, and automations share one process and one event loop. At the center of that process is core.py, which behaves less like a bag of classes and more like a small operating system for the platform. I’m Mahmoud Zalt, an AI solutions architect, and we’ll use this module as a practical guide to designing resilient, event‑driven systems that stay healthy under load.
We’ll follow one thread: how every key abstraction—jobs, events, state, services, and shutdown—exists to protect and organize the event loop instead of fighting it. By the end, you should be able to look at your own event‑driven code and reshape it around a single, explicit concurrency boundary.
The Core Runtime as a Mini OS
To see how the event loop becomes the single source of truth, start with the structure of core.py. Instead of isolated utilities, you get a coordinated set of subsystems built around one asyncio loop.
homeassistant/
core.py (this file: central runtime)
Main object relationships:
+---------------------+
| HomeAssistant |
| - loop |
| - _tasks |
| - _background |
| - state (CoreState)|
+----------+----------+
| owns
+---------+---------+----------------+
| | |
+--v---------+ +-----v------+ +-----v-----------+
| EventBus | | StateMachine| | ServiceRegistry|
+------------+ +-------------+ +----------------+
| | |
| fires Events | manages States | executes ServiceCalls
| | |
listeners entity_id -> State domain.service -> Service
The HomeAssistant object plays the kernel role. It owns the event loop, tracks foreground and background tasks, and coordinates startup and shutdown. Around it:
EventBusis the publish/subscribe backbone for everything that happens.StateMachinestores entity state and emits semantic state events.ServiceRegistryexposes operations that other parts of the system can call.Context,Event, andStatecarry data and traceability through that loop.
Once you see this as a mini operating system, the design constraint becomes clear: every feature either keeps the event loop predictable—or risks stalling the whole city.
HassJob: Classifying Work for the Loop
If the loop is the source of truth, you can’t treat scheduled work as an opaque callable. The loop needs to know what kind of work it’s about to run. That’s the role of HassJob.
HassJob wraps a callable and pre‑classifies it as one of three types: coroutine function, callback (safe to run directly on the loop), or executor job (must go to a thread pool). The type is computed once and cached instead of recomputed at every dispatch.
@final
class HassJob[**_P, _R_co]:
"""Represent a job to be run later."""
__slots__ = ("_cache", "_cancel_on_shutdown", "name", "target")
def __init__(
self,
target: Callable[_P, _R_co],
name: str | None = None,
*,
cancel_on_shutdown: bool | None = None,
job_type: HassJobType | None = None,
) -> None:
self.target: Final = target
self.name = name
self._cancel_on_shutdown = cancel_on_shutdown
self._cache: dict[str, Any] = {}
if job_type:
self._cache["job_type"] = job_type
@under_cached_property
def job_type(self) -> HassJobType:
return get_hassjob_callable_job_type(self.target)
This small abstraction buys a lot of control over the loop:
- Fast hot paths: The event bus and service registry don’t waste time re‑inspecting callables on every dispatch.
- Deterministic routing: The runtime knows whether to
awaita coroutine, invoke a synchronous callback on the loop, or send work to an executor. - Lifecycle hooks: The
cancel_on_shutdownflag lets shutdown orchestrate which scheduled jobs to cancel and which to let complete.
Events and State: Flow vs Truth
With jobs defined, the next task is moving information through the system without compromising the loop. Home Assistant does this with a defensive event bus and a disciplined state machine that clearly separate “what flowed” from “what is true.”
The Event Bus: Containing Fan‑Out and Failure
The EventBus acts like a radio station: components listen to event types (channels), and the bus broadcasts events to all relevant listeners. One method—async_fire_internal—handles the dispatch loop:
@callback
def async_fire_internal(
self,
event_type: EventType[_DataT] | str,
event_data: _DataT | None = None,
origin: EventOrigin = EventOrigin.local,
context: Context | None = None,
time_fired: float | None = None,
) -> None:
listeners = self._listeners.get(event_type, EMPTY_LIST)
if event_type not in EVENTS_EXCLUDED_FROM_MATCH_ALL:
match_all_listeners = self._match_all_listeners
else:
match_all_listeners = EMPTY_LIST
event: Event[_DataT] | None = None
for job, event_filter in listeners + match_all_listeners:
if event_filter is not None:
try:
if event_data is None or not event_filter(event_data):
continue
except Exception:
_LOGGER.exception("Error in event filter")
continue
if not event:
event = Event(
event_type,
event_data,
origin,
time_fired,
context,
)
try:
self._hass.async_run_hass_job(job, event)
except Exception:
_LOGGER.exception("Error running job: %s", job)
The loop remains the source of truth because dispatch is structured around a few rules:
- Lazy event construction: The
Eventobject is only created if at least one listener will use it. No listeners, no allocation. - Filter isolation: Listener filters can fail without poisoning the bus. Exceptions are logged and skipped so one bad integration doesn’t stall the global event path.
- Controlled fan‑out: Some high‑volume events are excluded from the
MATCH_ALLscanner channel to avoid accidental “listen to everything” subscribers overwhelming the loop.
Thread boundaries are explicit: synchronous callers use fire(), which jumps into the loop via call_soon_threadsafe; async callers use async_fire(), which asserts that you’re already on the loop and then calls async_fire_internal(). All mutation of bus internals happens on the loop, not across threads.
The State Machine: Change vs Report
Events describe what happened; the state machine describes what is. Home Assistant’s key design choice here is to distinguish a real state change from a repeated state report.
A change means the value or attributes genuinely differ. A report means “I’m still the same” and updates monitoring metadata without changing the semantic state. This distinction becomes critical for automations, history, and performance.
The core logic lives in async_set_internal:
@callback
def async_set_internal(
self,
entity_id: str,
new_state: str,
attributes: Mapping[str, Any] | None,
force_update: bool,
context: Context | None,
state_info: StateInfo | None,
timestamp: float,
) -> None:
# ... compute same_state / same_attr vs old_state ...
now = dt_util.utc_from_timestamp(timestamp)
if context is None:
context = Context(id=ulid_at_time(timestamp))
if same_state and same_attr:
old_last_reported = old_state.last_reported # type: ignore[union-attr]
old_state.last_reported = now # type: ignore[union-attr]
old_state._cache["last_reported_timestamp"] = timestamp # type: ignore[union-attr]
self._bus.async_fire_internal(
EVENT_STATE_REPORTED,
{
"entity_id": entity_id,
"last_reported": now,
"old_last_reported": old_last_reported,
"new_state": old_state,
},
context=context,
time_fired=timestamp,
)
return
if same_attr:
attributes = old_state.attributes
if not same_state and len(new_state) > MAX_LENGTH_STATE_STATE:
_LOGGER.error(
"State %s for %s is longer than %s, falling back to %s",
new_state,
entity_id,
MAX_LENGTH_STATE_STATE,
STATE_UNKNOWN,
)
new_state = STATE_UNKNOWN
state = State(
entity_id,
new_state,
attributes,
last_changed,
now,
now,
context,
old_state is None,
state_info,
timestamp,
)
if old_state is not None:
old_state.expire()
self._states[entity_id] = state
self._bus.async_fire_internal(
EVENT_STATE_CHANGED,
{
"entity_id": entity_id,
"old_state": old_state,
"new_state": state,
},
context=context,
time_fired=timestamp,
)
The loop remains authoritative because:
- Semantic events:
EVENT_STATE_CHANGEDandEVENT_STATE_REPORTEDencode intent. Consumers can cheaply ignore reports when they only care about changes. - Disciplined mutation: For reports, the existing
Stateis updated in place for timing data only. For changes, a newStatereplaces the old one and the old object is explicitly expired. - Input constraints at the boundary: Over‑long state strings are logged and coerced to
STATE_UNKNOWNinstead of being allowed to break the loop.
The read path is optimized as well: a States container maintains a domain index (domain -> entity_id -> State), and expensive conversions like timestamps and JSON fragments are cached. The loop remains the single source of truth, but everything around it is tuned for “many readers, frequent writes.”
Services and Shutdown on One Loop
So far we have structure for what flows (events) and what’s true (state). Two more system‑level concerns must still respect the same event loop boundary: how commands execute, and how the whole process shuts down.
Services as Commands With Contracts
The ServiceRegistry acts like a phone book of commands: each domain.service maps to a handler with validation rules and response semantics. The async_call method is where those semantics are enforced around the loop.
async def async_call(
self,
domain: str,
service: str,
service_data: dict[str, Any] | None = None,
blocking: bool = False,
context: Context | None = None,
target: dict[str, Any] | None = None,
return_response: bool = False,
) -> ServiceResponse:
context = context or Context()
service_data = service_data or {}
try:
handler = self._services[domain][service]
except KeyError:
domain = domain.lower()
service = service.lower()
try:
handler = self._services[domain][service]
except KeyError:
raise ServiceNotFound(domain, service) from None
if return_response:
if not blocking:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="service_should_be_blocking",
translation_placeholders={
"return_response": "return_response=True",
"non_blocking_argument": "blocking=False",
},
)
if handler.supports_response is SupportsResponse.NONE:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="service_does_not_support_response",
translation_placeholders={
"return_response": "return_response=True"
},
)
elif handler.supports_response is SupportsResponse.ONLY:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="service_lacks_response_request",
translation_placeholders={"return_response": "return_response=True"},
)
# ... schema validation, fire EVENT_CALL_SERVICE ...
coro = self._execute_service(handler, service_call)
if not blocking:
self._hass.async_create_task_internal(
self._run_service_call_catch_exceptions(coro, service_call),
f"service call background {service_call.domain}.{service_call.service}",
eager_start=True,
)
return None
response_data = await coro
if not return_response:
return None
if not isinstance(response_data, dict):
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="service_reponse_invalid",
translation_placeholders={
"response_data_type": str(type(response_data))
},
)
return response_data
The SupportsResponse enum encodes the contract:
NONE: fire‑and‑forget; callers must not ask for a response.OPTIONAL: callers may ask for a response, trading latency for information.ONLY: callers must ask for a response; the service is essentially a read operation.
Requests that violate the contract raise ServiceValidationError early, before the service logic runs. Combined with voluptuous schemas for service_data, the registry turns untyped service calls into well‑behaved commands that respect the loop’s capacity.
Execution itself feeds back into the job machinery: coroutine handlers are awaited directly, callbacks run on the loop, and blocking work is pushed into an executor through async_add_executor_job. Errors in background service calls are caught and logged without disrupting other tasks on the loop.
Shutdown as a First‑Class Workflow
Finally, shutdown. Many daemons treat it as an afterthought; HomeAssistant.async_stop does the opposite. Shutdown is a staged workflow with explicit events, timeouts, and coordination across jobs and services.
The method orchestrates four main stages:
- Run shutdown jobs: Execute registered
HassJobshutdown hooks within a bounded timeout. - Stop integrations: Fire
EVENT_HOMEASSISTANT_STOP, cancel background tasks, and wait for foreground tasks to finish within another timeout. - Final write: Fire
EVENT_HOMEASSISTANT_FINAL_WRITEso recorders and integrations can flush data. - Close: Fire
EVENT_HOMEASSISTANT_CLOSE, drain callbacks, shut down executors, and finally mark the core state asstopped.
Each stage uses helpers to log slow or stuck tasks and wraps waits in TimeoutManager.async_timeout to keep progress moving even when integrations misbehave. Just before the final close, the code calls shutdown_run_callback_threadsafe(self.loop) to prevent new cross‑thread callbacks from being scheduled onto a loop that is effectively finished.
Design Patterns You Can Reuse
The through‑line in this module is simple but strict: the asyncio event loop is the single source of truth, and every abstraction exists to protect, structure, or observe it. Jobs classify work for the loop, the event bus contains fan‑out and failure, the state machine separates change from report, services encode contracts, and shutdown is a controlled sequence on that same loop.
Here are concrete patterns you can apply in your own event‑driven systems:
- Add a job layer between your APIs and the event loop. Wrap callables in an object that pre‑classifies them (coroutine, callback, executor) and carries metadata like names and shutdown behavior. This keeps scheduling logic simple and gives you a single place to manage lifecycle.
- Design your event bus defensively. Lazily construct event objects, isolate listener failures, and explicitly control which events can reach global “listen to everything” subscribers. The goal is to keep the dispatch loop fast and robust, regardless of integration quality.
- Model state with semantics, not just blobs. Emit distinct events for meaningful changes vs repeated reports, and treat your in‑memory state store as a constrained database. Consumers and performance both benefit from that additional structure.
- Treat services as commands with contracts. Use schemas and enums to encode what a service accepts and whether it supports responses, and enforce those rules up front. That discipline prevents poorly designed services from quietly harming your loop.
- Make shutdown a first‑class workflow. Break it into stages, define events for each, set explicit timeouts, and lock out new cross‑thread callbacks once you’re past the point of no return. This is how you keep a complex runtime from getting stuck in “almost stopped.”
When you look at your own system, ask one question: where is the real source of truth for concurrency and ordering? Once you’ve named that boundary—often an event loop—shape your jobs, events, state, services, and shutdown around it the way Home Assistant does. The payoff is a platform that remains both flexible and predictable, even as more features and integrations pile on.





