Skip to content

Graphes LangGraph

LangGraph StateGraph Async

Cette page détaille l’architecture technique des graphes d’embedding — le cœur du système qui transforme n’importe quel contenu en vecteurs exploitables. Si vous n’êtes pas familier avec LangGraph, lisez d’abord la section explicative ci-dessous.

LangGraph est un framework Python développé par l’équipe LangChain pour construire des applications IA basées sur des graphes d’états. Pensez-y comme une machine à états augmentée pour l’IA.

ConceptDescriptionAnalogie
NodeUne fonction Python qui transforme l’étatUne étape dans une recette de cuisine
EdgeLa connexion entre deux nodes”Après l’étape A, faire l’étape B”
Conditional EdgeUne connexion qui dépend de l’état”Si le gâteau est cuit, sortir du four, sinon continuer”
StateUn dictionnaire typé qui traverse le grapheLes ingrédients qui évoluent à chaque étape

Pourquoi LangGraph plutôt que des chaînes linéaires ?

Section titled “Pourquoi LangGraph plutôt que des chaînes linéaires ?”
# Exemple conceptuel : un graphe LangGraph
workflow = StateGraph(EmbeddingState)
# Chaque node est une fonction pure qui prend l'état et le retourne modifié
workflow.add_node("input_router", detect_input_type)
workflow.add_node("text_pipeline", process_text)
workflow.add_node("pdf_pipeline", process_pdf)
# Les edges conditionnels permettent le branchement dynamique
workflow.add_conditional_edges(
"input_router",
lambda state: state["input_type"], # La fonction de routage
{"text": "text_pipeline", "document": "pdf_pipeline"}
)
graph = workflow.compile() # Prêt à être invoqué !

Le graphe principal pour le traitement d’une ressource unique. C’est le graphe que vous invoquez quand vous uploadez un fichier ou collez du texte.

  1. Détecte automatiquement le type d’input (PDF, image, audio, etc.)
  2. Route vers le pipeline spécialisé approprié
  3. Extrait le contenu textuel de la source
  4. Découpe le texte en chunks de taille optimale
  5. Enrichit les chunks complexes (optionnel, via LLM)
  6. Génère les embeddings vectoriels avec Mistral

Ce diagramme montre tous les nodes et leurs connexions. Les lignes pointillées indiquent des edges conditionnels (le chemin dépend de l’état) :

