Data Sources & Extraction
Deep dive into the dlt_extract action and available data sources
VirtuousAI uses dlt (data load tool) under the hood for reliable data extraction. This guide covers how the dlt_extract action works and the available data sources.
How dlt_extract Works
The dlt_extract action:
- Resolves credentials from the specified connection
- Builds a dlt pipeline targeting your bronze data layer
- Extracts resources one at a time with checkpointing between each
- Writes Parquet files to S3 with Hive-style partitioning
- Tracks state for incremental extraction on subsequent runs
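Under the hood, these steps correspond roughly to a standard dlt pipeline run against a filesystem (S3) destination with Parquet output. The following is a minimal standalone sketch, assuming a recent dlt version; the resource, pipeline name, and bucket URL are illustrative, and in practice the executor resolves credentials and paths from the connection:

import dlt

@dlt.resource(name="orders")
def orders():
    # Stand-in for a real source resource (e.g. Shopify orders)
    yield {"id": 1, "updated_at": "2026-01-01T00:00:00Z"}

pipeline = dlt.pipeline(
    pipeline_name="bronze__tenant123__conn456__shopify",  # naming mirrors the metrics example later
    destination=dlt.destinations.filesystem(bucket_url="s3://vai-bronze-dev/tenant123/conn456"),
    dataset_name="shopify",
)
# Parquet output; the executor runs one resource at a time so state can be committed in between
load_info = pipeline.run(orders, loader_file_format="parquet")
print(load_info)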
Per-Resource Checkpointing
Resources are extracted sequentially with state committed after each:
| Step | What Happens |
|---|---|
| 1 | Check cancellation flag |
| 2 | Build dlt source for single resource |
| 3 | Run extraction in thread pool |
| 4 | Commit dlt state |
| 5 | Update progress callback |
| 6 | Move to next resource |
This means if extraction is interrupted (worker crash, cancellation), it resumes from the last completed resource — not from the beginning.
Cancellation is cooperative — checked between resources. The current resource completes before graceful exit.
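Conceptually, the executor's loop looks like the sketch below; run_resource, commit_state, and progress are illustrative callbacks, not the actual internal API:

import threading

def extract_resources(resources, run_resource, commit_state, progress, cancel_event=None):
    """Sketch of the per-resource checkpointing loop."""
    cancel_event = cancel_event or threading.Event()
    completed = []
    for resource in resources:
        # Cooperative cancellation: only checked between resources, so the
        # current resource always finishes before a graceful exit.
        if cancel_event.is_set():
            break
        run_resource(resource)    # build a single-resource dlt source and run it
        commit_state(resource)    # commit dlt state; a restart resumes after this point
        completed.append(resource)
        progress(len(completed), len(resources))
    return completed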
Available Sources
Shopify
Extract e-commerce data from Shopify stores.
Resources:
- orders — Order data with line items
- products — Product catalog
- customers — Customer profiles
- inventory_items — Inventory levels
- collections — Product collections
Configuration:
{
"kind": "dlt_extract",
"connectionRef": { "slug": "shopify" },
"definition": {
"source": "shopify",
"resources": ["orders", "products"],
"start_date": "2026-01-01",
"source_config": {
"items_per_page": 250
}
}
}
Incremental Extraction:
- Orders: Uses updated_at cursor
- Products: Uses updated_at cursor
- Customers: Uses updated_at cursor
REST API (Generic)
Extract data from any REST API endpoint.
Configuration:
{
"kind": "dlt_extract",
"connectionRef": { "id": "conn_rest_api" },
"definition": {
"source": "rest_api",
"resources": ["customers"],
"source_config": {
"base_url": "https://api.example.com/v1",
"endpoints": {
"customers": {
"path": "/customers",
"paginate": {
"type": "cursor",
"cursor_param": "after"
}
}
}
}
}
}
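This endpoint configuration resembles dlt's built-in rest_api source. Below is a standalone sketch of a roughly equivalent dlt pipeline, assuming a recent dlt version; the cursor_path and the duckdb destination are illustrative assumptions, not the exact internal translation:

import dlt
from dlt.sources.rest_api import rest_api_source

source = rest_api_source({
    "client": {"base_url": "https://api.example.com/v1"},
    "resources": [
        {
            "name": "customers",
            "endpoint": {
                "path": "customers",
                # Cursor pagination: send the last cursor back as the "after" query param
                "paginator": {
                    "type": "cursor",
                    "cursor_param": "after",
                    "cursor_path": "meta.next_cursor",  # where the API returns the next cursor (assumed)
                },
            },
        }
    ],
})
pipeline = dlt.pipeline(pipeline_name="rest_api_example", destination="duckdb", dataset_name="bronze")
print(pipeline.run(source))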
Klaviyo
Extract email and SMS marketing data from Klaviyo.
Resources:
| Resource | Incremental | Cursor Field | Description |
|---|---|---|---|
| profiles | Yes | attributes.updated | Customer profiles and attributes |
| events | Yes | attributes.datetime | Engagement events (opens, clicks, etc.) |
| lists | Yes | attributes.updated | Email/SMS lists |
| campaigns | No | — | Email and SMS campaigns |
| metrics | No | — | Custom metric definitions |
| flows | No | — | Automation flows |
| segments | No | — | Dynamic segments |
| tags | No | — | Organization tags |
Default Resources: profiles, events, lists
Configuration:
{
"kind": "dlt_extract",
"connectionRef": { "slug": "klaviyo" },
"definition": {
"source": "klaviyo",
"resources": ["profiles", "events", "lists"],
"start_date": "2026-01-01"
}
}
Credentials Required:
- api_key — Klaviyo Private API Key (starts with pk_)
Incremental Extraction:
- Profiles: Uses updated cursor for changes
- Events: Uses datetime cursor for new events
- Lists: Uses updated cursor for changes
- Other resources: Full sync (reference data)
Klaviyo's API has strict rate limits. The connector automatically handles rate limiting with exponential backoff.
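You normally don't need to handle this yourself; conceptually, the retry behavior resembles the following sketch, which honors a Retry-After header when present and otherwise backs off exponentially (the function and URL are illustrative, not the connector's actual code):

import time
import requests

def get_with_backoff(url, headers, max_retries=5):
    """Sketch of rate-limit handling: retry 429 responses with exponential backoff."""
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code != 429:
            response.raise_for_status()
            return response
        # Prefer the server's Retry-After hint; otherwise back off exponentially.
        wait = float(response.headers.get("Retry-After", 2 ** attempt))
        time.sleep(wait)
    raise RuntimeError(f"Still rate limited after {max_retries} attempts: {url}")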
Adding New Sources
VirtuousAI's source registry is extensible. Contact support for custom source integrations or check the API for programmatic source registration.
Extraction Modes
Incremental (Default)
Only fetches new or modified records since the last extraction:
{
"definition": {
"source": "shopify",
"resources": ["orders"],
"incremental": true
}
}
How it works:
- dlt maintains state with last cursor position
- Each run queries only records after the cursor
- Dramatically reduces API calls and processing time
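In dlt terms, this is typically expressed with dlt.sources.incremental, which persists the last cursor value in pipeline state between runs. A minimal sketch using an updated_at cursor like the Shopify resources above; fetch_orders is a placeholder for a real API call:

import dlt

def fetch_orders(since):
    # Placeholder for a real API call that filters on updated_at >= since
    return []

@dlt.resource(name="orders", write_disposition="append")
def orders(updated_at=dlt.sources.incremental("updated_at", initial_value="2026-01-01T00:00:00Z")):
    # dlt injects the last stored cursor value from pipeline state on each run,
    # so only records updated after the previous run are fetched.
    yield from fetch_orders(since=updated_at.last_value)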
Full Resync
Drops existing state and extracts all data:
{
"definition": {
"source": "shopify",
"resources": ["orders"],
"full_resync": true
}
}
When to use:
- Schema changes in source system
- Data corruption in bronze layer
- Initial backfill with new date range
Full resync drops the dlt pipeline state. Use sparingly on large datasets.
Output Format
Bronze Layer Structure
Extracted data is written to S3 as Parquet files:
s3://vai-bronze-{env}/
└── {tenant_id}/
    └── {connection_id}/
        └── {source}/
            └── {resource}/
                └── year=2026/
                    └── month=01/
                        └── day=22/
                            └── data.parquet
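The Hive-style keys (year/month/day) are understood by most Parquet engines. For a quick ad-hoc check you could query the layout with DuckDB as sketched below; the bucket, tenant, and connection IDs are placeholders, and S3 credentials must already be configured:

import duckdb

con = duckdb.connect()
con.sql("INSTALL httpfs")
con.sql("LOAD httpfs")  # enables s3:// paths; AWS credentials must also be set up
# hive_partitioning=true exposes year/month/day from the directory layout as columns
con.sql("""
    SELECT year, month, day, count(*) AS row_count
    FROM read_parquet('s3://vai-bronze-dev/tenant123/conn456/shopify/orders/**/*.parquet',
                      hive_partitioning = true)
    GROUP BY year, month, day
    ORDER BY year, month, day
""").show()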
Metrics
Each extraction returns detailed metrics:
{
"pipeline_name": "bronze__tenant123__conn456__shopify",
"rows_extracted": 1250,
"files_written": 3,
"bytes_written": 2456789,
"resources_loaded": ["orders", "order_line_items", "products"],
"per_resource_metrics": {
"orders": { "rows_extracted": 500, "files_written": 1 },
"order_line_items": { "rows_extracted": 1500, "files_written": 1 },
"products": { "rows_extracted": 250, "files_written": 1 }
},
"duration_seconds": 45.2,
"state_fingerprint": "abc123..."
}
Error Handling
Retryable Errors
| Error | Cause | Resolution |
|---|---|---|
| RATE_LIMITED | API rate limit hit | Automatic retry with backoff |
| CONNECTION_ERROR | Network timeout | Automatic retry |
| EXTRACTION_ERROR | Transient failure | Automatic retry |
Non-Retryable Errors
| Error | Cause | Resolution |
|---|---|---|
| AUTH_ERROR | Invalid credentials | Update connection credentials |
| CONFIGURATION_ERROR | Invalid source/resource | Fix action definition |
Resume After Failure
If extraction fails mid-way, the next run resumes from where it left off:
{
"result": {
"resumed_from": ["orders", "order_line_items"],
"resources_loaded": ["orders", "order_line_items", "products"]
}
}
Progress is tracked via:
{
"progress": {
"completed_resources": ["orders", "order_line_items"],
"total_resources": 3
}
}
Heartbeat & Lease
Long-running extractions use the lease system:
| Parameter | Value |
|---|---|
| Lease Duration | 90 seconds |
| Heartbeat | Every 30 seconds during extraction |
| Watchdog Grace | 180 seconds |
The ExtractionHeartbeat context manager extends the lease during pipeline execution:
with ExtractionHeartbeat(run_id, pipeline_name):
    # dlt pipeline runs here
    # Heartbeat extends lease automatically
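The real ExtractionHeartbeat is internal to VirtuousAI, but conceptually such a context manager renews the lease on a timer from a background thread. An illustrative sketch follows, where extend_lease is a hypothetical stand-in for the lease API and the intervals mirror the table above:

import threading

def extend_lease(run_id, seconds):
    # Placeholder: in practice this would call the internal lease API
    pass

class HeartbeatSketch:
    """Illustrative lease extender: renew the 90s lease every 30s while work runs."""

    def __init__(self, run_id, interval=30, lease_seconds=90):
        self.run_id = run_id
        self.interval = interval
        self.lease_seconds = lease_seconds
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._beat, daemon=True)

    def _beat(self):
        # wait() returns False on timeout, so this fires every `interval` seconds until stopped
        while not self._stop.wait(self.interval):
            extend_lease(self.run_id, self.lease_seconds)

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc):
        self._stop.set()
        self._thread.join()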
Scheduling Extractions
For regular data syncs, create an automation:
vai automations create \
--name "Daily Shopify Sync" \
--trigger-type schedule \
--config '{
"schedule": "0 6 * * *",
"timezone": "UTC",
"action": {
"kind": "dlt_extract",
"connectionRef": {"slug": "shopify"},
"definition": {
"source": "shopify",
"resources": ["orders", "products"],
"incremental": true
}
}
}'
Best Practices
- Use incremental mode — Only use full_resync when necessary
- Limit resources — Extract only the resources you need
- Set date ranges — Use start_date to limit historical data
- Monitor metrics — Track rows_extracted to detect anomalies (see the sketch below)
- Handle rate limits — The executor handles this, but schedule during off-peak hours
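For the monitoring point above, a simple approach is to compare the latest rows_extracted against a trailing average of recent runs; here is a minimal sketch with made-up numbers:

from statistics import mean

def looks_anomalous(history, latest, tolerance=0.5):
    """Flag a run whose row count deviates more than `tolerance` from the trailing average."""
    if len(history) < 3:
        return False  # not enough history to judge
    baseline = mean(history[-7:])
    return abs(latest - baseline) > tolerance * baseline

# Example: rows_extracted from recent runs vs. the latest run
recent = [1180, 1210, 1250, 1195, 1230]
print(looks_anomalous(recent, latest=120))   # True: suspicious drop
print(looks_anomalous(recent, latest=1240))  # False: within normal range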