We’re dissecting an async MCP client that was built for one thing: refusing to hang, even when the server or transport misbehaves. The client lives in the fastmcp project, which provides a high-level interface over MCP transports like HTTP and stdio. At the center of that interface is client.Client, a facade that exposes simple methods such as async with client:, await client.ping(), and await client.complete() while hiding the messy reality of background tasks, timeouts, and cancellation.
I’m Mahmoud Zalt, an AI solutions architect. We’ll walk through how this client structures its session lifecycle, supports re-entrant context managers, and uses a watchdog pattern so RPCs fail fast instead of hanging forever. Along the way, we’ll extract practical patterns you can use to make your own async clients resilient under real-world failure.
The session lifecycle story
Within fastmcp, the Client class acts as the conductor for a single MCP session. It doesn’t do network I/O itself; it orchestrates transports, background tasks, and protocol calls so the public API stays small and predictable.
```
fastmcp/
  client/
    transports.py  # Transport abstractions: HTTP, stdio, in-process
    logging.py     # Log handlers
    sampling.py    # Sampling handlers
    roots.py       # Roots/FS handlers
    tasks.py       # Task objects & notifications
    progress.py    # Progress handlers
    mixins.py      # Resources, prompts, tools, tasks APIs
    client.py      # <-- This file: session lifecycle, Client facade
```

```
client.Client
|-- uses ------> ClientTransport (HTTP, stdio, in-process)
|-- owns ------> ClientSessionState (session, lock, events, counters)
|-- composes --> Mixins for domain features
|-- delegates -> mcp.ClientSession for protocol methods
```
The core responsibility of Client is to manage one underlying ClientSession from the MCP SDK in a safe, reusable way. All the fragile details — cancellation, reconnection, coordination between background tasks — are pushed into a dedicated state object that is separate from configuration:
```python
import asyncio
from dataclasses import dataclass, field

import anyio
import mcp.types
from mcp import ClientSession


@dataclass
class ClientSessionState:
    """Holds all session-related state for a Client instance."""

    session: ClientSession | None = None
    nesting_counter: int = 0
    lock: anyio.Lock = field(default_factory=anyio.Lock)
    session_task: asyncio.Task | None = None
    ready_event: anyio.Event = field(default_factory=anyio.Event)
    stop_event: anyio.Event = field(default_factory=anyio.Event)
    initialize_result: mcp.types.InitializeResult | None = None
```
This state object is the control panel for the connection:
- `session`: the active MCP session, if any.
- `nesting_counter`: how many `async with client:` blocks are currently open.
- `lock`: a mutex that serializes all session lifecycle changes.
- `session_task`: the background task running the session loop.
- `ready_event` / `stop_event`: signals for "session is ready" and "please stop now".
- `initialize_result`: cached MCP initialize result so `initialize()` is idempotent.
With this structure, the story becomes straightforward: configure once, start a session in the background when it’s first needed, reuse that session across many contexts and calls, and shut it down safely when the last user is done.
Re-entrant contexts with a single session
One of the trickiest requirements is supporting re-entrant async context managers while still sharing a single underlying session. Code should be able to do this without spawning extra connections:
```python
client = Client("http://localhost:8080")

async with client:          # context A
    # ... do some work ...
    async with client:      # nested context B
        # ... do more work on the same session ...
    ...
```
Opening and closing the network connection on every __aenter__/__aexit__ would thrash connections and invite race conditions. Instead, the client treats contexts as references to a shared background worker. The key entry point is _connect(), which runs when entering the context:
```python
async def _connect(self):
    """Establish or reuse a session connection."""
    async with self._session_state.lock:
        need_to_start = (
            self._session_state.session_task is None
            or self._session_state.session_task.done()
        )
        if need_to_start:
            if self._session_state.nesting_counter != 0:
                raise RuntimeError(
                    "Internal error: nesting counter should be 0 when "
                    "starting new session, got "
                    f"{self._session_state.nesting_counter}"
                )
            self._session_state.stop_event = anyio.Event()
            self._session_state.ready_event = anyio.Event()
            self._session_state.session_task = asyncio.create_task(
                self._session_runner()
            )
        try:
            await self._session_state.ready_event.wait()
        except asyncio.CancelledError:
            # ... cancellation cleanup and reset ...
            raise
        self._session_state.nesting_counter += 1
    return self
```
Several design choices here directly protect against hangs and race conditions:
- All lifecycle decisions are under one lock. Starting or reusing a session always happens inside `self._session_state.lock`, so two tasks can't both decide they need to start a new session.
- Reference counting via `nesting_counter`. The first caller that sees `need_to_start` as true creates the background session task and waits for `ready_event`. Later callers inside the lock simply increment the counter and reuse the running session.
- Events are tied to a specific session. `ready_event` and `stop_event` are created exactly when a new session starts, inside the lock. That avoids the classic bug where one task waits forever on an old event that another task silently replaced.
- Startup is cancellation-safe. If the caller cancels while waiting for `ready_event`, they still hold the lock, which guarantees that cleanup of `session_task` and transport state is consistent.
On the way out of a context, _disconnect() runs under the same lock:
```python
async def _disconnect(self, force: bool = False):
    """Disconnect from session using reference counting."""
    async with self._session_state.lock:
        if force:
            self._session_state.nesting_counter = 0
        else:
            self._session_state.nesting_counter = max(
                0, self._session_state.nesting_counter - 1
            )
        if self._session_state.nesting_counter > 0:
            return
        if self._session_state.session_task is None:
            return
        self._session_state.stop_event.set()
        await self._session_state.session_task
        self._session_state.session_task = None
```
As long as the counter is positive, the session stays alive. When the last context exits and the counter drops to zero, the client sets stop_event and waits for the background task to shut down the session in one centralized place.
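The whole lifecycle compresses into a short runnable sketch. The names here (`SharedWorker`, `_run`) are hypothetical, and the real client juggles far more state (anyio events, a ready signal, transport cleanup), but the reference-counting shape is the same: the first context starts the background task, nested contexts just bump a counter, and the last exit signals shutdown.

```python
import asyncio


class SharedWorker:
    """Minimal sketch of a reference-counted background worker (illustrative
    names; not the real fastmcp Client)."""

    def __init__(self):
        self._count = 0
        self._lock = asyncio.Lock()
        self._task = None
        self._stop = asyncio.Event()

    async def _run(self):
        # Stand-in for the session loop: stay alive until asked to stop.
        await self._stop.wait()

    async def __aenter__(self):
        async with self._lock:
            if self._task is None or self._task.done():
                # Fresh event per "session" so no one waits on a stale one.
                self._stop = asyncio.Event()
                self._task = asyncio.create_task(self._run())
            self._count += 1
        return self

    async def __aexit__(self, *exc):
        async with self._lock:
            self._count -= 1
            if self._count == 0 and self._task is not None:
                self._stop.set()         # last user: shut the worker down
                await self._task
                self._task = None


async def main():
    worker = SharedWorker()
    async with worker:                   # outer context starts the task
        async with worker:               # nested context reuses it
            assert worker._count == 2
        assert not worker._task.done()   # still one user left
    assert worker._task is None          # last exit shut it down
    return "ok"


print(asyncio.run(main()))  # -> ok
```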
The watchdog pattern that stops hanging requests
Handling session lifecycle correctly is necessary but not sufficient. Many real-world hangs come from a different direction: the server fails, or the transport raises in a background loop, and the foreground coroutine that’s awaiting a response just never returns. Nothing crashes; it just waits forever.
This client addresses that with a small helper that’s central to its robustness: _await_with_session_monitoring. It acts as a watchdog around important RPCs, ensuring that background failures are surfaced quickly to callers.
```python
async def _await_with_session_monitoring(
    self, coro: Coroutine[Any, Any, ResultT]
) -> ResultT:
    """Await a coroutine while monitoring the session task for errors."""
    session_task = self._session_state.session_task
    if session_task is None:
        return await coro

    if session_task.done():
        coro.close()
        exc = session_task.exception()
        if exc:
            raise exc
        raise RuntimeError("Session task completed unexpectedly")

    call_task = asyncio.create_task(coro)
    try:
        done, _ = await asyncio.wait(
            {call_task, session_task},
            return_when=asyncio.FIRST_COMPLETED,
        )
        if session_task in done:
            call_task.cancel()
            with anyio.CancelScope(shield=True), suppress(asyncio.CancelledError):
                await call_task
            exc = session_task.exception()
            if exc:
                raise exc
            raise RuntimeError("Session task completed unexpectedly")
        return call_task.result()
    except asyncio.CancelledError:
        call_task.cancel()
        with anyio.CancelScope(shield=True), suppress(asyncio.CancelledError):
            await call_task
        raise
```
In effect, every important RPC is raced against the session itself:
- Background failures are visible. Some transports surface HTTP errors (4xx/5xx) or protocol failures inside the session loop, not inside the waiting coroutine. Here, the client explicitly monitors the session task so those errors can’t be lost.
- Two-way race: RPC vs session. The helper spins up `call_task` for the RPC, then waits until either `call_task` or `session_task` completes. Whichever completes first determines the outcome.
- If the session dies first, the RPC is cancelled and the session error is raised. The watchdog cancels `call_task`, waits for it to clean up under a shielded cancel scope, then raises the session's exception. The caller sees a clear failure instead of a permanent wait.
- If the RPC finishes first, the result is returned normally. On the happy path, the watchdog adds only a small amount of coordination overhead.
- Caller cancellation is handled explicitly. If the caller cancels, `call_task` is cancelled and drained before re-raising `CancelledError`. That avoids orphaned tasks and warning spam.
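To make the race concrete, here is a stripped-down, hypothetical version of the same idea using only the standard library. The names `race_against`, `dead_session`, and `slow_rpc` are invented for illustration, and the real helper additionally handles shielded cleanup and an already-finished session:

```python
import asyncio


async def race_against(monitor: asyncio.Task, coro):
    """Sketch of the watchdog: await `coro`, but fail fast if `monitor`
    (the session stand-in) dies first. Illustrative, not fastmcp's code."""
    call = asyncio.create_task(coro)
    try:
        done, _ = await asyncio.wait(
            {call, monitor}, return_when=asyncio.FIRST_COMPLETED
        )
        if monitor in done and call not in done:
            call.cancel()                 # don't leak the in-flight RPC task
            try:
                await call
            except asyncio.CancelledError:
                pass
            exc = monitor.exception()
            raise exc if exc else RuntimeError("monitor exited unexpectedly")
        return call.result()
    except asyncio.CancelledError:
        call.cancel()                     # caller cancelled: clean up and re-raise
        raise


async def main():
    async def dead_session():
        await asyncio.sleep(0.01)
        raise ConnectionError("transport died")

    async def slow_rpc():
        await asyncio.sleep(60)           # would hang forever without the watchdog

    monitor = asyncio.create_task(dead_session())
    try:
        await race_against(monitor, slow_rpc())
    except ConnectionError as e:
        return str(e)


print(asyncio.run(main()))  # -> transport died
```

The caller gets the transport error after ~10 ms instead of waiting out the 60-second RPC.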
This watchdog is then applied to the places where hangs would be most painful in production:
```python
async def ping(self) -> bool:
    """Send a ping request."""
    result = await self._await_with_session_monitoring(self.session.send_ping())
    return isinstance(result, mcp.types.EmptyResult)

async def set_logging_level(self, level: mcp.types.LoggingLevel) -> None:
    """Send a logging/setLevel request."""
    await self._await_with_session_monitoring(
        self.session.set_logging_level(level)
    )

async def complete_mcp(
    self,
    ref: mcp.types.ResourceTemplateReference | mcp.types.PromptReference,
    argument: dict[str, str],
    context_arguments: dict[str, Any] | None = None,
) -> mcp.types.CompleteResult:
    logger.debug(f"[{self.name}] called complete: {ref}")
    result = await self._await_with_session_monitoring(
        self.session.complete(
            ref=ref, argument=argument, context_arguments=context_arguments
        )
    )
    return result
```
These methods — health checks, logging control, completions — are exactly where you cannot afford silent hangs. Wrapping them in the watchdog gives a strong invariant: if the session dies, your call won’t wait forever; it will fail loudly and promptly.
The audit of this client does note a few methods — such as `cancel`, `progress`, and `send_roots_list_changed` — that currently call `self.session` directly. Extending `_await_with_session_monitoring` to those would make the "no RPC ever hangs silently" story fully consistent.
Safety at scale: timeouts, metrics, and locks
The design choices above make a single client robust, but the code also anticipates operational scale: many concurrent calls, flaky networks, and long-lived processes. That’s reflected in how it uses timeouts, how it structures contention around the session lock, and how it’s meant to be instrumented.
Timeouts as explicit guardrails
The client uses two main kinds of timeouts:
- Per-request timeouts, exposed as `read_timeout_seconds` in `_session_kwargs` and handed to the transport, so individual reads don't block indefinitely.
- An initialization timeout, applied in `initialize()` via `anyio.fail_after`, so the initial handshake can't hang forever:
```python
async def initialize(
    self,
    timeout: datetime.timedelta | float | int | None = None,
) -> mcp.types.InitializeResult:
    if self.initialize_result is not None:
        return self.initialize_result

    if timeout is None:
        timeout = self._init_timeout
    else:
        timeout = normalize_timeout_to_seconds(timeout)

    try:
        with anyio.fail_after(timeout):
            self._session_state.initialize_result = await self.session.initialize()
            return self._session_state.initialize_result
    except TimeoutError as e:
        raise RuntimeError("Failed to initialize server session") from e
This makes initialize() both idempotent and time-bounded. If the server never responds, callers still get control back with a meaningful error. Cleanup paths in __aexit__ and _connect similarly use short move_on_after windows to ensure shutdown logic itself can’t stall indefinitely.
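The same guard can be illustrated with only the standard library (the client itself uses `anyio.fail_after`; `initialize_with_deadline` and `never_responds` are invented names for this sketch):

```python
import asyncio


async def initialize_with_deadline(handshake, timeout: float):
    """Hedged stdlib analogue of the anyio.fail_after guard: bound the
    handshake and convert a bare timeout into a meaningful error."""
    try:
        return await asyncio.wait_for(handshake, timeout)
    except asyncio.TimeoutError as e:
        raise RuntimeError("Failed to initialize server session") from e


async def main():
    async def never_responds():
        await asyncio.sleep(3600)   # a server that never answers the handshake

    try:
        await initialize_with_deadline(never_responds(), timeout=0.01)
    except RuntimeError as e:
        return str(e)


print(asyncio.run(main()))  # -> Failed to initialize server session
```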
Lock contention and client fan-out
The single _session_state.lock is deliberately the one place where contention is possible. Every _connect and _disconnect must acquire it to adjust nesting_counter and manage session_task. Under concurrency, that serializes short critical sections while keeping the session state machine coherent.
Two usage patterns fall naturally out of this design:
- Share a client; don't recreate it per request. The client is intended to be created once per target server and reused. In steady state, `_connect` usually just increments `nesting_counter` and returns quickly, so the lock is only held briefly.
- Use `client.new()` to add parallelism when you hit a bottleneck. When one session becomes a contention point, `new()` cheaply clones the configuration but gives you a fresh `ClientSessionState` and thus an independent session:
```python
def new(self) -> Client[ClientTransportT]:
    new_client = copy.copy(self)
    if not isinstance(self.transport, StdioTransport):
        new_client._session_state = ClientSessionState()
    new_client.name += f":{secrets.token_hex(2)}"
    return new_client
```
This is where the earlier separation of configuration and runtime state pays off directly: cloning configuration is trivial, and each clone gets its own lock, counters, and events without affecting the others.
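A minimal sketch of that split, with the hypothetical names `MiniClient` and `RuntimeState` standing in for the real classes, shows why the shallow copy is safe: shared configuration rides along, while each clone gets untouched runtime state.

```python
import copy
from dataclasses import dataclass


@dataclass
class RuntimeState:
    """Stand-in for ClientSessionState: per-clone runtime bookkeeping."""
    nesting_counter: int = 0


class MiniClient:
    """Hypothetical miniature of the config/state split behind new()."""

    def __init__(self, url: str):
        self.url = url                  # configuration: safe to share
        self._state = RuntimeState()    # runtime state: must not be shared

    def new(self) -> "MiniClient":
        clone = copy.copy(self)         # shallow copy keeps the config...
        clone._state = RuntimeState()   # ...but swaps in fresh runtime state
        return clone


a = MiniClient("http://localhost:8080")
a._state.nesting_counter = 3
b = a.new()
print(b.url == a.url, b._state.nesting_counter)  # -> True 0
```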
Metrics that track your invariants
A design like this only fully pays off if you can see when its assumptions stop holding. The audit suggests a small set of metrics that map cleanly onto the invariants we’ve discussed:
| Metric | What it tells you | Typical target |
|---|---|---|
| `fastmcp_client_session_active` | Whether a client currently has an active session task and session | Gauge: 0 or 1 per client |
| `fastmcp_client_connect_latency_seconds` | Time from starting `_connect` to `ready_event` being set | p95 < 1s for low-latency servers |
| `fastmcp_client_initialize_latency_seconds` | Duration of `initialize()` calls | p95 well below configured `init_timeout` |
| `fastmcp_client_rpc_errors_total` | Exceptions surfaced via `_await_with_session_monitoring` | Error ratio < 1% of RPCs |
| `fastmcp_client_session_restarts_total` | How often the background session gets restarted | Low under normal operation; investigate spikes |
If you adopt a similar background-session and watchdog architecture, pairing it with focused metrics like these gives early warning when latency, error rates, or session stability drift away from your design assumptions.
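As a sketch of how such instrumentation might attach to the client: everything below is an assumption (fastmcp does not ship these metrics), the metric names merely echo the table above, and a plain `Counter` stands in for a real metrics library like `prometheus_client`.

```python
import time
from collections import Counter

metrics = Counter()   # stand-in for a real metrics registry


def observed(metric_prefix: str):
    """Hypothetical decorator: count calls and errors, accumulate latency."""
    def decorate(fn):
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            except Exception:
                metrics[f"{metric_prefix}_errors_total"] += 1
                raise
            finally:
                metrics[f"{metric_prefix}_calls_total"] += 1
                metrics[f"{metric_prefix}_latency_seconds_sum"] += (
                    time.perf_counter() - start
                )
        return wrapper
    return decorate


@observed("fastmcp_client_connect")
def connect():
    return "ready"   # stand-in for the real connect path


connect()
print(metrics["fastmcp_client_connect_calls_total"])  # -> 1
```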
Lessons you can steal today
We’ve followed this MCP client from its session state object, through re-entrant context management, into watchdog-guarded RPCs, and out to timeouts, locks, and metrics. The core lesson is simple: design your async clients so they fail fast and visibly instead of hanging silently, even when transports or servers fail in awkward ways.
Here are concrete patterns you can lift into your own async libraries:
- Isolate configuration from runtime state. Keep a compact state object (like `ClientSessionState`) that holds locks, counters, tasks, and events. That isolation makes cloning, resetting, and lifecycle reasoning far less error-prone.
- Use a reference-counted background worker for shared connections. Treat `async with client:` as "borrow a handle" to a long-lived session, not "open and close a socket every time". A simple counter under a lock can model "who is still using this resource?" clearly.
- Introduce a watchdog helper for long-running RPCs. When a session loop can fail independently of an individual call, explicitly race the RPC against the session task and propagate whichever fails first. This one pattern removes an entire class of hangs.
- Put explicit time limits on setup and teardown. Use constructs like `fail_after` and short `move_on_after` windows so that no phase of the client lifecycle can block indefinitely, even when the other side is broken.
- Instrument the invariants you care about. Track whether sessions are active, how long connects and initializes take, how often RPCs fail via the watchdog, and how frequently sessions restart. Those metrics tell you when the system is drifting toward the conditions that cause hangs in the first place.
If you’re building async clients — for HTTP APIs, databases, or protocol layers like MCP — this design is a strong blueprint: keep the public surface area small and intuitive, but invest heavily in the internal machinery that ensures your clients never just sit there waiting forever.