Embedding Graph complet
100%
graph TD;
    __start__([__start__]):::first
    input_router(input_router)
    audio_input_check(audio_input_check)
    youtube_input_check(youtube_input_check)
    image_input_check(image_input_check)
    text_input_check(text_input_check)
    website_input_check(website_input_check)
    document_input_check(document_input_check)
    detect_complex_content(detect_complex_content)
    splitter_selection(splitter_selection)
    splitter_application(splitter_application)
    chunk_enrichment(chunk_enrichment)
    embedding_generation(embedding_generation)
    pdf_loader(pdf_loader)
    pdf_cleaner(pdf_cleaner)
    pdf_page_analyzer(pdf_page_analyzer)
    pdf_processing_strategy(pdf_processing_strategy)
    pdf_split_by_pages(pdf_split_by_pages)
    pdf_split_by_size(pdf_split_by_size)
    pdf_text_parser(pdf_text_parser)
    pdf_ocr_processor(pdf_ocr_processor)
    pdf_hybrid_processor(pdf_hybrid_processor)
    pdf_content_merger(pdf_content_merger)
    pdf_error_handler(pdf_error_handler)
    image_loader(image_loader)
    image_preprocessor(image_preprocessor)
    image_vision_analyzer(image_vision_analyzer)
    image_error_handler(image_error_handler)
    audio_loader(audio_loader)
    audio_splitter(audio_splitter)
    audio_transcriber(audio_transcriber)
    audio_error_handler(audio_error_handler)
    youtube_transcript(youtube_transcript)
    youtube_audio_download(youtube_audio_download)
    youtube_error_handler(youtube_error_handler)
    website_crawler(website_crawler)
    website_content_cleaner(website_content_cleaner)
    website_error_handler(website_error_handler)
    __end__([__end__]):::last
    
    __start__ --> input_router;
    input_router -.-> audio_input_check;
    input_router -.-> document_input_check;
    input_router -.-> image_input_check;
    input_router -.-> text_input_check;
    input_router -.-> website_input_check;
    input_router -.-> youtube_input_check;
    
    audio_input_check --> audio_loader;
    audio_loader -. continue .-> audio_splitter;
    audio_loader -. error .-> audio_error_handler;
    audio_splitter -. continue .-> audio_transcriber;
    audio_splitter -. error .-> audio_error_handler;
    audio_transcriber -. continue .-> detect_complex_content;
    audio_transcriber -. error .-> audio_error_handler;
    
    document_input_check --> pdf_loader;
    pdf_loader -. continue .-> pdf_cleaner;
    pdf_loader -. error .-> pdf_error_handler;
    pdf_cleaner -. continue .-> pdf_page_analyzer;
    pdf_cleaner -. error .-> pdf_error_handler;
    pdf_page_analyzer -. continue .-> pdf_processing_strategy;
    pdf_page_analyzer -. error .-> pdf_error_handler;
    pdf_processing_strategy -. split_pages .-> pdf_split_by_pages;
    pdf_processing_strategy -. split_size .-> pdf_split_by_size;
    pdf_processing_strategy -. parse_only .-> pdf_text_parser;
    pdf_processing_strategy -. ocr_only .-> pdf_ocr_processor;
    pdf_processing_strategy -. hybrid .-> pdf_hybrid_processor;
    pdf_split_by_pages -. parse_only .-> pdf_text_parser;
    pdf_split_by_pages -. ocr_only .-> pdf_ocr_processor;
    pdf_split_by_pages -. hybrid .-> pdf_hybrid_processor;
    pdf_split_by_size -. parse_only .-> pdf_text_parser;
    pdf_split_by_size -. ocr_only .-> pdf_ocr_processor;
    pdf_split_by_size -. hybrid .-> pdf_hybrid_processor;
    pdf_text_parser --> pdf_content_merger;
    pdf_ocr_processor --> pdf_content_merger;
    pdf_hybrid_processor --> pdf_content_merger;
    pdf_content_merger --> detect_complex_content;
    
    image_input_check --> image_loader;
    image_loader -. continue .-> image_preprocessor;
    image_loader -. error .-> image_error_handler;
    image_preprocessor -. continue .-> image_vision_analyzer;
    image_preprocessor -. error .-> image_error_handler;
    image_vision_analyzer -. continue .-> detect_complex_content;
    image_vision_analyzer -. error .-> image_error_handler;
    
    youtube_input_check -. continue .-> youtube_transcript;
    youtube_input_check -. error .-> youtube_error_handler;
    youtube_transcript -. chunking .-> detect_complex_content;
    youtube_transcript -. fallback_audio .-> youtube_audio_download;
    youtube_transcript -. error .-> youtube_error_handler;
    youtube_audio_download -. continue .-> audio_splitter;
    youtube_audio_download -. error .-> youtube_error_handler;
    
    website_input_check -. continue .-> website_crawler;
    website_input_check -. error .-> website_error_handler;
    website_crawler -. continue .-> website_content_cleaner;
    website_crawler -. error .-> website_error_handler;
    website_content_cleaner -. continue .-> detect_complex_content;
    website_content_cleaner -. error .-> website_error_handler;
    
    text_input_check --> detect_complex_content;
    detect_complex_content --> splitter_selection;
    splitter_selection --> splitter_application;
    splitter_application -. enrich .-> chunk_enrichment;
    splitter_application -. skip .-> embedding_generation;
    chunk_enrichment --> embedding_generation;
    embedding_generation --> __end__;
    
    audio_error_handler --> __end__;
    image_error_handler --> __end__;
    pdf_error_handler --> __end__;
    website_error_handler --> __end__;
    youtube_error_handler --> __end__;
    
    classDef default fill:#5e35b1,line-height:1.2
    classDef first fill-opacity:0
    classDef last fill:#7c4dff

