This cookbook walks you through building a document enrichment pipeline. You’ll ingest documents from Amazon S3, extract structured metadata, and push the enriched data to SharePoint as document columns.

What You’ll Build

An end-to-end enrichment pipeline: documents are ingested from an S3 bucket, classified against a contract taxonomy to extract structured metadata, and the extracted fields are written back to a SharePoint document library as columns.

Prerequisites

  • An S3 bucket with PDF documents
  • A SharePoint site with appropriate permissions
  • Python 3.9+
Install the SDK:

pip install unstructured-sdk

Complete Pipeline

from unstructured import UnstructuredClient

# Initialize the client
client = UnstructuredClient(
    username="your-username",
    password="your-password",
)

# ============================================
# Step 1: Connect to S3 (Source)
# ============================================
s3_connector = client.data_source.create(
    connector_name="my-document-bucket",
    connector_body={
        "vector_db_type": "s3",
        "bucket_name": "company-documents",
        "aws_access_key_id": "YOUR_ACCESS_KEY",
        "aws_secret_access_key": "YOUR_SECRET_KEY",
        "region": "us-east-1",
        "prefix": "contracts/",  # Optional: only process files in this folder
    },
)
print(f"✓ Connected to S3: {s3_connector.profile_id}")

# ============================================
# Step 2: Connect to SharePoint (Destination)
# ============================================
sharepoint_dest = client.destination.create(
    destination_name="contract-library",
    destination_body={
        "vector_db_type": "sharepoint",
        "client_id": "YOUR_CLIENT_ID",
        "client_secret": "YOUR_CLIENT_SECRET",
        "tenant_id": "YOUR_TENANT_ID",
        "site_name": "LegalDocuments",
    },
)
print(f"✓ Connected to SharePoint: {sharepoint_dest.destination_id}")

# ============================================
# Step 3: Define Your Taxonomy
# ============================================
taxonomy = client.taxonomy.upsert(
    taxonomy_name="contract-analysis",
    taxonomy_description="Extract key information from legal contracts",
    tags=[
        {
            "name": "contract_type",
            "description": "Type of contract (NDA, MSA, SLA, Employment, etc.)",
            "output_type": "word",
        },
        {
            "name": "parties",
            "description": "Names of all parties involved in the contract",
            "output_type": "list[string]",
        },
        {
            "name": "effective_date",
            "description": "When the contract becomes effective",
            "output_type": "date",
        },
        {
            "name": "expiration_date",
            "description": "When the contract expires or terminates",
            "output_type": "date",
        },
        {
            "name": "key_obligations",
            "description": "Main obligations and responsibilities outlined",
            "output_type": "list[string]",
        },
        {
            "name": "total_value",
            "description": "Total monetary value of the contract if specified",
            "output_type": "float",
        },
    ],
)
print(f"✓ Created taxonomy: {taxonomy.taxonomy_id}")

# ============================================
# Step 4: Ingest and Process Documents
# ============================================
print("Processing documents...")
results = client.classify.generate_batch(
    connector_name="my-document-bucket",
    taxonomy_name="contract-analysis",
)
print(f"✓ Processed {len(results.metadata)} documents")

# ============================================
# Step 5: Export to SharePoint
# ============================================
export_result = client.destination.export(
    destination_name="contract-library",
    connector_name="my-document-bucket",
    export_level="file",           # Export at file level for SharePoint
    export_metadata=True,          # Include extracted metadata
    metadata_format="column_store", # Create separate SharePoint columns
    export_tags=[                   # Specify which tags become columns
        "contract_type",
        "parties",
        "effective_date",
        "expiration_date",
        "total_value",
    ],
)
print(f"✓ Exported to SharePoint")

# If large export, track progress
if export_result.tracker_id:
    status = client.progress_tracker.get_status(tracker_id=export_result.tracker_id)
    print(f"  Export status: {status.status} ({status.progress}%)")

What Happens in SharePoint

Once exported, your SharePoint document library will have new columns populated with the extracted metadata:
Document             | Contract Type | Parties             | Effective Date | Total Value
Acme-NDA-2024.pdf    | NDA           | Acme Corp, Beta LLC | 2024-01-15     | -
ServiceAgreement.pdf | MSA           | TechCo, StartupXYZ  | 2024-03-01     | $150,000
Employment-JDoe.pdf  | Employment    | Jane Doe, Acme Corp | 2024-02-01     | $95,000
Users can now:
  • Filter and sort documents by contract type, date, or value
  • Create views like “Expiring This Quarter” or “High-Value Contracts”
  • Search using SharePoint’s native search with metadata facets
  • Set up alerts for documents matching specific criteria
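To spot-check the result programmatically, you can read the list items back through the Microsoft Graph API. This is a minimal sketch, assuming you already have a Graph access token plus the site and list IDs, and that the column internal names match the tag names (they may differ in your tenant):

import requests

GRAPH = "https://graph.microsoft.com/v1.0"
site_id = "YOUR_SITE_ID"   # e.g. resolved from the LegalDocuments site
list_id = "YOUR_LIST_ID"   # the document library backing the site
token = "YOUR_GRAPH_TOKEN"

# Fetch list items with their column values expanded
resp = requests.get(
    f"{GRAPH}/sites/{site_id}/lists/{list_id}/items",
    params={"expand": "fields"},
    headers={"Authorization": f"Bearer {token}"},
)
resp.raise_for_status()

# Filter client-side so we don't depend on indexed columns;
# "contract_type" / "effective_date" are assumed internal column names
ndas = [
    item["fields"]
    for item in resp.json()["value"]
    if item["fields"].get("contract_type") == "NDA"
]
for fields in ndas:
    # FileLeafRef is the built-in file name column in document libraries
    print(fields.get("FileLeafRef"), fields.get("effective_date"))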

Understanding Export Options

Option                         | Description                  | When to Use
export_level="file"            | One record per document      | SharePoint, document management
export_level="chunk"           | One record per chunk         | Vector databases, RAG
export_level="both"            | Both file and chunk records  | Hybrid use cases
metadata_format="column_store" | Metadata as separate columns | SharePoint, SQL databases
metadata_format="json_store"   | Metadata as JSON column      | Flexible NoSQL storage
export_tags=[...]              | Specific tags to export      | Control which columns are created
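
For comparison, the same export call covers the chunk-level case. A chunk export with JSON metadata (the usual shape for a vector database) would look like the sketch below; the "contract-embeddings" destination name is hypothetical and would be created the same way as the SharePoint destination in Step 2:

# Hypothetical chunk-level export for a vector-database destination.
# Assumes a destination named "contract-embeddings" was created earlier,
# analogous to the SharePoint destination in Step 2.
chunk_export = client.destination.export(
    destination_name="contract-embeddings",
    connector_name="my-document-bucket",
    export_level="chunk",            # one record per chunk instead of per file
    export_metadata=True,
    metadata_format="json_store",    # keep metadata as a single JSON column
)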

Production Tips

For large document sets (100+ files), the export runs asynchronously. Poll the tracker:
import time

while True:
    status = client.progress_tracker.get_status(tracker_id=export_result.tracker_id)
    if status.status == "completed":
        print("Export finished!")
        break
    elif status.status == "failed":
        print(f"Export failed: {status.error}")
        break
    print(f"Progress: {status.progress}%")
    time.sleep(5)
Use Data Slices to process only new documents:
# Create a slice for recent documents
recent_slice = client.dataslice.create(  # avoid shadowing the built-in "slice"
    dataslice_name="recent-contracts",
    connector_name="my-document-bucket",
    conditions=[
        {"field": "last_modified", "operator": "gte", "value": "2024-01-01"}
    ],
)

# Process only the slice
results = client.classify.generate_batch(
    dataslice_name="recent-contracts",
    taxonomy_name="contract-analysis",
)
Wrap operations in try-except for production robustness:
from unstructured.exceptions import UnstructuredError

try:
    results = client.classify.generate_batch(...)
except UnstructuredError as e:
    print(f"Processing failed: {e.message}")
    # Handle retry logic
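
If you want automatic retries, a simple exponential backoff loop is usually enough. This is a sketch rather than a prescribed pattern: the attempt count and delays are arbitrary, and it assumes the failure is transient:

import time

from unstructured.exceptions import UnstructuredError

max_attempts = 3
for attempt in range(1, max_attempts + 1):
    try:
        results = client.classify.generate_batch(
            connector_name="my-document-bucket",
            taxonomy_name="contract-analysis",
        )
        break
    except UnstructuredError as e:
        if attempt == max_attempts:
            raise  # give up after the final attempt
        wait = 2 ** attempt  # exponential backoff: 2s, 4s, 8s...
        print(f"Attempt {attempt} failed ({e.message}), retrying in {wait}s")
        time.sleep(wait)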

Next Steps