This cookbook walks you through building a production-ready RAG (Retrieval-Augmented Generation) pipeline. You’ll ingest documents from Amazon S3, extract structured metadata, and load everything into Qdrant for semantic search.

What You’ll Build

An end-to-end pipeline that ingests PDF contracts from S3, tags them with structured metadata from a custom taxonomy, and exports chunk-level vectors to Qdrant for filtered semantic search.

Prerequisites

  • An S3 bucket with PDF documents
  • A Qdrant instance (cloud or self-hosted)
  • Python 3.9+
Install the SDK:
pip install unstructured-sdk
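The examples below hardcode credentials for readability; in practice you would typically read them from the environment. This is plain Python, nothing SDK-specific, and the variable names are illustrative:

import os

# Illustrative environment variable names; adjust to your deployment
AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
QDRANT_API_KEY = os.environ["QDRANT_API_KEY"]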

Complete Pipeline

from unstructured import UnstructuredClient

# Initialize the client
client = UnstructuredClient(
    username="your-username",
    password="your-password",
)

# ============================================
# Step 1: Connect to S3 (Source)
# ============================================
s3_connector = client.data_source.create(
    connector_name="my-document-bucket",
    connector_body={
        "vector_db_type": "s3",
        "bucket_name": "company-documents",
        "aws_access_key_id": "YOUR_ACCESS_KEY",
        "aws_secret_access_key": "YOUR_SECRET_KEY",
        "region": "us-east-1",
        "prefix": "contracts/",  # Optional: only process files in this folder
    },
)
print(f"✓ Connected to S3: {s3_connector.profile_id}")

# ============================================
# Step 2: Connect to Qdrant (Destination)
# ============================================
qdrant_dest = client.destination.create(
    destination_name="rag-vectors",
    destination_body={
        "vector_db_type": "qdrant",
        "url": "https://your-cluster.qdrant.io",
        "api_key": "YOUR_QDRANT_API_KEY",
        "collection_name": "document_chunks",
    },
)
print(f"✓ Connected to Qdrant: {qdrant_dest.destination_id}")

# ============================================
# Step 3: Define Your Taxonomy
# ============================================
taxonomy = client.taxonomy.upsert(
    taxonomy_name="contract-analysis",
    taxonomy_description="Extract key information from legal contracts",
    tags=[
        {
            "name": "contract_type",
            "description": "Type of contract (NDA, MSA, SLA, Employment, etc.)",
            "output_type": "word",
        },
        {
            "name": "parties",
            "description": "Names of all parties involved in the contract",
            "output_type": "list[string]",
        },
        {
            "name": "effective_date",
            "description": "When the contract becomes effective",
            "output_type": "date",
        },
        {
            "name": "expiration_date",
            "description": "When the contract expires or terminates",
            "output_type": "date",
        },
        {
            "name": "key_obligations",
            "description": "Main obligations and responsibilities outlined",
            "output_type": "list[string]",
        },
        {
            "name": "total_value",
            "description": "Total monetary value of the contract if specified",
            "output_type": "float",
        },
    ],
)
print(f"✓ Created taxonomy: {taxonomy.taxonomy_id}")

# ============================================
# Step 4: Ingest and Process Documents
# ============================================
print("Processing documents...")
results = client.classify.generate_batch(
    connector_name="my-document-bucket",
    taxonomy_name="contract-analysis",
)
print(f"✓ Processed {len(results.metadata)} documents")

# ============================================
# Step 5: Export to Qdrant
# ============================================
export_result = client.destination.export(
    destination_name="rag-vectors",
    connector_name="my-document-bucket",
    export_level="chunk",      # Export at chunk level for RAG
    export_nodes=True,         # Include vector embeddings
    export_metadata=True,      # Include extracted metadata
    metadata_format="json_store",
)
print(f"✓ Exported to Qdrant")

# For large exports, track progress via the returned tracker
if export_result.tracker_id:
    status = client.progress_tracker.get_status(tracker_id=export_result.tracker_id)
    print(f"  Export status: {status.status} ({status.progress}%)")

Query Your RAG Pipeline

Once your data is in Qdrant, you can query it with any Qdrant client:
from datetime import datetime

from qdrant_client import QdrantClient, models

# Connect to Qdrant
qdrant = QdrantClient(
    url="https://your-cluster.qdrant.io",
    api_key="YOUR_QDRANT_API_KEY",
)

# Semantic search with metadata filtering.
# `your_embedding` must come from the same embedding model used at export time.
results = qdrant.search(
    collection_name="document_chunks",
    query_vector=your_embedding,
    query_filter=models.Filter(
        must=[
            models.FieldCondition(
                key="contract_type",
                match=models.MatchValue(value="NDA"),
            ),
            models.FieldCondition(
                key="effective_date",
                range=models.DatetimeRange(gte=datetime(2024, 1, 1)),
            ),
        ]
    ),
    limit=5,
)

for hit in results:
    print(f"Score: {hit.score}")
    print(f"Text: {hit.payload['text'][:200]}...")
    print(f"Parties: {hit.payload['parties']}")
    print("---")

Understanding Export Options

Option                         | Description                  | When to Use
-------------------------------|------------------------------|------------------------------
export_level="file"            | One record per document      | Document-level retrieval
export_level="chunk"           | One record per chunk         | RAG / semantic search
export_level="both"            | Both file and chunk records  | Hybrid use cases
export_nodes=True              | Include vector embeddings    | Required for semantic search
metadata_format="json_store"   | Metadata as a JSON column    | Flexible filtering
metadata_format="column_store" | Metadata as separate columns | SQL-style queries

Production Tips

For large document sets (100+ files), the export runs asynchronously. Poll the tracker:
import time

while True:
    status = client.progress_tracker.get_status(tracker_id=export_result.tracker_id)
    if status.status == "completed":
        print("Export finished!")
        break
    elif status.status == "failed":
        print(f"Export failed: {status.error}")
        break
    print(f"Progress: {status.progress}%")
    time.sleep(5)
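For unattended runs, it can help to fold this loop into a helper with a timeout. A sketch, where wait_for_export is a hypothetical wrapper around the same progress_tracker call shown above:

import time

def wait_for_export(client, tracker_id, poll_seconds=5, timeout_seconds=3600):
    """Poll the tracker until the export completes or fails; raise on timeout."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        status = client.progress_tracker.get_status(tracker_id=tracker_id)
        if status.status in ("completed", "failed"):
            return status
        print(f"Progress: {status.progress}%")
        time.sleep(poll_seconds)
    raise TimeoutError(f"Export {tracker_id} did not finish in {timeout_seconds}s")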
Use Data Slices to process only new documents:
# Create a slice for recent documents
recent_slice = client.dataslice.create(
    dataslice_name="recent-contracts",
    connector_name="my-document-bucket",
    conditions=[
        {"field": "last_modified", "operator": "gte", "value": "2024-01-01"}
    ],
)

# Process only the slice
results = client.classify.generate_batch(
    dataslice_name="recent-contracts",
    taxonomy_name="contract-analysis",
)
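Since slice conditions are plain field filters, a scheduled job can recompute the cutoff on each run. A sketch using the same dataslice and classify calls, with a hypothetical "last-7-days" slice name:

from datetime import date, timedelta

# Nightly job: reprocess only documents modified in the past week
cutoff = (date.today() - timedelta(days=7)).isoformat()
client.dataslice.create(
    dataslice_name="last-7-days",
    connector_name="my-document-bucket",
    conditions=[
        {"field": "last_modified", "operator": "gte", "value": cutoff}
    ],
)
results = client.classify.generate_batch(
    dataslice_name="last-7-days",
    taxonomy_name="contract-analysis",
)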
Wrap operations in try-except for production robustness:
from unstructured.exceptions import UnstructuredError

try:
    results = client.classify.generate_batch(...)
except UnstructuredError as e:
    print(f"Processing failed: {e.message}")
    # Handle retry logic
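One way to flesh out the retry logic is a small wrapper with increasing backoff. A sketch, where generate_with_retries is a hypothetical helper rather than part of the SDK:

import time

from unstructured.exceptions import UnstructuredError

def generate_with_retries(client, max_attempts=3, base_delay=10, **kwargs):
    """Retry classify.generate_batch with linearly increasing delays."""
    for attempt in range(1, max_attempts + 1):
        try:
            return client.classify.generate_batch(**kwargs)
        except UnstructuredError as e:
            if attempt == max_attempts:
                raise
            delay = base_delay * attempt
            print(f"Attempt {attempt} failed ({e.message}); retrying in {delay}s")
            time.sleep(delay)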

Next Steps

  • S3 to SharePoint: export metadata to SharePoint columns.
  • PII Detection: add sensitive data detection to your pipeline.
  • Custom Taxonomies: use AI to generate custom taxonomies.
  • Destinations: explore all supported export targets.