Le graphe ci-dessus peut sembler complexe, mais il suit une logique simple :

  1. Entrée (__start__) → Routeur : Le graphe commence et détecte le type d’input
  2. RouteurPipeline spécialisé : Selon le type (text, document, image…), on branche
  3. PipelineTraitement commun : Après extraction, on rejoint le flux commun
  4. Traitement communSortie : Chunking, enrichissement optionnel, embedding, fin

L’état est un TypedDict Python qui contient toutes les données qui traversent le graphe. Chaque node lit l’état, le modifie, et le retourne. Voici la structure complète avec des explications :

class EmbeddingState(TypedDict):
"""État principal qui traverse le graphe d'embedding.
Cet état est passé de node en node. Chaque node peut lire et modifier
les champs dont il a besoin. LangGraph gère automatiquement la fusion
des modifications.
"""
# === Input (défini au départ) ===
input_type: InputType # Type détecté par input_router
sentences: List[str] # Si TEXT : les phrases à traiter
url_resource: str # Si DOCUMENT/IMAGE/etc : URL S3 de la ressource
language: str # Langue pour les prompts ("fr", "en"...)
# === Processing (modifié pendant le traitement) ===
extracted_content: str # Texte extrait de la source (rempli par le pipeline)
chunks: List[str] # Texte découpé en segments (après chunking)
has_complex_content: bool # True si tables/code/formules détectés
# === PDF Specific (uniquement pour les PDFs) ===
pdf_pages: List[Dict] # Métadonnées de chaque page
processing_strategy: str # "parse_only", "ocr_only", "hybrid"
# === Output (résultat final) ===
embedded_chunks: List[Dict] # Les chunks avec leurs embeddings
error: Optional[str] # Message d'erreur si échec
# === Metadata (statistiques) ===
total_tokens: int # Tokens traités
processing_time: float # Temps de traitement
embedding_dimension: int # Dimension (1024)
from langgraph.graph import StateGraph, END
def create_embedding_graph():
"""Construit le graphe d'embedding"""
workflow = StateGraph(EmbeddingState)
# === Nœuds principaux ===
workflow.add_node("input_router", input_router)
workflow.add_node("text_input_check", text_input_check)
workflow.add_node("detect_complex_content", detect_complex_content)
workflow.add_node("splitter_selection", splitter_selection)
workflow.add_node("splitter_application", splitter_application)
workflow.add_node("chunk_enrichment", chunk_enrichment)
workflow.add_node("embedding_generation", embedding_generation)
# === Nœuds PDF ===
workflow.add_node("document_input_check", document_input_check)
workflow.add_node("pdf_loader", pdf_loader)
workflow.add_node("pdf_cleaner", pdf_cleaner)
# ... autres nœuds PDF
# === Arêtes ===
workflow.add_edge("__start__", "input_router")
# Routage conditionnel selon le type d'input
workflow.add_conditional_edges(
"input_router",
route_by_input_type,
{
"text": "text_input_check",
"document": "document_input_check",
"image": "image_input_check",
"audio": "audio_input_check",
"youtube": "youtube_input_check",
"website": "website_input_check",
}
)
# Arête conditionnelle pour l'enrichissement
workflow.add_conditional_edges(
"splitter_application",
should_enrich,
{
"enrich": "chunk_enrichment",
"skip": "embedding_generation"
}
)
workflow.add_edge("embedding_generation", END)
return workflow.compile()
# Export du graphe compilé
graph = create_embedding_graph()

Permet de traiter plusieurs ressources en une seule invocation. Chaque ressource est traitée via le graphe d’embedding individuel puis les résultats sont agrégés.

Batch Embedding Graph
100%
graph TD;
    __start__([__start__]):::first
    validate_batch(validate_batch)
    process_batch(process_batch)
    __end__([__end__]):::last
    
    __start__ --> validate_batch;
    validate_batch -. error .-> __end__;
    validate_batch -. continue .-> process_batch;
    process_batch --> __end__;
    
    classDef default fill:#5e35b1,line-height:1.2
    classDef first fill-opacity:0
    classDef last fill:#7c4dff
