

When One Class Runs Your Cluster

By Mahmoud Zalt
Code Cracking
30m read

When one class runs your cluster, you get power and risk in the same place. Curious how that trade-off plays out in real distributed systems? ⚙️


Every mature distributed system eventually grows a “god class” — one place where all the critical decisions converge. In Apache Kafka’s broker, that role is played by ReplicaManager. It appends your messages, serves your fetches, talks to remote storage, reacts to disk failures, and applies metadata changes, all from a single, heavyweight Scala file.

In this article, we’ll walk through that class together. I’ll show you why Kafka’s ReplicaManager is both a brilliant orchestration center and a maintainability hazard — and how we can borrow its best ideas without inheriting its pain.

I’m Mahmoud Zalt, and we’ll treat this as a guided code review of the broker’s beating heart.

ReplicaManager’s Real Job

Before we talk design, we need to be clear about what ReplicaManager actually does. Kafka’s broker is layered: the network layer parses requests, ReplicaManager decides what those requests mean for replicas and logs, and lower-level components like Partition and UnifiedLog touch disk.

kafka.broker.process
  └─ core
     └─ server
        ├─ KafkaRequestHandler (network layer)
        │    ├─ calls ReplicaManager.appendRecords / handleProduceAppend
        │    ├─ calls ReplicaManager.fetchMessages
        │    ├─ calls ReplicaManager.fetchOffset
        │    ├─ calls ReplicaManager.deleteRecords
        │    └─ calls ReplicaManager.describeLogDirs / lastOffsetForLeaderEpoch / activeProducerState
        └─ ReplicaManager (this file)
             ├─ allPartitions: Map[TopicPartition, HostedPartition]
             ├─ logManager: LogManager
             ├─ replicaFetcherManager / replicaAlterLogDirsManager
             ├─ delayedProducePurgatory / delayedFetchPurgatory / ...
             ├─ remoteLogManager (optional)
             ├─ metadataCache / applyDelta(TopicsDelta)
             └─ Partition (per-topic-partition)
                  ├─ UnifiedLog (leader/follower)
                  └─ RemoteLog (via RemoteLogManager)
The broker’s server core: request handlers above, storage primitives below, ReplicaManager in the middle.

ReplicaManager is not just a helper; it is the broker-side state machine that decides how every partition on that broker lives, moves, and fails.

Concretely, it is responsible for:

  • Maintaining an in-memory map from TopicPartition to HostedPartition (online, offline, or none).
  • Routing produces via appendRecords / handleProduceAppend and fetches via fetchMessages / readFromLog.
  • Managing replication state: ISR shrink/expand, follower fetchers, and alter-log-dirs migration.
  • Integrating remote (tiered) storage through RemoteLogManager for both fetch and offsets.
  • Reacting to metadata changes via applyDelta when leaders, followers, or directories change.
  • Handling log directory failures and deciding when to halt the broker.

It’s a single class with a very clear conceptual boundary: “everything about partitions and replicas on this broker”. That cohesion is its strength — and also the reason it became huge.

The Power and Price of a God Class

Once we see the responsibilities, the central story emerges: ReplicaManager is a carefully designed god class. It coordinates half a dozen subsystems — logs, fetchers, purgatories, remote storage, transactions, metadata — with surprisingly disciplined boundaries, but the sheer size and nested flow make it difficult to evolve.

The code introduces a small algebraic data type to represent per-partition hosting state:

sealed trait HostedPartition

object HostedPartition {
  /**
   * This broker does not have any state for this partition locally.
   */
  final object None extends HostedPartition

  /**
   * This broker hosts the partition and it is online.
   */
  final case class Online(partition: Partition) extends HostedPartition

  /**
   * This broker hosts the partition, but it is in an offline log directory.
   */
  final case class Offline(partition: Option[Partition]) extends HostedPartition
}
HostedPartition: a tiny sealed trait guarding all partition access.

This is one of the file’s best design choices. A sealed trait in Scala is like a closed enum with payloads: all variants are known at compile time. By forcing all access through HostedPartition, the class can encode invariants such as “offline directories map to Offline and must return KAFKA_STORAGE_ERROR”.
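
To make that choke point concrete, here is a minimal sketch (illustrative, not a verbatim excerpt) of how a lookup helper can translate hosting state into client-facing errors in one place:

def getPartitionOrError(topicPartition: TopicPartition): Either[Errors, Partition] = {
  allPartitions.getOrElse(topicPartition, HostedPartition.None) match {
    case HostedPartition.Online(partition) =>
      Right(partition)                        // normal read/write path
    case HostedPartition.Offline(_) =>
      Left(Errors.KAFKA_STORAGE_ERROR)        // hosted, but its log directory failed
    case HostedPartition.None =>
      Left(Errors.UNKNOWN_TOPIC_OR_PARTITION) // not hosted on this broker
  }
}
A sketch of a single lookup point: every caller gets the same error mapping for free.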

The downside is volume. This single file also contains:

  • Full produce handling and transaction verification (handleProduceAppend).
  • Fetch handling, including preferred replicas, throttling, and remote tiered reads.
  • Delete-records coordination with purgatories.
  • Log-dir reassignments and failures.
  • Metadata delta application (applyDelta, applyLocalLeadersDelta, applyLocalFollowersDelta).
  • Background tasks like ISR shrink and high watermark checkpointing.

From the quality assessment in the analysis report this article draws on:

  • Maintainability score 3/5 – conceptually coherent, but many long methods and interleaved concerns.
  • Testability score 3/5 – collaborators are injected, but flows are complex and intertwined.

This is the key tension: the class is architecturally clean but locally complex. The story for us as engineers is how to keep the cleanliness and reduce the complexity.

Purgatories and Delayed Work

Once you accept that this class orchestrates everything, the next big idea is how it handles waiting. Kafka doesn’t block threads while it waits for data or replication; it uses purgatories — in-memory schedulers of delayed operations.

A purgatory here is a component that stores operations keyed by partition and periodically checks whether their completion conditions are satisfied. It’s an in-memory waiting room with rules.
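
Conceptually, every entry in a purgatory follows a small contract, roughly like this simplified sketch (the real Kafka class carries extra machinery such as timers and locking):

abstract class DelayedOperation(delayMs: Long) {
  // Check whether the completion condition is now satisfied;
  // if so, complete the operation and return true.
  def tryComplete(): Boolean
  // Runs exactly once, on either successful completion or expiration.
  def onComplete(): Unit
  // Extra bookkeeping (metrics, error responses) when the wait times out.
  def onExpiration(): Unit
}
The contract behind every waiting room: try to complete, complete once, or expire.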

Produce: when do we wait?

For produces, ReplicaManager decides whether to create a delayed operation based on three simple conditions:

private def delayedProduceRequestRequired(requiredAcks: Short,
                                          entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
                                          localProduceResults: Map[TopicIdPartition, LogAppendResult]): Boolean = {
  requiredAcks == -1 &&
  entriesPerPartition.nonEmpty &&
  localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}
Delayed produce is only needed for acks=-1, non-empty requests with at least one success.

In words:

  • Client asked for acks = -1 (wait for all replicas).
  • There is some data in this request.
  • At least one partition append succeeded (otherwise we can just fail immediately).

If those conditions hold, maybeAddDelayedProduce wraps things into a DelayedProduce and registers it in delayedProducePurgatory. Otherwise, it responds immediately.
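
In pattern form, that decision boils down to "try once, otherwise watch these keys". Here is a simplified sketch of the registration step (argument lists trimmed; not the exact signatures):

if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
  // Wrap the outstanding acks into a delayed operation...
  val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback)
  // ...and watch it under one key per partition touched by the request.
  val keys = entriesPerPartition.keys.map(k => new TopicPartitionOperationKey(k.topicPartition)).toList
  // tryCompleteElseWatch attempts completion once, then parks the operation until
  // checkAndComplete fires for one of its keys or the timeout expires.
  delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, keys)
} else {
  // Nothing to wait for: build the per-partition response and send it immediately.
}
The shape of the delayed-produce path: try once, then park the operation under its partition keys.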

Completing delayed work when the log moves

Now consider what happens when data is appended and the leader’s high watermark (HW) increases. That progress might unblock:

  • Produce requests waiting for replication.
  • Fetch requests waiting for new data (minBytes > 0).
  • Delete-records requests waiting for low watermarks to advance.
  • Share-fetch requests in Kafka’s shared subscription feature.

Instead of scattering this logic everywhere, the code centralizes it in addCompletePurgatoryAction:

private def addCompletePurgatoryAction(
  actionQueue: ActionQueue,
  appendResults: Map[TopicIdPartition, LogAppendResult]
): Unit = {
  actionQueue.add {
    () => appendResults.foreach { case (topicIdPartition, result) =>
      val requestKey = new TopicPartitionOperationKey(topicIdPartition.topicPartition)
      result.info.leaderHwChange match {
        case LeaderHwChange.INCREASED =>
          // some delayed operations may be unblocked after HW changed
          delayedProducePurgatory.checkAndComplete(requestKey)
          delayedFetchPurgatory.checkAndComplete(requestKey)
          delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
          if (topicIdPartition.topicId != Uuid.ZERO_UUID)
            delayedShareFetchPurgatory.checkAndComplete(
              new DelayedShareFetchPartitionKey(topicIdPartition.topicId,
                                                topicIdPartition.partition))
        case LeaderHwChange.SAME =>
          // probably unblock some follower fetch requests
          delayedFetchPurgatory.checkAndComplete(requestKey)
        case LeaderHwChange.NONE =>
          // nothing
      }
    }
  }
}
One place to reconcile changes in log state with “who was waiting on this partition?”

This is a great pattern: react to domain events (HW changed) by delegating to a central "complete all delayed work" helper. The code smell here is that a similar enumeration of purgatories exists elsewhere.

For example, when a broker loses leadership for a partition, it must also unblock any operations that will never complete:

private def completeDelayedOperationsWhenNotPartitionLeader(
  topicPartition: TopicPartition,
  topicId: Option[Uuid]
): Unit = {
  val topicPartitionOperationKey = new TopicPartitionOperationKey(topicPartition)
  delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
  delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
  delayedRemoteFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
  delayedRemoteListOffsetsPurgatory.checkAndComplete(topicPartitionOperationKey)
  if (topicId.isDefined)
    delayedShareFetchPurgatory.checkAndComplete(
      new DelayedShareFetchPartitionKey(topicId.get, topicPartition.partition()))
}
Leadership loss also has to clean up all delayed operations for that partition.

The report highlights this as a duplication risk: every time a new purgatory is added, we must remember to update all such helpers. The suggested refactor is to introduce a single completeAllDelayedForPartition helper and call it from every leadership-change or partition-stop path.
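
In sketch form, the consolidation could look like this (hypothetical: this helper does not exist in the file today):

private def completeAllDelayedForPartition(
  topicPartition: TopicPartition,
  topicId: Option[Uuid]
): Unit = {
  val key = new TopicPartitionOperationKey(topicPartition)
  // Every purgatory keyed by partition lives in one list, so adding a new
  // "waiting room" means touching exactly one place.
  Seq(
    delayedProducePurgatory,
    delayedFetchPurgatory,
    delayedDeleteRecordsPurgatory,
    delayedRemoteFetchPurgatory,
    delayedRemoteListOffsetsPurgatory
  ).foreach(_.checkAndComplete(key))
  // Share-fetch uses its own key type, derived from the topic ID.
  topicId.filter(_ != Uuid.ZERO_UUID).foreach { id =>
    delayedShareFetchPurgatory.checkAndComplete(
      new DelayedShareFetchPartitionKey(id, topicPartition.partition()))
  }
}
One helper and one list of purgatories for every leadership-change or partition-stop path.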

Transactional Produce Without Losing Your Mind

The most cognitively dense part of ReplicaManager is transactional produce handling: handleProduceAppend. This is where the class coordinates producers, transactional IDs, the transaction coordinator, and standard append logic.

The flow looks like this, in simplified English:

  1. Scan all batches for transactional producers (those with producerId and isTransactional).
  2. Ensure there is at most one (producerId, epoch) pair in the request.
  3. Ask the transaction coordinator to verify or add partitions to the transaction.
  4. Translate coordinator errors into produce-friendly errors (e.g., NOT_ENOUGH_REPLICAS).
  5. Retry on CONCURRENT_TRANSACTIONS for newer clients within a bounded timeout.
  6. Finally, delegate to appendRecords to perform the actual append + optional delayed produce.

The first chunk of the method is particularly noisy:

val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
val topicIds = entriesPerPartition.keys.map(tp => tp.topic() -> tp.topicId()).toMap
entriesPerPartition.foreachEntry { (topicIdPartition, records) =>
  // Produce requests (only requests that require verification) should only have one batch per partition
  val transactionalBatches = records.batches.asScala
    .filter(batch => batch.hasProducerId && batch.isTransactional)
  transactionalBatches.foreach(batch =>
    transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
  if (transactionalBatches.nonEmpty)
    topicPartitionBatchInfo.put(topicIdPartition.topicPartition(),
                                records.firstBatch.baseSequence)
}
if (transactionalProducerInfo.size > 1) {
  throw new InvalidPidMappingException(
    "Transactional records contained more than one producer ID")
}
Transactional batch discovery and validation in handleProduceAppend.

This is exactly the kind of logic that should live in a small, pure helper. The report suggests extracting it into collectTransactionalProduceInfo, returning a tuple of:

  • Set of (producerId, epoch) pairs.
  • Map of TopicPartition → baseSequence.
  • Map of topic name to topic ID.
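
Sketched in code, the extracted helper simply relocates the scan shown above (the name and return shape follow the report's suggestion; it does not exist in Kafka today):

private def collectTransactionalProduceInfo(
  entriesPerPartition: Map[TopicIdPartition, MemoryRecords]
): (Set[(Long, Short)], Map[TopicPartition, Int], Map[String, Uuid]) = {
  val producerInfo = mutable.HashSet[(Long, Short)]()
  val batchInfo = mutable.Map[TopicPartition, Int]()
  val topicIds = entriesPerPartition.keys.map(tp => tp.topic() -> tp.topicId()).toMap
  entriesPerPartition.foreachEntry { (topicIdPartition, records) =>
    val transactionalBatches = records.batches.asScala
      .filter(batch => batch.hasProducerId && batch.isTransactional)
    transactionalBatches.foreach(batch =>
      producerInfo.add((batch.producerId, batch.producerEpoch)))
    if (transactionalBatches.nonEmpty)
      batchInfo.put(topicIdPartition.topicPartition(), records.firstBatch.baseSequence)
  }
  (producerInfo.toSet, batchInfo.toMap, topicIds)
}
A pure, easily testable helper: the same scan, with no callbacks or coordinators in sight.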

Why does this matter?

  • Cognitive complexity. The method currently interleaves scanning, mapping, callbacks, retries, and error translation.
  • Testability. A helper like collectTransactionalProduceInfo is trivial to unit test for edge cases (e.g., multiple producer IDs) without wiring schedulers or coordinators.
  • Extensibility. Future transaction variants (say, additional flags) can be integrated by adjusting a single helper’s output type instead of threading new conditionals through a long method.

More broadly, handleProduceAppend is a classic example of what happens when an orchestrator grows features vertically inside one method instead of horizontally into helpers. The report places its cyclomatic complexity at 12 and cognitive complexity at 14, which matches how it feels to read.

Handling Disks, Directories, and Disaster

So far we’ve looked at the “happy” side: produces and fetches that eventually succeed. But ReplicaManager also owns a much darker duty: reacting when log directories fail.

Disk failure handling is a place where elegance matters less than safety. This code path decides whether to keep the broker up or halt it, which partitions go offline, and which metrics and controllers are notified.

def handleLogDirFailure(dir: String, notifyController: Boolean = true): Unit = {
  if (!logManager.isLogDirOnline(dir))
    return
  // retrieve the UUID here because logManager.handleLogDirFailure handler removes it
  val uuid = logManager.directoryId(dir)
  warn(s"Stopping serving replicas in dir $dir with uuid $uuid because the log directory has failed.")
  replicaStateChangeLock synchronized {
    val newOfflinePartitions = onlinePartitionsIterator.filter { partition =>
      partition.log.exists { _.parentDir == dir }
    }.map(_.topicPartition).toSet

    val partitionsWithOfflineFutureReplica = onlinePartitionsIterator.filter { partition =>
      partition.futureLog.exists { _.parentDir == dir }
    }.toSet

    replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)
    replicaAlterLogDirsManager.removeFetcherForPartitions(
      newOfflinePartitions ++ partitionsWithOfflineFutureReplica.map(_.topicPartition))

    partitionsWithOfflineFutureReplica.foreach(partition =>
      partition.removeFutureLocalReplica(deleteFromLogDir = false))
    newOfflinePartitions.foreach { topicPartition =>
      markPartitionOffline(topicPartition)
    }
    newOfflinePartitions.map(_.topic).foreach { topic: String =>
      maybeRemoveTopicMetrics(topic)
    }
    highWatermarkCheckpoints = highWatermarkCheckpoints.filter {
      case (checkpointDir, _) => checkpointDir != dir
    }

    warn(s"Broker $localBrokerId stopped fetcher for partitions ${newOfflinePartitions.mkString(",")} and " +
         s"stopped moving logs for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} " +
         s"because they are in the failed log directory $dir.")
  }
  logManager.handleLogDirFailure(dir)
  if (dir == new File(config.metadataLogDir).getAbsolutePath && config.processRoles.nonEmpty) {
    fatal(s"Shutdown broker because the metadata log dir $dir has failed")
    Exit.halt(1)
  }

  if (notifyController) {
    if (uuid.isDefined) {
      directoryEventHandler.handleFailure(uuid.get)
    } else {
      fatal(s"Unable to propagate directory failure disabled because directory $dir has no UUID")
      Exit.halt(1)
    }
  }
  warn(s"Stopped serving replicas in dir $dir")
}
Log directory failure handling: marking partitions offline and coordinating with controllers.

This snippet shows several important patterns:

  • Guard clause. If the dir is already offline, exit early.
  • Single lock. A dedicated replicaStateChangeLock coordinates changes to allPartitions and fetcher state.
  • Two kinds of partitions. Those whose current log is in the dir, and those whose future log (for alter-log-dirs) is there.
  • Fetcher shutdowns before state changes. Fetcher threads are stopped before partitions are marked offline, avoiding races.
  • HW checkpoints cleaned up. Checkpoint files for the failed dir are removed.
  • Safety fails closed. If the metadata log dir fails, the broker halts via Exit.halt(1).

From a design perspective, this is exactly the kind of logic you want in a small, well-named collaborator (e.g., LogDirFailureCoordinator) rather than buried in a 900-line class. The report explicitly calls this out as a refactor candidate.
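
In sketch form, the extracted collaborator needs only the handful of dependencies the method already uses (names are hypothetical, following the report's suggestion):

final class LogDirFailureCoordinator(
  logManager: LogManager,
  replicaFetcherManager: ReplicaFetcherManager,
  replicaAlterLogDirsManager: ReplicaAlterLogDirsManager,
  directoryEventHandler: DirectoryEventHandler
) {
  // Same algorithm as today's handleLogDirFailure, just behind a narrow seam:
  // 1. bail out if the directory is already offline
  // 2. compute affected current and future replicas under the state-change lock
  // 3. stop fetchers before mutating partition state
  // 4. mark partitions offline and drop HW checkpoints for the failed dir
  // 5. halt if the metadata log dir is affected, otherwise notify the controller
  def handleFailure(dir: String, notifyController: Boolean = true): Unit = ???
}
A narrow seam for the disaster path: easier to test in isolation and harder to break by accident.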

From Clean Code to Healthy Clusters

One of the most instructive parts of the analysis is how tightly ReplicaManager connects implementation choices to operational behavior. This isn’t just “clean Scala”; it’s code that shows up in latency graphs and incident timelines.

Hot paths and complexity

The main hot paths in this class are:

  • appendRecords / appendRecordsToLeader for heavy-produce brokers.
  • fetchMessages / readFromLog for heavy-consumer brokers.
  • fetchOffset for frequent ListOffsets calls.

Each of these is essentially O(P), where P is the number of partitions touched by the request. That’s reasonable and predictable, but the real latency comes from disk I/O, purgatory waiting, and remote storage.

Remote fetches & memory risk

Remote (tiered) storage integration is particularly subtle. A remote read result can be up to fetch.max.bytes (default 50 MB). Holding many of those in purgatory would be a great way to blow up your broker.

To avoid this, ReplicaManager configures the remote fetch purgatory with a purgeInterval of 0 — meaning completed operations are purged immediately and can be garbage-collected.

On the metrics side, the report highlights several key signals that directly reflect the correctness and performance of these code paths:

  • ReplicaManager.DelayedFetchPurgatorySize – large or growing values mean many clients are waiting for data.
  • ReplicaManager.DelayedProducePurgatorySize – pending produces indicate slow followers or replication issues.
  • UnderReplicatedPartitions – core health metric; should be 0 in steady state.
  • UnderMinIsrPartitionCount / AtMinIsrPartitionCount – partitions operating close to durability limits.
  • IsrShrinksPerSec / IsrExpandsPerSec – ISR churn, a sign of instability.

The interesting part for us as designers is that these metrics are not an afterthought. They are wired directly into the main flows with carefully chosen boundaries: purgatories, ISR checks, fetchers, and remote storage all expose exactly what ReplicaManager needs to track system health without overcoupling.

What We Should Steal From ReplicaManager

Stepping back, the core lesson from this file is not “don’t write big classes”. It’s more nuanced:

When one class truly orchestrates your system’s core lifecycle, you win a lot of clarity and power — but only if you aggressively factor out local complexity and centralize repeated patterns.

Here are the practical takeaways we can apply to our own systems.

1. Model hosting state explicitly

Instead of sprinkling booleans like isOnline, isOffline, or hasFutureLog across your codebase, represent them as an explicit sum type (sealed trait / enum with variants). HostedPartition is a textbook example:

  • None – this broker doesn’t host this partition.
  • Online – fully operational.
  • Offline – hosted, but its log directory has failed.

This makes error handling (e.g., KAFKA_STORAGE_ERROR vs NOT_LEADER_OR_FOLLOWER) explicit and consistent, and it gives you a single choke point to evolve state transitions.

2. Centralize “complete all delayed work” logic

If multiple parts of your system use delayed operations keyed by the same domain object (like TopicPartition), introduce a small helper that knows how to:

  • Register operations across all purgatories for a key.
  • Complete them when a domain event occurs (HW increased, leadership lost, partition deleted).

ReplicaManager currently lists all purgatories in multiple places; the suggested completeAllDelayedForPartition helper is exactly the right refactor to reduce bugs when adding new waiting rooms.

3. Extract helpers around heavy “if/else + callbacks + retries” flows

Methods like handleProduceAppend and fetchOffset show how quickly maintainability drops when you combine:

  • Domain discovery (scan batches for transactional producers).
  • Validation (multiple producer IDs, unsupported timestamps).
  • Async coordination (talk to the transaction coordinator or remote storage).
  • Retries with backoff.

In these situations, even “just” extracting collectTransactionalProduceInfo or a normalizeFetchDataInfo helper pays off in readability and testability. Over time, these helpers can grow into their own dedicated coordinators, reducing the god-class footprint.

4. Keep safety-critical flows isolated and boring

Disk failure handling is deliberately conservative: it takes a lock, computes a clear set of affected partitions, shuts down fetchers, marks partitions offline, updates checkpoints, calls the log manager, and, if necessary, halts the process.

Even if you keep it in the same class, treat such flows as if they lived in their own module:

  • Minimize external dependencies and side effects.
  • Keep logs and metrics explicit.
  • Document which failures are fatal and why.

5. Design for operations, not just elegance

ReplicaManager’s design is deeply operationally aware:

  • ISR checks and shrink intervals are tied to replicaLagTimeMaxMs.
  • Purgatory purge intervals are tuned to avoid holding big objects.
  • Remote fetch and list-offset timeouts are exposed via config.
  • Key metrics map almost one-to-one to conceptual entities: leaders, ISRs, purgatories, remote reads.

When you build your own orchestrators, ask: “Which parts of this flow will show up in an SLO or alert, and how do I surface those as clean metrics and logs?”


ReplicaManager is a fascinating piece of engineering: a single class that quite literally runs your Kafka cluster. It shows both how powerful a central orchestrator can be and how quickly local complexity can spiral if we don’t keep extracting helpers and abstractions.

If you’re designing the “brain” of your own system — a job scheduler, a replication controller, an API gateway — there’s a lot to learn here. Model state explicitly, centralize delayed work, separate safety-critical flows, and bake observability into the core. And when your orchestrator starts looking like this file in size, that’s your cue to grow sideways into small, testable collaborators while keeping the high-level story in one place.

That way, you get the benefits of a god class — a single mental model for how the system behaves — without inheriting its long-term maintenance curse.

Full Source Code

This article is based on ReplicaManager.scala from the Apache Kafka repository; the full source is available on GitHub.

Thanks for reading! I hope this was useful. If you have questions or thoughts, feel free to reach out.

Content Creation Process: This article was generated via a semi-automated workflow using AI tools. I prepared the strategic framework, including specific prompts and data sources. From there, the automation system conducted the research, analysis, and writing. The content passed through automated verification steps before being finalized and published without manual intervention.


About the Author

I’m Zalt, a technologist with 15+ years of experience, passionate about designing and building AI systems that move us closer to a world where machines handle everything and humans reclaim wonder.

Let's connect if you're working on interesting AI projects, looking for technical advice or want to discuss your career.

