
When One Agent Class Knows Too Much

When one agent class knows too much, you don’t just get convenience—you risk a God object. How do you keep your core agent powerful without turning it into a blob?

Code Cracking
#softwaredesign #architecture #agents #LLM


We’re examining how crewAI’s core Agent class orchestrates LLM workflows—tools, memory, knowledge, timeouts, guardrails, sync and async—and how that power edges it toward a classic God object. crewAI is an open-source framework for building collaborative AI agents, and this file is its control tower. I’m Mahmoud Zalt, an AI solutions architect, and we’ll use this class to learn how to design an agent façade that stays useful without turning into an unmaintainable blob.

By the end, you’ll know how to draw the line between a clean gateway layer and a God object, and how to structure retries, guardrails, and performance-sensitive logic in your own agent-style orchestration code.

How the Agent Orchestrator Works

The Agent class lives at the center of crewAI’s architecture. Think of it as the control tower for an AI airport: every task is a flight, the LLM is the pilot, tools are ground services, memory and knowledge are the map archives, and the event bus is the telemetry system.

project-root/
  lib/
    crewai/
      src/
        crewai/
          agent/
            core.py        # Agent orchestration (this file)
            utils.py
          agents/
            crew_agent_executor.py
            agent_builder/
              base_agent.py
          knowledge/
            knowledge.py
          llms/
            base_llm.py
          tools/
            agent_tools/
            memory_tools/
          events/
            event_bus.py
            types/
              agent_events.py
              memory_events.py
              knowledge_events.py
The Agent sits in the agent layer, orchestrating LLMs, tools, memory, knowledge, and events.

This class exposes two main execution styles:

  • execute_task / aexecute_task: run a structured Task inside a crew.
  • kickoff family: run ad‑hoc messages without a crew or task abstraction.

Both follow the same pipeline:

  1. Build a base prompt from the task or raw messages.
  2. Enrich it with schema, context, memory recall, and knowledge retrieval.
  3. Prepare tools and choose an executor strategy (CrewAgentExecutor vs AgentExecutor).
  4. Invoke the LLM through the executor with optional timeouts and RPM limits.
  5. Post‑process results (tools, Pydantic conversion, guardrails), emit events, and save memory.

The synchronous task path shows how much coordination the Agent owns:

Synchronous task execution pipeline with memory and retries
def execute_task(
    self,
    task: Task,
    context: str | None = None,
    tools: list[BaseTool] | None = None,
) -> Any:
    handle_reasoning(self, task)
    self._inject_date_to_task(task)

    if self.tools_handler:
        self.tools_handler.last_used_tool = None

    task_prompt = task.prompt()
    task_prompt = build_task_prompt_with_schema(task, task_prompt, self.i18n)
    task_prompt = format_task_with_context(task_prompt, context, self.i18n)

    if self._is_any_available_memory():
        crewai_event_bus.emit(... MemoryRetrievalStartedEvent ...)
        memory = ""
        try:
            unified_memory = getattr(self, "memory", None) or (
                getattr(self.crew, "_memory", None) if self.crew else None
            )
            if unified_memory is not None:
                query = task.description
                matches = unified_memory.recall(query, limit=5)
                if matches:
                    memory = "Relevant memories:\n" + "\n".join(
                        m.format() for m in matches
                    )
            if memory.strip() != "":
                task_prompt += self.i18n.slice("memory").format(memory=memory)

            crewai_event_bus.emit(... MemoryRetrievalCompletedEvent ...)
        except Exception:
            crewai_event_bus.emit(... MemoryRetrievalFailedEvent ...)

    knowledge_config = get_knowledge_config(self)
    task_prompt = handle_knowledge_retrieval(...)

    prepare_tools(self, tools, task)
    task_prompt = apply_training_data(self, task_prompt)

    # Emit AgentExecutionStartedEvent, validate timeout, execute via executor,
    # handle retries, process tool results, emit completed event, cleanup MCP.
    ...

In one method you see memory, knowledge, tools, training data, events, and retries all wired together. That centralized orchestration is exactly what makes the class powerful—and exactly what pushes it toward knowing too much.

Facade vs. God Object

With this mental model in place, the key question is architectural: is Agent a clean gateway into a complex system, or has it slipped into God object territory? A God object is a class that knows or does too much, becoming the dumping ground for unrelated responsibilities.

The analysis report for this file explicitly flags a smell:

  • Smell: God object / large multipurpose class.
  • Impact: Agent handles task orchestration, kickoff, guardrails, tools, memory, knowledge, MCP, platform, and Docker validation—raising cognitive load and change risk.
  • Suggested fix: Extract components like GuardrailExecutor, KickoffService, or CodeExecutionValidator and delegate from Agent.

At the same time, the design uses real patterns:

  • Facade: Agent presents a single high‑level API over LLMs, tools, memory, knowledge, and executors.
  • Strategy: executor_class lets you swap CrewAgentExecutor for AgentExecutor without changing call sites.
  • Observer: key phases emit events via crewai_event_bus, decoupling observability from core logic.
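The Strategy piece is worth isolating because it is easy to reuse in your own code. In this minimal sketch the executor classes are stand-ins (not crewAI's CrewAgentExecutor or AgentExecutor), but the mechanism is the same: the agent stores a class, not an instance, and call sites never change when the strategy is swapped.

```python
# Minimal Strategy pattern: the agent holds an executor *class* and
# instantiates it per run. Executor names are illustrative stand-ins.
class SequentialExecutor:
    def invoke(self, prompt: str) -> str:
        return f"sequential:{prompt}"


class ParallelExecutor:
    def invoke(self, prompt: str) -> str:
        return f"parallel:{prompt}"


class MiniAgent:
    def __init__(self, executor_class=SequentialExecutor):
        # Swap the strategy here without touching any call site.
        self.executor_class = executor_class

    def kickoff(self, prompt: str) -> str:
        return self.executor_class().invoke(prompt)
```

`MiniAgent(ParallelExecutor)` changes behavior everywhere `kickoff` is called, which is exactly what `executor_class` buys crewAI.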

So Agent is simultaneously:

  • a gateway layer that makes a complex system easy to use, and
  • a God object that centralizes so many concerns that every change is risky.

The real lesson here: a strong façade will drift into a God object unless you draw hard boundaries around what the façade is allowed to orchestrate and what must live in dedicated components.

Retries and Guardrails: Hidden Complexity

Once you accept that Agent is the orchestration hub, the next pressure point is failure handling: timeouts, errors, and guardrail violations. This is where invisible complexity creeps in—users don’t see it in the API but they absolutely feel it in behavior, latency, and cost.

Recursive Retries in Task Execution

Both execute_task and aexecute_task implement retries using recursion:

except Exception as e:
    if e.__class__.__module__.startswith("litellm"):
        crewai_event_bus.emit(... AgentExecutionErrorEvent ...)
        raise e
    if isinstance(e, _passthrough_exceptions):
        raise
    self._times_executed += 1
    if self._times_executed > self.max_retry_limit:
        crewai_event_bus.emit(... AgentExecutionErrorEvent ...)
        raise e
    result = self.execute_task(task, context, tools)

Recursion works for small limits, but it has drawbacks:

  • Confusing stack traces: repeated execute_task frames obscure the failing call.
  • Stack overflow risk: if max_retry_limit or guards change, you can end up with deep recursion.
  • Shared mutable state: _times_executed lives on the object. Reusing one Agent instance across calls—especially concurrently—becomes dangerous.

A loop-based retry makes the policy explicit and easier to reason about:

Illustrative: loop‑based retry instead of recursion
def execute_task(self, task: Task, context: str | None = None,
                 tools: list[BaseTool] | None = None) -> Any:
    # ...prompt, memory, knowledge, tools prepared above...

    attempt = 0
    last_exception: Exception | None = None

    while attempt <= self.max_retry_limit:
        try:
            # emit AgentExecutionStartedEvent, run with/without timeout
            result = self._run_single_attempt(task, context, tools)
            break
        except TimeoutError:
            # emit error event and re‑raise immediately
            raise
        except Exception as e:
            if self._should_not_retry(e):
                # emit error event and re‑raise
                raise
            last_exception = e
            attempt += 1

    if last_exception is not None and attempt > self.max_retry_limit:
        # emit final error event
        raise last_exception

    # process result, emit completed event, cleanup MCP
    return self._finalize_result(result, task)

This is illustrative, but it captures the design goal: a linear representation of “try up to N times, then give up”, with clear hooks for metrics and logging.
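Stripped of agent specifics, the same policy fits in a reusable helper. This is a hedged sketch: `retry_call` and its `is_fatal` predicate are names invented here for illustration, not part of crewAI.

```python
# Generic loop-based retry: try up to max_retries + 1 times, and skip
# retries entirely for exceptions the predicate marks as fatal.
def retry_call(fn, *, max_retries: int, is_fatal=lambda e: False):
    last_exc: Exception | None = None
    for attempt in range(max_retries + 1):
        try:
            return fn(attempt)
        except Exception as e:
            if is_fatal(e):
                raise              # passthrough: no retry for fatal errors
            last_exc = e           # remember the failure and loop again
    assert last_exc is not None
    raise last_exc                 # retry budget exhausted
```

The attempt number is passed to `fn` so callers can log or back off per attempt; the single `raise last_exc` at the end gives one clean stack trace instead of a tower of recursive frames.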

Guardrails as a Decorator Around Kickoff

Guardrails are validations or policies applied to outputs. In this class, guardrails wrap the kickoff flow via _process_kickoff_guardrail. Conceptually, this is a decorator: an extra layer that can reject outputs and trigger re‑runs.

Guardrail processing with recursive retries
def _process_kickoff_guardrail(
    self,
    output: LiteAgentOutput,
    executor: AgentExecutor,
    inputs: dict[str, str],
    response_format: type[Any] | None = None,
    retry_count: int = 0,
) -> LiteAgentOutput:
    from crewai.utilities.guardrail_types import GuardrailCallable

    if isinstance(self.guardrail, str):
        from crewai.tasks.llm_guardrail import LLMGuardrail
        guardrail_callable = cast(
            GuardrailCallable,
            LLMGuardrail(description=self.guardrail, llm=cast(BaseLLM, self.llm)),
        )
    elif callable(self.guardrail):
        guardrail_callable = self.guardrail
    else:
        return output

    guardrail_result = process_guardrail(
        output=output,
        guardrail=guardrail_callable,
        retry_count=retry_count,
        event_source=self,
        from_agent=self,
    )

    if not guardrail_result.success:
        if retry_count >= self.guardrail_max_retries:
            raise ValueError(
                f"Agent's guardrail failed validation after {self.guardrail_max_retries} "
                f"retries. Last error: {guardrail_result.error}"
            )

        executor._append_message_to_state(
            guardrail_result.error or "Guardrail validation failed",
            role="user",
        )

        output = self._execute_and_build_output(executor, inputs, response_format)

        return self._process_kickoff_guardrail(
            output=output,
            executor=executor,
            inputs=inputs,
            response_format=response_format,
            retry_count=retry_count + 1,
        )

    if guardrail_result.result is not None:
        if isinstance(guardrail_result.result, str):
            output.raw = guardrail_result.result
        elif isinstance(guardrail_result.result, BaseModel):
            output.pydantic = guardrail_result.result

    return output

Design-wise, this is solid:

  • Guardrails can be string descriptions (handled by LLMGuardrail) or plain callables.
  • Failures trigger bounded retries via guardrail_max_retries.
  • Error feedback is appended to the conversation state so the LLM can correct itself.

But the same recursive retry pattern appears here. Combined with task-level retries, a single kickoff can:

  • Run the LLM multiple times for core execution.
  • Run additional times for each guardrail failure.

Without metrics, this quietly multiplies latency and cost. The control logic is robust, but you need visibility into how often guardrails are firing and how many retries they cause.
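That visibility can start as a plain counter keyed by event name. This `RetryMetrics` class is an assumption, a minimal sketch of the kind of hook you might wire into event subscribers to make guardrail-driven re-runs show up in a dashboard or log line.

```python
# Minimal in-process metrics: count occurrences per event name so
# retries and guardrail failures become visible instead of silent.
from collections import Counter


class RetryMetrics:
    def __init__(self):
        self.counts: Counter[str] = Counter()

    def record(self, event: str) -> None:
        self.counts[event] += 1

    def summary(self) -> dict[str, int]:
        return dict(self.counts)
```

A ratio like `guardrail_retry / llm_call` from this summary tells you directly how much of your latency and cost the guardrails are responsible for.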

Performance and Scale Under Load

All of this orchestration is fine for a demo agent. The real test is dozens or hundreds of tasks hitting the same Agent under real traffic. The analysis surfaces several performance and scalability issues that fall directly out of the God object tendency.

Timeouts via Threads and Async

Synchronous execution uses a ThreadPoolExecutor to enforce max_execution_time:

def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> Any:
    import concurrent.futures

    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(
            self._execute_without_timeout, task_prompt=task_prompt, task=task
        )

        try:
            return future.result(timeout=timeout)
        except concurrent.futures.TimeoutError as e:
            future.cancel()
            raise TimeoutError(
                f"Task '{task.description}' execution timed out after {timeout} seconds. "
                "Consider increasing max_execution_time or optimizing the task."
            ) from e
        except Exception as e:
            future.cancel()
            raise RuntimeError(f"Task execution failed: {e!s}") from e

The async path mirrors this with asyncio.wait_for. The split is clean, but two operational points matter:

  • Thread pools per call: creating a new ThreadPoolExecutor for each execution is simple but inefficient under heavy sync load.
  • Shared state: fields like agent_executor and _times_executed are mutated without locks. Sharing one Agent instance across threads or concurrent async calls is unsafe.
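A common mitigation for the first point is a single module-level pool shared across calls. This sketch keeps the same cancel-on-timeout semantics; the pool size of 8 is a tuning assumption, not a recommendation.

```python
# Share one thread pool across all timeout-bounded calls instead of
# constructing a ThreadPoolExecutor per execution.
import concurrent.futures

_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=8)  # size: tuning assumption


def run_with_timeout(fn, timeout: float, *args, **kwargs):
    future = _POOL.submit(fn, *args, **kwargs)
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError as e:
        future.cancel()  # best effort; an already-running task keeps running
        raise TimeoutError(f"call timed out after {timeout}s") from e
```

Note the comment on `future.cancel()`: a timeout here abandons the work, it does not interrupt it, which is true of the original code as well.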

Memory and Knowledge: Powerful but Token‑Hungry

Memory and knowledge integration are among the most useful features of this class. The agent:

  • Recalls recent memories relevant to the task description.
  • Appends a "Relevant memories:" block into the prompt.
  • Queries knowledge sources via Knowledge or crew‑level knowledge configuration.

Every recalled memory line and knowledge snippet adds tokens and latency. The performance profile recommends tracking metrics like total tokens used and the size of memory recall in tokens to keep this in check.

A simple pattern emerges:

  • Keep recall limits low (e.g., limit=5 for tasks, limit=20 for kickoff) and watch how they affect end‑to‑end duration.
  • Use configuration like respect_context_window and token counters to avoid exceeding model limits.
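A budget guard for recalled memories can be very small. The sketch below uses whitespace-separated words as a crude token estimate, which is an assumption; in practice you would swap in a real tokenizer.

```python
# Trim recalled memories to a rough token budget before appending them
# to the prompt. len(m.split()) is a crude token estimate; replace it
# with a real tokenizer for production use.
def budget_memories(memories: list[str], max_tokens: int) -> list[str]:
    kept: list[str] = []
    used = 0
    for m in memories:
        cost = len(m.split())
        if used + cost > max_tokens:
            break                  # stop before blowing the budget
        kept.append(m)
        used += cost
    return kept
```

Applied just before the "Relevant memories:" block is built, a guard like this bounds prompt growth no matter what the recall limit returns.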

Code Execution and Docker Validation

When allow_code_execution is enabled, the agent validates Docker on initialization:

def _validate_docker_installation(self) -> None:
    """Check if Docker is installed and running."""
    docker_path = shutil.which("docker")
    if not docker_path:
        raise RuntimeError(
            f"Docker is not installed. Please install Docker to use code execution with agent: {self.role}"
        )

    try:
        subprocess.run(
            [docker_path, "info"],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Docker is not running. Please start Docker to use code execution with agent: {self.role}"
        ) from e
    except subprocess.TimeoutExpired as e:
        raise RuntimeError(
            f"Docker command timed out. Please check your Docker installation for agent: {self.role}"
        ) from e

This is good environment validation: fail fast when a feature can’t be safely supported. The trade‑off is startup latency and tight coupling—code execution concerns now live directly on the Agent, another sign of God object drift.
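The extraction the smell report suggests might look like the sketch below. The class name CodeExecutionValidator comes from the report's suggested fix; the body is an assumption, essentially the same checks moved behind a dedicated component, with a timeout added so the TimeoutExpired handler can actually fire.

```python
# Sketch of extracting environment validation out of Agent into a
# dedicated component. Body is illustrative, not upstream code.
import shutil
import subprocess


class CodeExecutionValidator:
    def validate(self, role: str) -> None:
        docker_path = shutil.which("docker")
        if not docker_path:
            raise RuntimeError(f"Docker is not installed (agent: {role})")
        try:
            subprocess.run(
                [docker_path, "info"],
                check=True,
                capture_output=True,
                timeout=10,  # without timeout=, TimeoutExpired can never be raised
            )
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"Docker is not running (agent: {role})") from e
        except subprocess.TimeoutExpired as e:
            raise RuntimeError(f"Docker check timed out (agent: {role})") from e
```

The agent then holds a validator instance and calls `validator.validate(self.role)` on init, so code-execution concerns have one home and one test surface.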

Design Lessons for Your Own Agents

The crewAI Agent gives us a concrete blueprint—both what to emulate and what to guard against—when designing orchestration layers for LLM systems.

1. Embrace the Facade, Fight the God Object

  • A rich Agent API like execute_task and kickoff is great for developer experience.
  • Continuously extract subsystems as they grow: guardrail processing, environment validation, kickoff orchestration, training data injection.
  • Keep main methods as high‑level narratives; push detailed logic into small, testable helpers or dedicated classes.

2. Make Retry and Guardrail Policies Explicit

  • Prefer loops over recursion for retries so control flow and stack traces stay readable.
  • Clearly define which exceptions are retried and which are not, and emit events for each retry decision.
  • Bound guardrail retries and expose them via metrics; don’t let them silently dominate your latency and cost.

3. Treat Agents as Single‑Tenant by Default

  • Avoid sharing one Agent across concurrent requests; mutable fields like tools, agent_executor, _times_executed, and _mcp_resolver are not thread‑safe.
  • If you must share, refactor runtime state into per‑request structures and keep the façade stateless.

4. Put Observability Beside Behavior, Not After It

  • Emit structured events for memory retrieval, knowledge queries, execution start/complete/error, and guardrail retries.
  • Back those events with metrics for latency, error counts, token usage, and guardrail retry rates so invisible complexity becomes visible.

5. Be Honest About Data and Security

  • Assume prompts, memories, and knowledge queries may contain PII and can leak via events; sanitize or filter in event subscribers.
  • Keep secrets out of events and logs; ensure tools and knowledge backends enforce their own access control.

The core takeaway from this class is simple: centralizing orchestration into one agent façade is extremely powerful, but without strict boundaries it will quietly turn into a God object that owns retries, guardrails, memory, knowledge, tools, platform checks, and more.

As you design your own agents or orchestration layers, keep asking: “Is this the air traffic controller, or am I secretly building the entire airport in one class?” If you keep the agent as a focused coordinator and push specialized behavior into dedicated components, you get both developer happiness and operational sanity.

Full Source Code

Direct source from the upstream repository, available on GitHub.

heads/main/lib/crewai/src/crewai/agent/core.py

Repository: crewAIInc/crewAI on GitHub.

Thanks for reading! I hope this was useful. If you have questions or thoughts, feel free to reach out.

Content Creation Process: This article was generated via a semi-automated workflow using AI tools. I prepared the strategic framework, including specific prompts and data sources. From there, the automation system conducted the research, analysis, and writing. The content passed through automated verification steps before being finalized and published without manual intervention.

Mahmoud Zalt

About the Author

I’m Zalt, a technologist with 16+ years of experience, passionate about designing and building AI systems that move us closer to a world where machines handle everything and humans reclaim wonder.

Let's connect if you're working on interesting AI projects, looking for technical advice, or want to discuss anything.