Flux Batch Sequence
100%
sequenceDiagram
    participant Client
    participant BatchGraph
    participant Validator
    participant EmbeddingGraph
    participant Aggregator
    
    Client->>BatchGraph: resources[]
    BatchGraph->>Validator: validate_batch_input()
    
    loop Pour chaque ressource
        Validator->>EmbeddingGraph: process(resource)
        EmbeddingGraph-->>Aggregator: embedded_chunks
    end
    
    Aggregator->>Client: all_embedded_chunks + stats
class ResourceInput(TypedDict):
"""Définition d'une ressource à traiter"""
input_type: InputType
sentences: NotRequired[List[str]] # Pour TEXT
url_resource: NotRequired[str] # Pour DOCUMENT, IMAGE, etc.
class BatchEmbeddingState(TypedDict):
"""State pour le traitement par batch"""
resources: List[ResourceInput] # Liste des ressources
processing_results: List[Dict] # Résultats par ressource
all_embedded_chunks: List[Dict] # Tous les chunks embeddés
total_chunks: int # Nombre total de chunks
processing_stats: Dict # Stats globales
def create_batch_embedding_graph():
"""Construit le graphe de traitement batch"""
workflow = StateGraph(BatchEmbeddingState)
workflow.add_node("validate_batch", validate_batch_input)
workflow.add_node("process_batch", process_batch_resources)
workflow.add_edge("__start__", "validate_batch")
workflow.add_conditional_edges(
"validate_batch",
check_validation,
{
"continue": "process_batch",
"error": END
}
)
workflow.add_edge("process_batch", END)
return workflow.compile()
batch_graph = create_batch_embedding_graph()

Le nœud process_batch utilise asyncio.gather pour traiter les ressources en parallèle :

async def process_batch_resources(state: BatchEmbeddingState) -> BatchEmbeddingState:
"""Traite toutes les ressources en parallèle"""
# Import du graphe d'embedding
from src.graphs.embedding import graph
tasks = []
for resource in state["resources"]:
task = asyncio.create_task(
graph.ainvoke({
"input_type": resource["input_type"],
"sentences": resource.get("sentences"),
"url_resource": resource.get("url_resource"),
"language": state.get("language", "fr")
})
)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# Agrégation des résultats
all_chunks = []
processing_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processing_results.append({
"resource_index": i,
"status": "error",
"error": str(result)
})
else:
chunks = result.get("embedded_chunks", [])
all_chunks.extend(chunks)
processing_results.append({
"resource_index": i,
"status": "success",
"chunks_count": len(chunks)
})
return {
**state,
"all_embedded_chunks": all_chunks,
"total_chunks": len(all_chunks),
"processing_results": processing_results,
"processing_stats": {
"total_resources": len(state["resources"]),
"successful": sum(1 for r in processing_results if r["status"] == "success"),
"failed": sum(1 for r in processing_results if r["status"] == "error")
}
}

from src.graphs.embedding import graph
from src.models.input_type import InputType
# Traitement d'un texte
result = await graph.ainvoke({
"input_type": InputType.TEXT,
"sentences": [
"La photosynthèse est le processus...",
"Les plantes utilisent la lumière..."
],
"language": "fr"
})
embedded_chunks = result["embedded_chunks"]
print(f"Chunks générés: {len(embedded_chunks)}")

src/graphs/
├── embedding.py # 🎯 Graph principal (export: graph)
├── batch_embedding.py # 📦 Graph batch (export: batch_graph)
└── states/
└── embedding_state.py # États TypedDict

Modularité

Chaque nœud est une fonction indépendante, facile à tester et à maintenir.

Visibilité

Les graphes sont visualisables, facilitant le debugging et la documentation.

Flexibilité

Les arêtes conditionnelles permettent des workflows complexes et adaptatifs.

Traçabilité

Chaque étape du traitement est loggée et peut être inspectée.


Architecture LangGraph pour un traitement de contenu robuste et scalable.