from unstructured import UnstructuredClient
# Initialize the client. Every later step goes through this object.
# NOTE(review): the username/password below are placeholders — substitute
# real credentials before running.
client = UnstructuredClient(username="your-username", password="your-password")
# ============================================
# Step 1: Connect to S3 (Source)
# ============================================
# Register the S3 bucket as a data source named "my-document-bucket".
# The access key, secret key, and bucket name are placeholders to replace.
s3_connector = client.data_source.create(
    connector_name="my-document-bucket",
    connector_body={
        "vector_db_type": "s3",
        "bucket_name": "company-documents",
        "aws_access_key_id": "YOUR_ACCESS_KEY",
        "aws_secret_access_key": "YOUR_SECRET_KEY",
        "region": "us-east-1",
        # Optional: only process files in this folder
        "prefix": "contracts/",
    },
)
# Echo the profile id from the response as a confirmation.
print(f"✓ Connected to S3: {s3_connector.profile_id}")
# ============================================
# Step 2: Connect to Qdrant (Destination)
# ============================================
# Register the Qdrant collection as an export destination named
# "rag-vectors". The cluster URL and API key are placeholders to replace.
qdrant_dest = client.destination.create(
    destination_name="rag-vectors",
    destination_body={
        "vector_db_type": "qdrant",
        "url": "https://your-cluster.qdrant.io",
        "api_key": "YOUR_QDRANT_API_KEY",
        "collection_name": "document_chunks",
    },
)
# Echo the destination id from the response as a confirmation.
print(f"✓ Connected to Qdrant: {qdrant_dest.destination_id}")
# ============================================
# Step 3: Define Your Taxonomy
# ============================================
# Create (or update) the "contract-analysis" taxonomy. Each tag is declared
# as a (name, description, output_type) row and expanded into the dict shape
# passed to `upsert`, so adding a tag is a one-line change.
taxonomy = client.taxonomy.upsert(
    taxonomy_name="contract-analysis",
    taxonomy_description="Extract key information from legal contracts",
    tags=[
        {
            "name": tag_name,
            "description": tag_description,
            "output_type": tag_output_type,
        }
        for tag_name, tag_description, tag_output_type in (
            (
                "contract_type",
                "Type of contract (NDA, MSA, SLA, Employment, etc.)",
                "word",
            ),
            (
                "parties",
                "Names of all parties involved in the contract",
                "list[string]",
            ),
            (
                "effective_date",
                "When the contract becomes effective",
                "date",
            ),
            (
                "expiration_date",
                "When the contract expires or terminates",
                "date",
            ),
            (
                "key_obligations",
                "Main obligations and responsibilities outlined",
                "list[string]",
            ),
            (
                "total_value",
                "Total monetary value of the contract if specified",
                "float",
            ),
        )
    ],
)
# Echo the taxonomy id from the response as a confirmation.
print(f"✓ Created taxonomy: {taxonomy.taxonomy_id}")
# ============================================
# Step 4: Ingest and Process Documents
# ============================================
# Run batch classification, pairing the "my-document-bucket" connector with
# the "contract-analysis" taxonomy by name.
print("Processing documents...")
results = client.classify.generate_batch(
    connector_name="my-document-bucket",
    taxonomy_name="contract-analysis",
)
# The reported count is the number of entries in `results.metadata`.
print(f"✓ Processed {len(results.metadata)} documents")
# ============================================
# Step 5: Export to Qdrant
# ============================================
# Export the data for the "my-document-bucket" connector to the
# "rag-vectors" destination.
export_result = client.destination.export(
    destination_name="rag-vectors",
    connector_name="my-document-bucket",
    export_level="chunk",  # Export at chunk level for RAG
    export_nodes=True,  # Include vector embeddings
    export_metadata=True,  # Include extracted metadata
    metadata_format="json_store",
)
print("✓ Exported to Qdrant")

# If large export, track progress. A truthy tracker_id is the signal to
# look up progress; the status is queried once here, not polled.
if export_result.tracker_id:
    # The body must be indented under the `if`; without it the whole script
    # fails to parse with an IndentationError.
    status = client.progress_tracker.get_status(
        tracker_id=export_result.tracker_id
    )
    print(f" Export status: {status.status} ({status.progress}%)")