Modularité
Chaque nœud est une fonction indépendante, facile à tester et à maintenir.
Cette page détaille l’architecture technique des graphes d’embedding — le cœur du système qui transforme n’importe quel contenu en vecteurs exploitables. Si vous n’êtes pas familier avec LangGraph, lisez d’abord la section explicative ci-dessous.
LangGraph est un framework Python développé par l’équipe LangChain pour construire des applications IA basées sur des graphes d’états. Pensez-y comme une machine à états augmentée pour l’IA.
| Concept | Description | Analogie |
|---|---|---|
| Node | Une fonction Python qui transforme l’état | Une étape dans une recette de cuisine |
| Edge | La connexion entre deux nodes | ”Après l’étape A, faire l’étape B” |
| Conditional Edge | Une connexion qui dépend de l’état | ”Si le gâteau est cuit, sortir du four, sinon continuer” |
| State | Un dictionnaire typé qui traverse le graphe | Les ingrédients qui évoluent à chaque étape |
# Exemple conceptuel : un graphe LangGraphworkflow = StateGraph(EmbeddingState)
# Chaque node est une fonction pure qui prend l'état et le retourne modifiéworkflow.add_node("input_router", detect_input_type)workflow.add_node("text_pipeline", process_text)workflow.add_node("pdf_pipeline", process_pdf)
# Les edges conditionnels permettent le branchement dynamiqueworkflow.add_conditional_edges( "input_router", lambda state: state["input_type"], # La fonction de routage {"text": "text_pipeline", "document": "pdf_pipeline"})
graph = workflow.compile() # Prêt à être invoqué !Le graphe principal pour le traitement d’une ressource unique. C’est le graphe que vous invoquez quand vous uploadez un fichier ou collez du texte.
Ce diagramme montre tous les nodes et leurs connexions. Les lignes pointillées indiquent des edges conditionnels (le chemin dépend de l’état) :
graph TD;
__start__([__start__]):::first
input_router(input_router)
audio_input_check(audio_input_check)
youtube_input_check(youtube_input_check)
image_input_check(image_input_check)
text_input_check(text_input_check)
website_input_check(website_input_check)
document_input_check(document_input_check)
detect_complex_content(detect_complex_content)
splitter_selection(splitter_selection)
splitter_application(splitter_application)
chunk_enrichment(chunk_enrichment)
embedding_generation(embedding_generation)
pdf_loader(pdf_loader)
pdf_cleaner(pdf_cleaner)
pdf_page_analyzer(pdf_page_analyzer)
pdf_processing_strategy(pdf_processing_strategy)
pdf_split_by_pages(pdf_split_by_pages)
pdf_split_by_size(pdf_split_by_size)
pdf_text_parser(pdf_text_parser)
pdf_ocr_processor(pdf_ocr_processor)
pdf_hybrid_processor(pdf_hybrid_processor)
pdf_content_merger(pdf_content_merger)
pdf_error_handler(pdf_error_handler)
image_loader(image_loader)
image_preprocessor(image_preprocessor)
image_vision_analyzer(image_vision_analyzer)
image_error_handler(image_error_handler)
audio_loader(audio_loader)
audio_splitter(audio_splitter)
audio_transcriber(audio_transcriber)
audio_error_handler(audio_error_handler)
youtube_transcript(youtube_transcript)
youtube_audio_download(youtube_audio_download)
youtube_error_handler(youtube_error_handler)
website_crawler(website_crawler)
website_content_cleaner(website_content_cleaner)
website_error_handler(website_error_handler)
__end__([__end__]):::last
__start__ --> input_router;
input_router -.-> audio_input_check;
input_router -.-> document_input_check;
input_router -.-> image_input_check;
input_router -.-> text_input_check;
input_router -.-> website_input_check;
input_router -.-> youtube_input_check;
audio_input_check --> audio_loader;
audio_loader -. continue .-> audio_splitter;
audio_loader -. error .-> audio_error_handler;
audio_splitter -. continue .-> audio_transcriber;
audio_splitter -. error .-> audio_error_handler;
audio_transcriber -. continue .-> detect_complex_content;
audio_transcriber -. error .-> audio_error_handler;
document_input_check --> pdf_loader;
pdf_loader -. continue .-> pdf_cleaner;
pdf_loader -. error .-> pdf_error_handler;
pdf_cleaner -. continue .-> pdf_page_analyzer;
pdf_cleaner -. error .-> pdf_error_handler;
pdf_page_analyzer -. continue .-> pdf_processing_strategy;
pdf_page_analyzer -. error .-> pdf_error_handler;
pdf_processing_strategy -. split_pages .-> pdf_split_by_pages;
pdf_processing_strategy -. split_size .-> pdf_split_by_size;
pdf_processing_strategy -. parse_only .-> pdf_text_parser;
pdf_processing_strategy -. ocr_only .-> pdf_ocr_processor;
pdf_processing_strategy -. hybrid .-> pdf_hybrid_processor;
pdf_split_by_pages -. parse_only .-> pdf_text_parser;
pdf_split_by_pages -. ocr_only .-> pdf_ocr_processor;
pdf_split_by_pages -. hybrid .-> pdf_hybrid_processor;
pdf_split_by_size -. parse_only .-> pdf_text_parser;
pdf_split_by_size -. ocr_only .-> pdf_ocr_processor;
pdf_split_by_size -. hybrid .-> pdf_hybrid_processor;
pdf_text_parser --> pdf_content_merger;
pdf_ocr_processor --> pdf_content_merger;
pdf_hybrid_processor --> pdf_content_merger;
pdf_content_merger --> detect_complex_content;
image_input_check --> image_loader;
image_loader -. continue .-> image_preprocessor;
image_loader -. error .-> image_error_handler;
image_preprocessor -. continue .-> image_vision_analyzer;
image_preprocessor -. error .-> image_error_handler;
image_vision_analyzer -. continue .-> detect_complex_content;
image_vision_analyzer -. error .-> image_error_handler;
youtube_input_check -. continue .-> youtube_transcript;
youtube_input_check -. error .-> youtube_error_handler;
youtube_transcript -. chunking .-> detect_complex_content;
youtube_transcript -. fallback_audio .-> youtube_audio_download;
youtube_transcript -. error .-> youtube_error_handler;
youtube_audio_download -. continue .-> audio_splitter;
youtube_audio_download -. error .-> youtube_error_handler;
website_input_check -. continue .-> website_crawler;
website_input_check -. error .-> website_error_handler;
website_crawler -. continue .-> website_content_cleaner;
website_crawler -. error .-> website_error_handler;
website_content_cleaner -. continue .-> detect_complex_content;
website_content_cleaner -. error .-> website_error_handler;
text_input_check --> detect_complex_content;
detect_complex_content --> splitter_selection;
splitter_selection --> splitter_application;
splitter_application -. enrich .-> chunk_enrichment;
splitter_application -. skip .-> embedding_generation;
chunk_enrichment --> embedding_generation;
embedding_generation --> __end__;
audio_error_handler --> __end__;
image_error_handler --> __end__;
pdf_error_handler --> __end__;
website_error_handler --> __end__;
youtube_error_handler --> __end__;
classDef default fill:#5e35b1,line-height:1.2
classDef first fill-opacity:0
classDef last fill:#7c4dff Le graphe ci-dessus peut sembler complexe, mais il suit une logique simple :
__start__) → Routeur : Le graphe commence et détecte le type d’inputL’état est un TypedDict Python qui contient toutes les données qui traversent le graphe. Chaque node lit l’état, le modifie, et le retourne. Voici la structure complète avec des explications :
class EmbeddingState(TypedDict): """État principal qui traverse le graphe d'embedding.
Cet état est passé de node en node. Chaque node peut lire et modifier les champs dont il a besoin. LangGraph gère automatiquement la fusion des modifications. """
# === Input (défini au départ) === input_type: InputType # Type détecté par input_router sentences: List[str] # Si TEXT : les phrases à traiter url_resource: str # Si DOCUMENT/IMAGE/etc : URL S3 de la ressource language: str # Langue pour les prompts ("fr", "en"...)
# === Processing (modifié pendant le traitement) === extracted_content: str # Texte extrait de la source (rempli par le pipeline) chunks: List[str] # Texte découpé en segments (après chunking) has_complex_content: bool # True si tables/code/formules détectés
# === PDF Specific (uniquement pour les PDFs) === pdf_pages: List[Dict] # Métadonnées de chaque page processing_strategy: str # "parse_only", "ocr_only", "hybrid"
# === Output (résultat final) === embedded_chunks: List[Dict] # Les chunks avec leurs embeddings error: Optional[str] # Message d'erreur si échec
# === Metadata (statistiques) === total_tokens: int # Tokens traités processing_time: float # Temps de traitement embedding_dimension: int # Dimension (1024)from langgraph.graph import StateGraph, END
def create_embedding_graph(): """Construit le graphe d'embedding"""
workflow = StateGraph(EmbeddingState)
# === Nœuds principaux === workflow.add_node("input_router", input_router) workflow.add_node("text_input_check", text_input_check) workflow.add_node("detect_complex_content", detect_complex_content) workflow.add_node("splitter_selection", splitter_selection) workflow.add_node("splitter_application", splitter_application) workflow.add_node("chunk_enrichment", chunk_enrichment) workflow.add_node("embedding_generation", embedding_generation)
# === Nœuds PDF === workflow.add_node("document_input_check", document_input_check) workflow.add_node("pdf_loader", pdf_loader) workflow.add_node("pdf_cleaner", pdf_cleaner) # ... autres nœuds PDF
# === Arêtes === workflow.add_edge("__start__", "input_router")
# Routage conditionnel selon le type d'input workflow.add_conditional_edges( "input_router", route_by_input_type, { "text": "text_input_check", "document": "document_input_check", "image": "image_input_check", "audio": "audio_input_check", "youtube": "youtube_input_check", "website": "website_input_check", } )
# Arête conditionnelle pour l'enrichissement workflow.add_conditional_edges( "splitter_application", should_enrich, { "enrich": "chunk_enrichment", "skip": "embedding_generation" } )
workflow.add_edge("embedding_generation", END)
return workflow.compile()
# Export du graphe compilégraph = create_embedding_graph()Permet de traiter plusieurs ressources en une seule invocation. Chaque ressource est traitée via le graphe d’embedding individuel puis les résultats sont agrégés.
graph TD;
__start__([__start__]):::first
validate_batch(validate_batch)
process_batch(process_batch)
__end__([__end__]):::last
__start__ --> validate_batch;
validate_batch -. error .-> __end__;
validate_batch -. continue .-> process_batch;
process_batch --> __end__;
classDef default fill:#5e35b1,line-height:1.2
classDef first fill-opacity:0
classDef last fill:#7c4dff sequenceDiagram
participant Client
participant BatchGraph
participant Validator
participant EmbeddingGraph
participant Aggregator
Client->>BatchGraph: resources[]
BatchGraph->>Validator: validate_batch_input()
loop Pour chaque ressource
Validator->>EmbeddingGraph: process(resource)
EmbeddingGraph-->>Aggregator: embedded_chunks
end
Aggregator->>Client: all_embedded_chunks + stats class ResourceInput(TypedDict): """Définition d'une ressource à traiter""" input_type: InputType sentences: NotRequired[List[str]] # Pour TEXT url_resource: NotRequired[str] # Pour DOCUMENT, IMAGE, etc.
class BatchEmbeddingState(TypedDict): """State pour le traitement par batch""" resources: List[ResourceInput] # Liste des ressources processing_results: List[Dict] # Résultats par ressource all_embedded_chunks: List[Dict] # Tous les chunks embeddés total_chunks: int # Nombre total de chunks processing_stats: Dict # Stats globalesdef create_batch_embedding_graph(): """Construit le graphe de traitement batch"""
workflow = StateGraph(BatchEmbeddingState)
workflow.add_node("validate_batch", validate_batch_input) workflow.add_node("process_batch", process_batch_resources)
workflow.add_edge("__start__", "validate_batch") workflow.add_conditional_edges( "validate_batch", check_validation, { "continue": "process_batch", "error": END } ) workflow.add_edge("process_batch", END)
return workflow.compile()
batch_graph = create_batch_embedding_graph()Le nœud process_batch utilise asyncio.gather pour traiter les ressources en parallèle :
async def process_batch_resources(state: BatchEmbeddingState) -> BatchEmbeddingState: """Traite toutes les ressources en parallèle"""
# Import du graphe d'embedding from src.graphs.embedding import graph
tasks = [] for resource in state["resources"]: task = asyncio.create_task( graph.ainvoke({ "input_type": resource["input_type"], "sentences": resource.get("sentences"), "url_resource": resource.get("url_resource"), "language": state.get("language", "fr") }) ) tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# Agrégation des résultats all_chunks = [] processing_results = []
for i, result in enumerate(results): if isinstance(result, Exception): processing_results.append({ "resource_index": i, "status": "error", "error": str(result) }) else: chunks = result.get("embedded_chunks", []) all_chunks.extend(chunks) processing_results.append({ "resource_index": i, "status": "success", "chunks_count": len(chunks) })
return { **state, "all_embedded_chunks": all_chunks, "total_chunks": len(all_chunks), "processing_results": processing_results, "processing_stats": { "total_resources": len(state["resources"]), "successful": sum(1 for r in processing_results if r["status"] == "success"), "failed": sum(1 for r in processing_results if r["status"] == "error") } }from src.graphs.embedding import graphfrom src.models.input_type import InputType
# Traitement d'un texteresult = await graph.ainvoke({ "input_type": InputType.TEXT, "sentences": [ "La photosynthèse est le processus...", "Les plantes utilisent la lumière..." ], "language": "fr"})
embedded_chunks = result["embedded_chunks"]print(f"Chunks générés: {len(embedded_chunks)}")from src.graphs.batch_embedding import batch_graphfrom src.models.input_type import InputType
# Traitement de plusieurs ressourcesresult = await batch_graph.ainvoke({ "resources": [ { "input_type": InputType.TEXT, "sentences": ["Premier texte..."] }, { "input_type": InputType.DOCUMENT, "url_resource": "s3://bucket/document.pdf" }, { "input_type": InputType.YOUTUBE, "url_resource": "https://youtube.com/watch?v=xxx" } ], "language": "fr"})
all_chunks = result["all_embedded_chunks"]stats = result["processing_stats"]print(f"Total chunks: {len(all_chunks)}")print(f"Succès: {stats['successful']}/{stats['total_resources']}")src/graphs/├── embedding.py # 🎯 Graph principal (export: graph)├── batch_embedding.py # 📦 Graph batch (export: batch_graph)└── states/ └── embedding_state.py # États TypedDictModularité
Chaque nœud est une fonction indépendante, facile à tester et à maintenir.
Visibilité
Les graphes sont visualisables, facilitant le debugging et la documentation.
Flexibilité
Les arêtes conditionnelles permettent des workflows complexes et adaptatifs.
Traçabilité
Chaque étape du traitement est loggée et peut être inspectée.
Architecture LangGraph pour un traitement de contenu robuste et scalable.