Intelligent, scalable content processing pipelines powered by Azure AI and orchestrated by Microsoft Agent Framework
ContentFlow is an enterprise-grade document and content processing platform that transforms unstructured content into intelligent, actionable data. It combines:
- Orchestrated Workflows - YAML-based pipeline definitions with conditional routing and parallel execution
- AI-Powered Processing - Integration with Azure AI services for document intelligence, embeddings, and analysis
- Modular Executors - 40+ pre-built processors for PDF, Word, Excel, PowerPoint, and more
- Cloud-Native Architecture - Deployed on Azure Container Apps with distributed processing
- Intuitive Web UI - React-based interface for pipeline design and monitoring
- Scalable & Distributed - Multi-worker architecture for processing at scale
- Multi-Format Support: PDF, Word, Excel, PowerPoint, plain text, web content, audio, video
- OCR & Layout Analysis: Extract text from scanned documents with layout preservation
- Intelligent Extraction: Tables, images, metadata, document structure
- Content Understanding: Chunking, embedding generation, semantic analysis
- Knowledge Graphs: Extract and build relationships between entities
- Conditional Routing: Dynamic paths based on document properties
- Parallel Processing: Fan-out/fan-in patterns with result aggregation
- Batch Operations: Efficient processing of large document collections
- Sub-Pipelines: Hierarchical workflow composition for complex scenarios
- Error Handling: Automatic retry logic and graceful degradation
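Conditional routing attaches conditions directly to pipeline edges. The fragment below is a hypothetical sketch in the style of the YAML configuration shown later in this README; the executor ids and the `is_scanned` property are illustrative, not part of a shipped sample:

```yaml
# Illustrative fragment: route scanned PDFs through OCR, born-digital files straight to chunking
edges:
  - from: get_content
    to: ocr_extract
    condition: "output.is_scanned == true"
  - from: get_content
    to: chunk_text
    condition: "output.is_scanned == false"
  - from: ocr_extract
    to: chunk_text
```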
- Document Intelligence: Extract text, tables, key-value pairs from documents
- Embeddings: Generate semantic vectors for similarity search and RAG
- Content Analysis: Sentiment, entity extraction, topic classification
- Web Scraping: Dynamic content extraction with Playwright
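Similarity search over embeddings typically compares vectors by cosine similarity. A minimal stdlib-only illustration of the math (not ContentFlow code):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two embedding vectors, in [-1.0, 1.0]."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Vectors pointing the same way score 1.0; orthogonal vectors score 0.0
print(cosine_similarity([1.0, 0.0], [2.0, 0.0]))  # 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 3.0]))  # 0.0
```

In production the vectors come from an embedding model and the comparison is done by a vector index (e.g. Azure AI Search), but the ranking principle is the same.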
- Azure AI Landing Zone Integration: Secure deployment within enterprise environments
- RBAC & Identity: Managed identities and role-based access control
- Audit & Monitoring: Comprehensive logging and Application Insights
- Data Isolation: Blob storage and Cosmos DB for persistent data management
- Azure subscription with necessary services configured
- Python 3.12+
- Docker (for running locally)
- Node.js 18+ (for web UI development)
Supports two modes:
- Basic mode: quick setup for development and testing
- Azure AI Landing Zone integrated mode: enterprise-grade deployment
➡️ View deployment docs for more details
```shell
git clone https://github.com/Azure/contentflow
cd contentflow

# One-command deployment
azd up

# This will:
# 1. Provision Azure infrastructure (Container Apps, Storage, Cosmos DB, etc.)
# 2. Build and push container images
# 3. Deploy services
# 4. Configure post-deployment settings
# 5. Output service URLs
```

Or run each service locally:

```shell
# API service
cd contentflow-api
pip install -r requirements.txt
python main.py

# Worker service
cd contentflow-worker
pip install -r requirements.txt
python main.py

# Web UI
cd contentflow-web
npm install
npm run dev
```

Scenario: Enterprise needs to digitize and catalog thousands of historical documents
Input Documents → PDF Extraction → OCR & Layout Analysis → Metadata Extraction → Full-Text Indexing → Archive Storage
Benefits: Searchable digital archives, compliance automation, instant retrieval
Scenario: Build a knowledge base from company documents for AI-powered Q&A
Documents → Chunking → Embedding Generation → Vector Search Indexing → LLM Query Augmentation
ContentFlow Powers: Batch processing thousands of documents, generating embeddings, storing in vector DB
Scenario: Extract financial data from quarterly reports, earnings calls, and regulatory filings
Financial Documents → Extract Tables → Parse Key Metrics → Classify Document Type → Store in Data Warehouse
Smart Features:
- Conditional routing based on document type
- Parallel processing of multiple sections
- Automatic retry on extraction failure
Scenario: Process product descriptions, images, and specifications across multiple formats
Product Files (PDF, DOC, XLSX) → Content Extraction → Image Processing → Standardization → Catalog Upload
Powered By: Batch operations, format-specific extractors, validation logic
Scenario: Convert paper records and scanned documents into structured patient data
Scanned Records → OCR → Medical Entity Extraction → HIPAA Compliance Validation → EHR Integration
Enterprise Features: Audit logging, encryption, RBAC, data isolation
Scenario: Crawl websites and aggregate news articles with AI analysis
Web URLs → Web Scraping → Content Extraction → Sentiment Analysis → Topic Classification → Distribution
Automation: Parallel scraping, conditional routing, scheduled execution
```
┌────────────────────────────────────────────────────────────────┐
│                      ContentFlow Platform                      │
└────────────────────────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│                        User Interfaces                         │
├───────────────────────────────┬────────────────────────────────┤
│ Web Dashboard (React)         │ REST API (FastAPI)             │
│ • Pipeline Designer           │ • Execute Pipelines            │
│ • Execution Monitoring        │ • Get Results                  │
│ • Result Visualization        │ • Query History                │
└───────────────────────────────┴────────────────────────────────┘
                                │
┌────────────────────────────────────────────────────────────────┐
│                    ContentFlow API Service                     │
│                       (containerapp-api)                       │
│  ✓ Pipeline Management & Execution                             │
│  ✓ Credential & Vault Integration                              │
│  ✓ Event Streaming & Monitoring                                │
└────────────────────────────────────────────────────────────────┘
        │                                     │
   Config/Tasks                       Results & Events
        │                                     │
┌────────────────────────────────────────────────────────────────┐
│                ContentFlow Library (Core Engine)               │
├────────────────────────────────────────────────────────────────┤
│ Pipeline Factory                                               │
│ • Parses YAML configurations                                   │
│ • Validates executor dependencies                              │
│ • Creates optimized execution graphs                           │
├────────────────────────────────────────────────────────────────┤
│ 40+ Pre-Built Executors                                        │
├────────────────────────────┬───────────────────────────────────┤
│ Input Executors            │ Processing Executors              │
│ • Azure Blob Discovery     │ • PDF Text Extraction             │
│ • Local File Reader        │ • Document Intelligence           │
│ • Web Scraper              │ • Image Processing                │
│ • Database Query           │ • Embeddings Generation           │
├────────────────────────────┼───────────────────────────────────┤
│ Routing Executors          │ Output Executors                  │
│ • Conditional Router       │ • Azure Blob Writer               │
│ • Batch Splitter           │ • Cosmos DB Storage               │
│ • Parallel Executor        │ • CSV/JSON Export                 │
│ • Merge Aggregator         │ • Search Index Upload             │
└────────────────────────────┴───────────────────────────────────┘
                                │
┌────────────────────────────────────────────────────────────────┐
│                   ContentFlow Worker Service                   │
│                     (containerapp-worker)                      │
│  ✓ Processes queued work items                                 │
│  ✓ Manages worker pool & scaling                               │
│  ✓ Executes pipeline instances                                 │
│  ✓ Handles failures & retries                                  │
└────────────────────────────────────────────────────────────────┘
                                │
┌────────────────────────────────────────────────────────────────┐
│                   Azure Services Integration                   │
├───────────────────────┬──────────────────┬─────────────────────┤
│ Storage & Data        │ AI Services      │ Infrastructure      │
│ • Azure Blob Storage  │ • Document       │ • Container Apps    │
│ • Cosmos DB           │   Intelligence   │ • App Config        │
│ • Queue Storage       │ • AI Services    │ • Key Vault         │
│ • Search Index        │ • OpenAI/Models  │ • Log Analytics     │
│                       │ • Embeddings     │ • App Insights      │
└───────────────────────┴──────────────────┴─────────────────────┘
```
Web Dashboard (contentflow-web)
- Modern React application with Vite
- Visual pipeline designer with React Flow
- Real-time execution monitoring
- Results viewer with syntax highlighting
- Responsive Tailwind CSS design
API Service (contentflow-api)
- FastAPI REST endpoints for pipeline operations
- AsyncIO-based for high concurrency
- WebSocket support for real-time events
- Integration with Azure Key Vault for secrets
- CORS configured for web UI
Worker Service (contentflow-worker)
- Multi-threaded content processing engine
- Queue-based job distribution
- Automatic scaling based on load
- Health monitoring and graceful shutdown
- Error handling with exponential backoff
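The exponential-backoff retry mentioned above follows the standard pattern: double the wait after each failure, cap it, and add jitter so retrying workers don't stampede a recovering service. A standalone sketch (not the worker's actual implementation):

```python
import random
import time

def retry_with_backoff(operation, max_attempts: int = 5,
                       base_delay: float = 1.0, max_delay: float = 30.0):
    """Retry `operation`, doubling the wait after each failure."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Exponential growth, capped, with jitter to spread retries
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay + random.uniform(0, delay * 0.1))

# Demo: an operation that fails twice, then succeeds
calls = {"count": 0}
def flaky_fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(retry_with_backoff(flaky_fetch, base_delay=0.01))  # ok
```

In a real worker the delay parameters would come from configuration, and only transient error types (timeouts, throttling) would be retried.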
Core Library (contentflow-lib)
- Pipeline Factory: Compiles YAML to execution graphs
- Executor Framework: Base classes and 40+ implementations
- Content Models: Strongly-typed data structures
- Event System: Real-time pipeline execution tracking
- Plugin Architecture: Easy extension with custom executors
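A plugin architecture like this typically pairs a base class with a registry that maps the `type:` string in YAML to an executor class. The following is a self-contained sketch of that pattern; the class and decorator names are illustrative, not ContentFlow's actual API:

```python
from abc import ABC, abstractmethod

EXECUTOR_REGISTRY: dict[str, type] = {}

def register_executor(type_name: str):
    """Class decorator mapping a YAML `type:` string to an executor class."""
    def decorator(cls):
        EXECUTOR_REGISTRY[type_name] = cls
        return cls
    return decorator

class BaseExecutor(ABC):
    def __init__(self, settings: dict):
        self.settings = settings

    @abstractmethod
    def execute(self, content: dict) -> dict:
        ...

@register_executor("uppercase_transformer")
class UppercaseTransformer(BaseExecutor):
    """Toy custom executor: uppercases one field of the content."""
    def execute(self, content: dict) -> dict:
        field = self.settings.get("field", "text")
        content[field] = content[field].upper()
        return content

# A factory can resolve the type string from YAML and instantiate it
executor_cls = EXECUTOR_REGISTRY["uppercase_transformer"]
executor = executor_cls({"field": "text"})
print(executor.execute({"text": "hello"}))  # {'text': 'HELLO'}
```

The payoff of the registry approach: custom executors become available to YAML pipelines just by being imported, with no changes to the factory itself.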
Example pipeline definition:

```yaml
pipeline:
  name: document_processing
  description: "Process documents with intelligence"
  executors:
    - id: get_content
      type: azure_blob_input_discovery
      settings:
        blob_storage_account: "${STORAGE_ACCOUNT}"
        blob_container_name: "documents"
        file_extensions: ".pdf,.docx"
    - id: extract_text
      type: azure_document_intelligence_extractor
      settings:
        doc_intelligence_endpoint: "${DOC_INT_ENDPOINT}"
    - id: generate_embeddings
      type: embeddings_executor
      settings:
        model: "text-embedding-3-large"
    - id: store_results
      type: cosmos_db_writer
      settings:
        database_name: "contentflow"
        container_name: "documents"

  # Execution sequence with conditional routing
  edges:
    - from: get_content
      to: extract_text
    - from: extract_text
      to: generate_embeddings
      condition: "output.pages > 0"
    - from: generate_embeddings
      to: store_results
```

Run it from Python:

```python
from contentflow.pipeline import PipelineExecutor
from contentflow.models import Content, ContentIdentifier

async with PipelineExecutor.from_config_file(
    config_path="my_pipeline.yaml",
    pipeline_name="document_processing"
) as executor:
    # Create content to process
    document = Content(
        id=ContentIdentifier(
            canonical_id="doc_001",
            unique_id="doc_001",
            source_name="azure_blob",
            source_type="pdf",
            path="documents/report.pdf"
        )
    )

    # Execute pipeline
    result = await executor.execute(document)

    # Check results
    print(f"Status: {result.status}")
    print(f"Duration: {result.duration_seconds}s")
    for event in result.events:
        print(f"  {event.executor_id}: {event.message}")
```
```
contentflow/
├── contentflow-api/          # FastAPI REST service
│   ├── app/
│   │   ├── routers/          # API endpoint definitions
│   │   ├── services/         # Business logic
│   │   ├── dependencies.py   # Dependency injection
│   │   └── settings.py       # Configuration
│   ├── main.py               # Application entry
│   └── Dockerfile
│
├── contentflow-lib/          # Core processing library
│   ├── contentflow/
│   │   ├── pipeline/         # Pipeline execution engine
│   │   ├── executors/        # 40+ executor implementations
│   │   ├── connectors/       # Data source connectors
│   │   ├── models/           # Data models
│   │   └── utils/            # Utilities
│   ├── samples/              # 20+ example pipelines
│   ├── executor_catalog.yaml # Executor registry
│   └── pyproject.toml
│
├── contentflow-web/          # React web dashboard
│   ├── src/
│   │   ├── components/       # Reusable UI components
│   │   ├── pages/            # Page components
│   │   ├── hooks/            # Custom React hooks
│   │   └── lib/              # Utilities & helpers
│   ├── vite.config.ts        # Build configuration
│   └── Dockerfile
│
├── contentflow-worker/       # Processing worker service
│   ├── app/
│   │   ├── engine.py         # Worker engine
│   │   ├── api.py            # Health/status endpoints
│   │   └── settings.py       # Configuration
│   ├── main.py               # Entry point
│   └── Dockerfile
│
└── infra/                    # Infrastructure as Code
    ├── bicep/
    │   ├── main.bicep        # Main template
    │   └── modules/          # Reusable Bicep modules
    └── scripts/              # Deployment automation
```
ContentFlow includes 40+ pre-built executors for common content processing tasks:
Input:
- `azure_blob_input_discovery` - Discover files in Blob Storage
- `local_file_reader` - Read files from local filesystem
- `web_scraper` - Extract content from web pages
- `database_query` - Retrieve content from databases

Document extraction:
- `azure_document_intelligence_extractor` - Extract text, tables, layout
- `pdf_text_extractor` - PDF-specific text extraction
- `pdf_image_extractor` - Extract images from PDFs
- `word_document_extractor` - Process Word documents
- `excel_spreadsheet_extractor` - Extract Excel data
- `powerpoint_extractor` - Process PowerPoint presentations

Content processing:
- `text_chunker` - Split text into optimal chunks
- `embeddings_executor` - Generate semantic embeddings
- `classifier_executor` - Text classification
- `entity_extractor` - Named entity recognition
- `sentiment_analyzer` - Sentiment analysis
- `table_row_splitter` - Extract table rows as documents
- `field_transformer` - Transform and normalize fields

Routing:
- `conditional_router` - Route based on conditions
- `batch_splitter` - Split large batches
- `parallel_executor` - Execute in parallel
- `merge_aggregator` - Aggregate results from parallel paths

Output:
- `azure_blob_writer` - Write to Blob Storage
- `cosmos_db_writer` - Store in Cosmos DB
- `search_index_writer` - Index for search
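A `text_chunker`-style splitter usually slides a fixed-size window with overlap, so context near chunk boundaries appears in both neighbors. A standalone sketch of the idea (ContentFlow's actual chunking settings and algorithm may differ):

```python
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into fixed-size chunks whose boundaries overlap."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    chunks = []
    step = chunk_size - overlap  # advance less than a full chunk each time
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # last window already covers the tail
    return chunks

doc = "x" * 1200
chunks = chunk_text(doc, chunk_size=500, overlap=50)
print([len(c) for c in chunks])  # [500, 500, 300]
```

Production chunkers typically also snap boundaries to sentence or paragraph breaks and measure size in tokens rather than characters.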
ContentFlow comes with 20+ sample pipelines demonstrating various patterns:
| Sample | Demonstrates | Files |
|---|---|---|
| `01-simple` | Basic pipeline setup | Config + simple execution |
| `02-batch-processing` | Processing large collections | Batch splitting & aggregation |
| `03-pdf-extractor` | PDF content extraction | Multi-stage PDF processing |
| `04-word-extractor` | Word document processing | Document intelligence |
| `05-powerpoint-extractor` | PowerPoint analysis | Slide extraction |
| `06-ai-analysis` | AI-powered analysis | LLM integration |
| `07-embeddings` | Embedding generation | Vector search prep |
| `08-content-understanding` | Semantic analysis | Chunking & classification |
| `09-blob-input` | Blob storage integration | Cloud file discovery |
| `14-gpt-rag-ingestion` | RAG pipeline | GPT + embeddings |
| `15-document-analysis` | Advanced intelligence | Comprehensive analysis |
| `17-knowledge-graph` | Entity relationships | Graph construction |
| `18-web-scraping` | Web content extraction | Dynamic scraping |
| `32-parallel-processing` | Concurrent execution | Multi-path workflows |
| `44-conditional-routing` | Smart routing | Condition-based paths |
Run any sample:
```shell
cd contentflow-lib/samples/01-simple
python run.py
```

- ✅ Managed Identity Authentication - No exposed credentials
- ✅ Azure Key Vault Integration - Secure secret storage
- ✅ RBAC & Access Control - Fine-grained permissions
- ✅ Encrypted Communication - TLS for all endpoints
- ✅ Audit & Logging - Full audit trail with Application Insights
- ✅ Data Isolation - Separate storage containers per tenant/environment
- ✅ Compliance Ready - Supports HIPAA, SOC 2, GDPR patterns
| Metric | Capability |
|---|---|
| Throughput | 100+ documents/hour per worker |
| Concurrency | Parallel pipelines bounded only by worker capacity |
| Scaling | Auto-scale Container Apps based on queue depth |
| Latency | <1s for simple operations, <30s for complex AI |
| Reliability | Automatic retry, fault tolerance, graceful degradation |
| Storage | Effectively unlimited via Blob Storage + Cosmos DB |
- Infrastructure Guide - Deploy to Azure
- API Documentation - REST endpoints
- Sample Pipelines - Learn by example
- Web UI Guide - Dashboard features
We welcome contributions! Please:
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
- Issues: Report bugs and request features on GitHub Issues
- Discussions: Ask questions and share ideas in Discussions
- Documentation: Check our comprehensive docs
- Examples: Explore sample pipelines
Deploy to Azure · View Samples · API Reference · Report Issue
Made with ❤️ using Microsoft Azure & Agent Framework