Zalt Blog

Deep Dives into Code & Architecture

When Async Clients Refuse To Hang

By Mahmoud Zalt
Code Cracking
25m read
When async clients refuse to hang, everything about reliability changes. If you care about async behavior under failure, this one’s worth a read.

We’re dissecting an async MCP client that was built for one thing: refusing to hang, even when the server or transport misbehaves. The client lives in the fastmcp project, which provides a high-level interface over MCP transports like HTTP and stdio. At the center of that interface is client.Client, a facade with a small, predictable surface (async with client:, await client.ping(), await client.complete()) that hides the messy reality of background tasks, timeouts, and cancellation.

I’m Mahmoud Zalt, an AI solutions architect. We’ll walk through how this client structures its session lifecycle, supports re-entrant context managers, and uses a watchdog pattern so RPCs fail fast instead of hanging forever. Along the way, we’ll extract practical patterns you can use to make your own async clients resilient under real-world failure.

The session lifecycle story

Within fastmcp, the Client class acts as the conductor for a single MCP session. It doesn’t do network I/O itself; it orchestrates transports, background tasks, and protocol calls so the public API stays small and predictable.

fastmcp/
  client/
    transports.py        # Transport abstractions: HTTP, stdio, in-process
    logging.py           # Log handlers
    sampling.py          # Sampling handlers
    roots.py             # Roots/FS handlers
    tasks.py             # Task objects & notifications
    progress.py          # Progress handlers
    mixins.py            # Resources, prompts, tools, tasks APIs
    client.py            # <-- This file: session lifecycle, Client facade

client.Client
  |-- uses --> ClientTransport (HTTP, stdio, in-process)
  |-- owns --> ClientSessionState (session, lock, events, counters)
  |-- composes --> Mixins for domain features
  |-- delegates --> mcp.ClientSession for protocol methods
Where the Client sits in the fastmcp ecosystem.

The core responsibility of Client is to manage one underlying ClientSession from the MCP SDK in a safe, reusable way. All the fragile details — cancellation, reconnection, coordination between background tasks — are pushed into a dedicated state object that is separate from configuration:

@dataclass
class ClientSessionState:
    """Holds all session-related state for a Client instance."""

    session: ClientSession | None = None
    nesting_counter: int = 0
    lock: anyio.Lock = field(default_factory=anyio.Lock)
    session_task: asyncio.Task | None = None
    ready_event: anyio.Event = field(default_factory=anyio.Event)
    stop_event: anyio.Event = field(default_factory=anyio.Event)
    initialize_result: mcp.types.InitializeResult | None = None

This state object is the control panel for the connection:

  • session: the active MCP session, if any.
  • nesting_counter: how many async with client: blocks are currently open.
  • lock: a mutex that serializes all session lifecycle changes.
  • session_task: the background task running the session loop.
  • ready_event/stop_event: signals for “session is ready” and “please stop now”.
  • initialize_result: cached MCP initialize result so initialize() is idempotent.

With this structure, the story becomes straightforward: configure once, start a session in the background when it’s first needed, reuse that session across many contexts and calls, and shut it down safely when the last user is done.
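
A small but load-bearing detail in that dataclass: the mutable fields use field(default_factory=...), which gives every client instance its own lock and events. A class-level default such as lock: anyio.Lock = anyio.Lock() would share one object across all instances. A minimal stdlib illustration, using asyncio.Lock in place of anyio’s:

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class State:
    # default_factory runs once per instance, so each State gets its own lock;
    # a shared class-level default would couple unrelated clients together.
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)

a, b = State(), State()
print(a.lock is b.lock)  # → False
```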

Re-entrant contexts with a single session

One of the trickiest requirements is supporting re-entrant async context managers while still sharing a single underlying session. Code should be able to do this without spawning extra connections:

client = Client("http://localhost:8080")

async with client:  # context A
    # ... do some work ...
    async with client:  # nested context B
        # ... do more work on the same session ...
        ...

Opening and closing the network connection on every __aenter__/__aexit__ would thrash connections and invite race conditions. Instead, the client treats contexts as references to a shared background worker. The key entry point is _connect(), which runs when entering the context:

async def _connect(self):
    """Establish or reuse a session connection."""
    async with self._session_state.lock:
        need_to_start = (
            self._session_state.session_task is None
            or self._session_state.session_task.done()
        )

        if need_to_start:
            if self._session_state.nesting_counter != 0:
                raise RuntimeError(
                    "Internal error: nesting counter should be 0 when "
                    "starting new session, got "
                    f"{self._session_state.nesting_counter}"
                )
            self._session_state.stop_event = anyio.Event()
            self._session_state.ready_event = anyio.Event()
            self._session_state.session_task = asyncio.create_task(
                self._session_runner()
            )
            try:
                await self._session_state.ready_event.wait()
            except asyncio.CancelledError:
                # ... cancellation cleanup and reset ...
                raise

        self._session_state.nesting_counter += 1

    return self

Several design choices here directly protect against hangs and race conditions:

  • All lifecycle decisions are under one lock. Starting or reusing a session is always done inside self._session_state.lock, so two tasks can’t both decide they need to start a new session.
  • Reference counting via nesting_counter. The first caller that sees need_to_start as true creates the background session task and waits for ready_event. Later callers inside the lock simply increment the counter and reuse the running session.
  • Events are tied to a specific session. ready_event and stop_event are created exactly when a new session starts, inside the lock. That avoids the classic bug where one task waits forever on an old event that another task silently replaced.
  • Startup is cancellation-safe. If the caller cancels while waiting for ready_event, they still hold the lock, which guarantees that cleanup of session_task and transport state is consistent.
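
The third bullet deserves a concrete illustration. Below is a tiny, self-contained sketch (not fastmcp code) of the classic bug the design rules out: a waiter captures a reference to an event, another task silently replaces it, and setting the new event never wakes the original waiter:

```python
import asyncio

async def stale_event_demo():
    state = {"ready": asyncio.Event()}
    waiter_event = state["ready"]      # waiter grabs a reference to the old event
    state["ready"] = asyncio.Event()   # another task silently swaps in a new one
    state["ready"].set()               # only the *new* event is set...
    try:
        # ...so waiting on the old reference never succeeds
        await asyncio.wait_for(waiter_event.wait(), timeout=0.05)
        return "woke up"
    except asyncio.TimeoutError:
        return "stranded on stale event"

print(asyncio.run(stale_event_demo()))  # → stranded on stale event
```

Creating the events under the same lock that starts the session guarantees waiters and setters always agree on which event object is current.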

On the way out of a context, _disconnect() runs under the same lock:

async def _disconnect(self, force: bool = False):
    """Disconnect from session using reference counting."""
    async with self._session_state.lock:
        if force:
            self._session_state.nesting_counter = 0
        else:
            self._session_state.nesting_counter = max(
                0, self._session_state.nesting_counter - 1
            )

        if self._session_state.nesting_counter > 0:
            return

        if self._session_state.session_task is None:
            return

        self._session_state.stop_event.set()
        await self._session_state.session_task
        self._session_state.session_task = None

As long as the counter is positive, the session stays alive. When the last context exits and the counter drops to zero, the client sets stop_event and waits for the background task to shut down the session in one centralized place.
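
The whole connect/disconnect dance can be distilled into a runnable toy. The sketch below uses invented names and plain asyncio rather than the real fastmcp classes, but it shows the same invariant: nested contexts share one background session, and a later context starts a fresh one.

```python
import asyncio

class RefCountedSession:
    """Minimal sketch of a reference-counted background session (illustrative)."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._count = 0
        self._task = None
        self._stop = None
        self.opens = 0  # how many real sessions were started

    async def _runner(self, stop):
        await stop.wait()  # stand-in for the real session loop

    async def __aenter__(self):
        async with self._lock:
            if self._task is None or self._task.done():
                self.opens += 1
                self._stop = asyncio.Event()
                self._task = asyncio.ensure_future(self._runner(self._stop))
            self._count += 1
        return self

    async def __aexit__(self, *exc):
        async with self._lock:
            self._count = max(0, self._count - 1)
            if self._count > 0 or self._task is None:
                return
            self._stop.set()         # last user out: signal and await shutdown
            await self._task
            self._task = None

async def main():
    client = RefCountedSession()
    async with client:
        async with client:           # nested context reuses the same session
            pass
    async with client:               # a later context starts a fresh one
        pass
    return client.opens

print(asyncio.run(main()))  # → 2
```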

The watchdog pattern that stops hanging requests

Handling session lifecycle correctly is necessary but not sufficient. Many real-world hangs come from a different direction: the server fails, or the transport raises in a background loop, and the foreground coroutine that’s awaiting a response just never returns. Nothing crashes; it just waits forever.

This client addresses that with a small helper that’s central to its robustness: _await_with_session_monitoring. It acts as a watchdog around important RPCs, ensuring that background failures are surfaced quickly to callers.

async def _await_with_session_monitoring(
    self, coro: Coroutine[Any, Any, ResultT]
) -> ResultT:
    """Await a coroutine while monitoring the session task for errors."""
    session_task = self._session_state.session_task

    if session_task is None:
        return await coro

    if session_task.done():
        coro.close()
        exc = session_task.exception()
        if exc:
            raise exc
        raise RuntimeError("Session task completed unexpectedly")

    call_task = asyncio.create_task(coro)

    try:
        done, _ = await asyncio.wait(
            {call_task, session_task},
            return_when=asyncio.FIRST_COMPLETED,
        )

        if session_task in done:
            call_task.cancel()
            with anyio.CancelScope(shield=True), suppress(asyncio.CancelledError):
                await call_task

            exc = session_task.exception()
            if exc:
                raise exc
            raise RuntimeError("Session task completed unexpectedly")

        return call_task.result()
    except asyncio.CancelledError:
        call_task.cancel()
        with anyio.CancelScope(shield=True), suppress(asyncio.CancelledError):
            await call_task
        raise

In effect, every important RPC is raced against the session itself:

  • Background failures are visible. Some transports surface HTTP errors (4xx/5xx) or protocol failures inside the session loop, not inside the waiting coroutine. Here, the client explicitly monitors the session task so those errors can’t be lost.
  • Two-way race: RPC vs session. The helper spins up call_task for the RPC, then waits until either call_task or session_task completes. Whichever completes first determines the outcome.
  • If the session dies first, the RPC is cancelled and the session error is raised. The watchdog cancels call_task, waits for it to clean up under a shielded cancel scope, then raises the session’s exception. The caller sees a clear failure instead of a permanent wait.
  • If the RPC finishes first, the result is returned normally. On the happy path, the watchdog is just a small amount of coordination overhead.
  • Caller cancellation is handled explicitly. If the caller cancels, call_task is cancelled and drained before re-raising CancelledError. That avoids orphaned tasks and warning spam.
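
To make the mechanics concrete, here is a stripped-down, stdlib-only version of the same race (helper and task names are invented for this sketch): an RPC that would otherwise sleep forever fails promptly when the session task dies.

```python
import asyncio

async def await_with_monitoring(coro, session_task):
    """Race an RPC coroutine against a session task; surface whichever fails first."""
    call_task = asyncio.ensure_future(coro)
    done, _ = await asyncio.wait(
        {call_task, session_task}, return_when=asyncio.FIRST_COMPLETED
    )
    if session_task in done:
        call_task.cancel()               # the session died first: abandon the RPC
        try:
            await call_task
        except asyncio.CancelledError:
            pass
        exc = session_task.exception()
        if exc:
            raise exc                    # ...and raise the session's error instead
        raise RuntimeError("session ended unexpectedly")
    return call_task.result()

async def main():
    async def slow_rpc():
        await asyncio.sleep(10)          # would hang forever without the watchdog
        return "never"

    async def dying_session():
        await asyncio.sleep(0.01)
        raise ConnectionError("transport failed in background")

    session_task = asyncio.ensure_future(dying_session())
    try:
        await await_with_monitoring(slow_rpc(), session_task)
    except ConnectionError as e:
        return str(e)

print(asyncio.run(main()))  # → transport failed in background
```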

This watchdog is then applied to the places where hangs would be most painful in production:

async def ping(self) -> bool:
    """Send a ping request."""
    result = await self._await_with_session_monitoring(self.session.send_ping())
    return isinstance(result, mcp.types.EmptyResult)

async def set_logging_level(self, level: mcp.types.LoggingLevel) -> None:
    """Send a logging/setLevel request."""
    await self._await_with_session_monitoring(
        self.session.set_logging_level(level)
    )

async def complete_mcp(
    self,
    ref: mcp.types.ResourceTemplateReference | mcp.types.PromptReference,
    argument: dict[str, str],
    context_arguments: dict[str, Any] | None = None,
) -> mcp.types.CompleteResult:
    logger.debug(f"[{self.name}] called complete: {ref}")
    result = await self._await_with_session_monitoring(
        self.session.complete(
            ref=ref, argument=argument, context_arguments=context_arguments
        )
    )
    return result

These methods — health checks, logging control, completions — are exactly where you cannot afford silent hangs. Wrapping them in the watchdog gives a strong invariant: if the session dies, your call won’t wait forever; it will fail loudly and promptly.

The audit of this client does note a few methods — such as cancel, progress, and send_roots_list_changed — that currently call self.session directly. Extending _await_with_session_monitoring to those would make the “no RPC ever hangs silently” story fully consistent.

Safety at scale: timeouts, metrics, and locks

The design choices above make a single client robust, but the code also anticipates operational scale: many concurrent calls, flaky networks, and long-lived processes. That’s reflected in how it uses timeouts, how it structures contention around the session lock, and how it’s meant to be instrumented.

Timeouts as explicit guardrails

The client uses two main kinds of timeouts:

  • Per-request timeouts exposed as read_timeout_seconds in _session_kwargs and handed to the transport, so individual reads don’t block indefinitely.
  • Initialization timeout applied in initialize() via anyio.fail_after, so the initial handshake can’t hang forever:
async def initialize(
    self,
    timeout: datetime.timedelta | float | int | None = None,
) -> mcp.types.InitializeResult:
    if self.initialize_result is not None:
        return self.initialize_result

    if timeout is None:
        timeout = self._init_timeout
    else:
        timeout = normalize_timeout_to_seconds(timeout)

    try:
        with anyio.fail_after(timeout):
            self._session_state.initialize_result = await self.session.initialize()
            return self._session_state.initialize_result
    except TimeoutError as e:
        raise RuntimeError("Failed to initialize server session") from e

This makes initialize() both idempotent and time-bounded. If the server never responds, callers still get control back with a meaningful error. Cleanup paths in __aexit__ and _connect similarly use short move_on_after windows to ensure shutdown logic itself can’t stall indefinitely.
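
The same shape can be reproduced with just the standard library, using asyncio.wait_for where the real code uses anyio.fail_after (the client and handshake below are invented for the sketch):

```python
import asyncio

class SketchClient:
    """Sketch of an idempotent, time-bounded initialize (not the fastmcp API)."""

    def __init__(self, handshake, init_timeout=0.05):
        self._handshake = handshake
        self._init_timeout = init_timeout
        self.initialize_result = None

    async def initialize(self):
        if self.initialize_result is not None:   # idempotent: reuse cached result
            return self.initialize_result
        try:
            self.initialize_result = await asyncio.wait_for(
                self._handshake(), self._init_timeout
            )
            return self.initialize_result
        except asyncio.TimeoutError as e:
            raise RuntimeError("Failed to initialize server session") from e

async def main():
    async def never_responds():
        await asyncio.sleep(10)                  # simulate an unresponsive server

    client = SketchClient(never_responds)
    try:
        await client.initialize()
    except RuntimeError as e:
        return str(e)

print(asyncio.run(main()))  # → Failed to initialize server session
```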

Lock contention and client fan-out

The single _session_state.lock is deliberately the one place where contention is possible. Every _connect and _disconnect must acquire it to adjust nesting_counter and manage session_task. Under concurrency, that serializes short critical sections while keeping the session state machine coherent.

Two usage patterns fall naturally out of this design:

  • Share a client; don’t recreate it per request. The client is intended to be created once per target server and reused. In steady state, _connect usually just increments nesting_counter and returns quickly, so the lock is only held briefly.
  • Use client.new() to add parallelism when you hit a bottleneck. When one session becomes a contention point, new() cheaply clones configuration but gives you a fresh ClientSessionState and thus an independent session:
def new(self) -> Client[ClientTransportT]:
    new_client = copy.copy(self)

    if not isinstance(self.transport, StdioTransport):
        new_client._session_state = ClientSessionState()

    new_client.name += f":{secrets.token_hex(2)}"
    return new_client

This is where the earlier separation of configuration and runtime state pays off directly: cloning configuration is trivial, and each clone gets its own lock, counters, and events without affecting the others.
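
Here is the same clone-with-fresh-state idea in a self-contained sketch (class and field names invented; not the fastmcp implementation):

```python
import asyncio
import copy
import secrets
from dataclasses import dataclass, field

@dataclass
class SessionState:
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    nesting_counter: int = 0

class MiniClient:
    """Sketch of the new()-style clone: shared config, fresh runtime state."""

    def __init__(self, url):
        self.url = url
        self.name = url
        self._state = SessionState()

    def new(self):
        clone = copy.copy(self)           # shallow-copies the configuration...
        clone._state = SessionState()     # ...but gets independent runtime state
        clone.name += f":{secrets.token_hex(2)}"
        return clone

a = MiniClient("http://localhost:8080")
b = a.new()
print(a.url == b.url)        # → True   (configuration shared)
print(a._state is b._state)  # → False  (runtime state independent)
```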

Metrics that track your invariants

A design like this only fully pays off if you can see when its assumptions stop holding. The audit suggests a small set of metrics that map cleanly onto the invariants we’ve discussed:

  • fastmcp_client_session_active: whether a client currently has an active session task and session. Typical target: gauge, 0 or 1 per client.
  • fastmcp_client_connect_latency_seconds: time from starting _connect to ready_event being set. Typical target: p95 < 1s for low-latency servers.
  • fastmcp_client_initialize_latency_seconds: duration of initialize() calls. Typical target: p95 well below the configured init_timeout.
  • fastmcp_client_rpc_errors_total: exceptions surfaced via _await_with_session_monitoring. Typical target: error ratio < 1% of RPCs.
  • fastmcp_client_session_restarts_total: how often the background session gets restarted. Typical target: low under normal operation; investigate spikes.

If you adopt a similar background-session and watchdog architecture, pairing it with focused metrics like these gives early warning when latency, error rates, or session stability drift away from your design assumptions.
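
As a sketch of how one of these metrics could be wired into the watchdog path (metric and helper names are invented, stdlib only):

```python
import asyncio
from collections import Counter

metrics = Counter()  # stand-in for a real metrics client

async def monitored_call(coro, client="default"):
    """Count RPC errors surfaced by the watchdog path (illustrative sketch)."""
    try:
        return await coro
    except Exception:
        metrics[f"fastmcp_client_rpc_errors_total{{client={client}}}"] += 1
        raise

async def main():
    async def failing_rpc():
        raise ConnectionError("boom")

    for _ in range(3):
        try:
            await monitored_call(failing_rpc())
        except ConnectionError:
            pass
    return metrics

result = asyncio.run(main())
print(sum(result.values()))  # → 3
```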

Lessons you can steal today

We’ve followed this MCP client from its session state object, through re-entrant context management, into watchdog-guarded RPCs, and out to timeouts, locks, and metrics. The core lesson is simple: design your async clients so they fail fast and visibly instead of hanging silently, even when transports or servers fail in awkward ways.

Here are concrete patterns you can lift into your own async libraries:

  • Isolate configuration from runtime state. Keep a compact state object (like ClientSessionState) that holds locks, counters, tasks, and events. That isolation makes cloning, resetting, and lifecycle reasoning far less error-prone.
  • Use a reference-counted background worker for shared connections. Treat async with client: as “borrow a handle” to a long-lived session, not “open and close a socket every time”. A simple counter under a lock can model “who is still using this resource?” clearly.
  • Introduce a watchdog helper for long-running RPCs. When a session loop can fail independently of an individual call, explicitly race the RPC against the session task and propagate whichever fails first. This one pattern removes an entire class of hangs.
  • Put explicit time limits on setup and teardown. Use constructs like fail_after and short move_on_after windows so that no phase of the client lifecycle can block indefinitely, even when the other side is broken.
  • Instrument the invariants you care about. Track whether sessions are active, how long connects and initializes take, how often RPCs fail via the watchdog, and how frequently sessions restart. Those metrics tell you when the system is drifting toward the conditions that cause hangs in the first place.

If you’re building async clients — for HTTP APIs, databases, or protocol layers like MCP — this design is a strong blueprint: keep the public surface area small and intuitive, but invest heavily in the internal machinery that ensures your clients never just sit there waiting forever.

Full Source Code

Here's the full source code of the file that inspired this article.
Read on GitHub

Thanks for reading! I hope this was useful. If you have questions or thoughts, feel free to reach out.

Content Creation Process: This article was generated via a semi-automated workflow using AI tools. I prepared the strategic framework, including specific prompts and data sources. From there, the automation system conducted the research, analysis, and writing. The content passed through automated verification steps before being finalized and published without manual intervention.

Mahmoud Zalt

About the Author

I’m Zalt, a technologist with 16+ years of experience, passionate about designing and building AI systems that move us closer to a world where machines handle everything and humans reclaim wonder.

Let's connect if you're working on interesting AI projects, looking for technical advice or want to discuss anything.
