Ahmad Humayun
AI Engineering

Building a Multi-Agent Marketing AI System with LangGraph and Gemini

How to architect a LangGraph system that routes 40+ marketing performance rules across parallel AI domain workflows using a 3-tier classifier and Gemini.

When you build a marketing AI system that checks 40+ rules across multiple domains, the first instinct is one graph that does everything.

Classification. Interpretation. Recommendation. All in one flow.

At low rule counts, this works fine.

Past a certain complexity, it breaks down.

Prompts grow enormous. Response quality drops. When something goes wrong, you cannot tell which part caused it.

Here is how I restructured it.

The problem with one graph

Marketing performance analysis covers multiple domains.

Media efficiency. Creative performance. Budget pacing. Audience quality. Conversion tracking. Platform issues.

Each domain needs different context, different signals, and different response patterns.

When you route all 40+ rules through one LLM call, the LLM has to hold media CPM thresholds and creative fatigue signals and pacing curves and audience overlap all at once.

It does not handle that well.

Output becomes generic. Recommendations lose specificity. And when one domain produces a bad result, it bleeds into the others.

The structure I use

I split it into a main_graph that routes rules to one of six domain sub-workflows.

main_graph
├── classify rules → assign domain
├── dispatch → 6 domain sub-workflows (parallel)
│   ├── media_performance_graph
│   ├── creative_graph
│   ├── budget_graph
│   ├── audience_graph
│   ├── conversion_graph
│   └── platform_graph
└── collect + merge outputs

Each sub-workflow is its own LangGraph graph.

It receives only the rules and data relevant to its domain.

It runs independently, without knowing what the other domains are doing.

The dispatch uses asyncio.gather(), so all six run concurrently and their LLM calls overlap.

Total wall time is roughly the slowest sub-workflow, not the sum of all six.
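A minimal sketch of the dispatch-and-merge step, assuming each sub-workflow is exposed as an async callable. The `run_domain` coroutine and the simulated latencies are hypothetical stand-ins: a real sub-workflow would invoke its own graph rather than sleep.

```python
import asyncio
import time

# Hypothetical latencies (seconds) standing in for real sub-workflow runtimes.
SIMULATED_LATENCY = {
    "media_performance": 0.10,
    "creative": 0.20,
    "budget": 0.04,
    "audience": 0.06,
    "conversion": 0.08,
    "platform": 0.02,
}


async def run_domain(domain: str, rules: list[str]) -> dict:
    """Stand-in for one domain sub-workflow; sleeps instead of calling an LLM."""
    await asyncio.sleep(SIMULATED_LATENCY[domain])
    return {"domain": domain, "rules_checked": len(rules)}


async def dispatch(rules_by_domain: dict[str, list[str]]) -> dict[str, dict]:
    """Run every domain concurrently and merge the results keyed by domain."""
    domains = list(rules_by_domain)
    results = await asyncio.gather(
        *(run_domain(d, rules_by_domain[d]) for d in domains)
    )
    # gather() preserves argument order, so zip() pairs each result correctly.
    return dict(zip(domains, results))


def timed_dispatch(rules_by_domain: dict[str, list[str]]) -> tuple[dict, float]:
    """Synchronous helper that also reports elapsed wall time in seconds."""
    start = time.perf_counter()
    merged = asyncio.run(dispatch(rules_by_domain))
    return merged, time.perf_counter() - start
```

With these simulated numbers the elapsed time lands near the slowest domain (0.20 s), not the 0.50 s total.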

Classifying rules before dispatch

Before a rule can be dispatched, it needs a domain assignment.

I do this in three tiers.

Tier 1: Category lookup

Most rules have a category column that maps cleanly to a domain.

def classify_by_category(rule: Rule) -> str | None:
    mapping = {
        "MEDIA": "media_performance",
        "CREATIVE": "creative",
        "BUDGET": "budget",
    }
    return mapping.get(rule.category)

No LLM call needed. This handles most of them.

Tier 2: LLM classification with cache

For rules without a clean category, Gemini classifies the rule from its description.

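Results are cached per rule, so the same rule does not cost a second LLM call within a worker process.

from functools import lru_cache

@lru_cache(maxsize=512)
def classify_with_llm(rule_id: str, description: str) -> str:
    # lru_cache keys on both arguments, so an edited description
    # triggers a fresh classification even for a known rule ID.
    response = gemini.generate(
        prompt=CLASSIFIER_PROMPT.format(description=description),
        response_schema=DomainClassification,
    )
    return response.domain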
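If the cache should be keyed strictly on rule ID, a small dictionary-backed wrapper is one option. This is a sketch under assumptions: `RuleDomainCache` and `fake_classifier` are hypothetical names, and the classifier is injected so the example runs without a Gemini client.

```python
from typing import Callable


class RuleDomainCache:
    """Caches domain assignments keyed on rule ID only (in-process)."""

    def __init__(self, classify: Callable[[str], str]) -> None:
        self._classify = classify  # e.g. a wrapper around the LLM call
        self._by_rule_id: dict[str, str] = {}

    def get(self, rule_id: str, description: str) -> str:
        # Only a cache miss reaches the (expensive) classifier.
        if rule_id not in self._by_rule_id:
            self._by_rule_id[rule_id] = self._classify(description)
        return self._by_rule_id[rule_id]


calls: list[str] = []


def fake_classifier(description: str) -> str:
    """Hypothetical stand-in for the LLM call; records each invocation."""
    calls.append(description)
    return "audience"


cache = RuleDomainCache(fake_classifier)
first = cache.get("rule-17", "Audience overlap above threshold")
# Same rule ID with an edited description: served from the cache.
second = cache.get("rule-17", "Audience overlap above threshold (edited)")
```

The trade-off is that editing a rule's description does not invalidate its cached domain, and the cache lives only as long as the process. A shared store such as Redis would be needed to reuse results across workers.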

Tier 3: Heuristic fallback

If the LLM call fails or returns an unrecognized domain, a keyword-based fallback assigns the rule to the most plausible domain.

One classification failure does not crash the run.
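The three tiers can be chained roughly as follows. This is a sketch, not the production code: the keyword lists, `DEFAULT_DOMAIN`, and the `llm_classify` callable are illustrative assumptions, and the category mapping repeats only the three entries shown earlier.

```python
from typing import Callable, Optional

VALID_DOMAINS = {
    "media_performance", "creative", "budget",
    "audience", "conversion", "platform",
}

CATEGORY_TO_DOMAIN = {
    "MEDIA": "media_performance",
    "CREATIVE": "creative",
    "BUDGET": "budget",
}

# Hypothetical keyword lists; a real system would tune these per domain.
DOMAIN_KEYWORDS = {
    "media_performance": ("cpm", "cpc", "ctr", "impression"),
    "creative": ("creative", "fatigue", "ad copy"),
    "budget": ("budget", "pacing", "spend"),
    "audience": ("audience", "overlap", "segment"),
    "conversion": ("conversion", "pixel", "tracking"),
    "platform": ("platform", "api", "outage"),
}
DEFAULT_DOMAIN = "platform"  # assumption: catch-all when nothing matches


def classify_by_keywords(description: str) -> str:
    """Tier 3: pick the domain with the most keyword hits."""
    text = description.lower()
    scores = {
        domain: sum(kw in text for kw in keywords)
        for domain, keywords in DOMAIN_KEYWORDS.items()
    }
    best = max(scores, key=lambda d: scores[d])
    return best if scores[best] > 0 else DEFAULT_DOMAIN


def classify_rule(
    category: Optional[str],
    description: str,
    llm_classify: Callable[[str], str],
) -> str:
    # Tier 1: direct category lookup, no LLM call.
    domain = CATEGORY_TO_DOMAIN.get(category or "")
    if domain:
        return domain
    # Tier 2: LLM classification; accept only recognized domains.
    try:
        domain = llm_classify(description)
        if domain in VALID_DOMAINS:
            return domain
    except Exception:
        pass  # fall through to the heuristic instead of failing the run
    # Tier 3: keyword heuristic.
    return classify_by_keywords(description)
```

Validating the LLM answer against `VALID_DOMAINS` in Python means an invented domain is handled the same way as a failed call.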

Why LangGraph instead of plain Python

Three specific things LangGraph gives me that plain Python does not.

Conditional edges with typed state. Each node operates on a typed GraphState. Conditional edges let the orchestrator branch to different sub-workflows cleanly.

main_graph.add_conditional_edges(
    "classify",
    route_to_domain,
    {
        "media_performance": "media_performance_graph",
        "creative": "creative_graph",
    }
)

Error isolation. When a domain sub-workflow raises an exception, the failure is caught at the sub-workflow boundary, logged, and replaced with a fallback output while the other domains continue. One broken domain does not abort the full run.
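One way to get this behaviour, independent of any particular LangGraph feature, is to collect exceptions at the dispatch step and substitute a fallback output. The workflow functions and the fallback shape below are hypothetical, for illustration only.

```python
import asyncio
import logging

logger = logging.getLogger("domain_dispatch")


async def healthy_domain(rules: list[str]) -> dict:
    """Stand-in for a sub-workflow that succeeds."""
    return {"status": "ok", "findings": len(rules)}


async def broken_domain(rules: list[str]) -> dict:
    """Stand-in for a sub-workflow that fails."""
    raise RuntimeError("upstream timeout")


async def run_isolated(workflows: dict, rules_by_domain: dict) -> dict:
    """Run all domains; replace any failure with a fallback output."""
    domains = list(workflows)
    outcomes = await asyncio.gather(
        *(workflows[d](rules_by_domain.get(d, [])) for d in domains),
        return_exceptions=True,  # exceptions come back as values
    )
    merged = {}
    for domain, outcome in zip(domains, outcomes):
        if isinstance(outcome, BaseException):
            logger.error("domain %s failed: %r", domain, outcome)
            merged[domain] = {
                "status": "failed",
                "findings": 0,
                "error": str(outcome),
            }
        else:
            merged[domain] = outcome
    return merged
```

With `return_exceptions=True`, a failure in one coroutine does not propagate out of `gather()`, so the remaining domains still contribute their results to the merge.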

Observable execution. LangGraph exposes intermediate node outputs. Combined with Langfuse tracing, every classification decision and every LLM call is logged with its full input/output. When something goes wrong in production, you can see exactly what happened.

Celery integration

The LangGraph system runs as a Celery task. One task per advertiser per analysis run.

A few things this enables in practice:

Per-advertiser parallelism. Multiple advertisers process concurrently. One slow advertiser does not block others.

Timezone-aware scheduling. Each advertiser's analysis runs in their local morning rather than at a fixed UTC time, because yesterday's data needs to settle before the analysis runs against it.

Kill switch. A Redis flag stops task execution for an advertiser without a code deployment. Useful during data quality incidents.

RedBeat. Celery beat schedules live in Redis, not a config file. Per-advertiser schedules can be created, updated, or deleted through the API at runtime.

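@celery_app.task(bind=True, max_retries=3)
def run_analysis_pipeline(self, advertiser_id: str) -> dict:
    # Kill switch: a Redis flag skips this advertiser without a deploy.
    if redis_client.get(f"kill_switch:{advertiser_id}"):
        return {"status": "skipped", "reason": "kill_switch"}

    try:
        graph = build_main_graph(advertiser_id)
        return graph.invoke({"advertiser_id": advertiser_id})
    except Exception as exc:
        # max_retries only applies when the task calls self.retry();
        # the 60-second countdown is an illustrative value.
        raise self.retry(exc=exc, countdown=60)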
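For the timezone-aware scheduling described above, the next run time can be computed in the advertiser's local time and then converted to UTC. This is a sketch under assumptions: `next_local_run_utc` is a hypothetical helper, and the 07:00 default is illustrative rather than the production setting.

```python
from datetime import datetime, time, timedelta, timezone, tzinfo


def next_local_run_utc(
    now_utc: datetime,
    tz: tzinfo,
    local_run_time: time = time(7, 0),
) -> datetime:
    """Return the next occurrence of local_run_time in tz, expressed in UTC."""
    now_local = now_utc.astimezone(tz)
    candidate = datetime.combine(now_local.date(), local_run_time, tzinfo=tz)
    if candidate <= now_local:
        # Today's slot has already passed locally; schedule tomorrow's.
        candidate = datetime.combine(
            now_local.date() + timedelta(days=1), local_run_time, tzinfo=tz
        )
    return candidate.astimezone(timezone.utc)


# Fixed offsets keep the example free of tz database dependencies.
PKT = timezone(timedelta(hours=5))    # UTC+5
EST = timezone(timedelta(hours=-5))   # UTC-5

karachi_run = next_local_run_utc(
    datetime(2025, 1, 15, 12, 0, tzinfo=timezone.utc), PKT
)
new_york_run = next_local_run_utc(
    datetime(2025, 1, 15, 10, 0, tzinfo=timezone.utc), EST
)
```

In practice, passing `zoneinfo.ZoneInfo("Asia/Karachi")` or a similar IANA zone instead of a fixed offset lets the standard library handle daylight-saving changes.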

Prompt management

Prompts for each domain sub-workflow live in Langfuse, not in the codebase.

Prompt updates deploy without a code release. Every version is tracked. A/B testing is a Langfuse configuration change, not a PR.

prompt = langfuse_client.get_prompt(
    name=f"{domain}_analysis_prompt",
    label="production"
)

What makes this maintainable at scale

A few decisions that held up well in production.

Classification is cheap. Most rules hit tier 1 or the cached tier 2 result. Fresh Gemini classification calls are the exception.

Domain sub-workflows are independent. Adding a new domain means writing a new sub-graph. Existing code is not touched.

Deterministic steps stay out of the LLM. Routing, state decisions, and output schemas are computed in Python. Only interpretation and language generation use Gemini.

Failure is isolated. A broken domain workflow does not propagate to others.

The same pattern works at much smaller scale. Even with 5-10 rules and 2-3 domains, separating classification from generation and running workflows in parallel makes things easier to debug and easier to extend.

Stack

  • Orchestration: LangGraph
  • LLM: Gemini 2.5 Pro
  • Background tasks: Celery + Redis + RedBeat
  • Prompt management: Langfuse
  • Observability: OpenTelemetry + Langfuse traces
  • Data warehouse: BigQuery

If you are building a similar system and want to talk through the architecture, get in touch.

Ahmad Humayun

Data Engineering Consultant

Freelance data engineering consultant specialising in BigQuery, Dataform/dbt, marketing data pipelines, API automation, and AI-ready analytics layers. Based in Lahore, Pakistan — available worldwide.
