Data Sources & Extraction

Deep dive into the dlt_extract action and available data sources

VirtuousAI uses dlt (data load tool) under the hood for reliable data extraction. This guide covers how the dlt_extract action works and the available data sources.

How dlt_extract Works

The dlt_extract action:

  1. Resolves credentials from the specified connection
  2. Builds a dlt pipeline targeting your bronze data layer
  3. Extracts resources one at a time with checkpointing between each
  4. Writes Parquet files to S3 with Hive-style partitioning
  5. Tracks state for incremental extraction on subsequent runs

Per-Resource Checkpointing

Resources are extracted sequentially with state committed after each:

| Step | What Happens |
|------|--------------|
| 1 | Check cancellation flag |
| 2 | Build dlt source for single resource |
| 3 | Run extraction in thread pool |
| 4 | Commit dlt state |
| 5 | Update progress callback |
| 6 | Move to next resource |

This means if extraction is interrupted (worker crash, cancellation), it resumes from the last completed resource — not from the beginning.

Cancellation is cooperative: the flag is checked between resources, so the current resource finishes before the run exits gracefully.
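
A minimal sketch of this loop, with is_cancelled, commit_state, and on_progress standing in for internals that are not part of the public API:

def extract_resources(pipeline, sources, completed, *, is_cancelled, commit_state, on_progress):
    """Run dlt sources one at a time, checkpointing after each (illustrative sketch)."""
    for name, source in sources.items():
        if is_cancelled():       # step 1: cooperative cancellation, checked between resources
            return
        if name in completed:    # resume support: skip resources finished before a crash
            continue
        pipeline.run(source)     # steps 2-3: extract this single resource
        commit_state(pipeline)   # step 4: durable checkpoint of dlt state
        completed.append(name)
        on_progress(completed)   # step 5: report progress before the next resource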

Available Sources

Shopify

Extract e-commerce data from Shopify stores.

Resources:

  • orders — Order data with line items
  • products — Product catalog
  • customers — Customer profiles
  • inventory_items — Inventory levels
  • collections — Product collections

Configuration:

{
  "kind": "dlt_extract",
  "connectionRef": { "slug": "shopify" },
  "definition": {
    "source": "shopify",
    "resources": ["orders", "products"],
    "start_date": "2026-01-01",
    "source_config": {
      "items_per_page": 250
    }
  }
}

Incremental Extraction:

  • Orders: Uses updated_at cursor
  • Products: Uses updated_at cursor
  • Customers: Uses updated_at cursor
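
This maps onto dlt's incremental cursor mechanism. A minimal sketch of an updated_at cursor, where fetch_orders is a hypothetical paging helper rather than the real connector code:

import dlt

@dlt.resource(primary_key="id", write_disposition="merge")
def orders(updated_at=dlt.sources.incremental("updated_at", initial_value="2026-01-01T00:00:00Z")):
    # dlt stores updated_at.last_value in pipeline state, so each run
    # requests only records modified since the previous run's cursor.
    for page in fetch_orders(updated_at_min=updated_at.last_value):  # hypothetical helper
        yield page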

REST API (Generic)

Extract data from any REST API endpoint.

Configuration:

{
  "kind": "dlt_extract",
  "connectionRef": { "id": "conn_rest_api" },
  "definition": {
    "source": "rest_api",
    "resources": ["customers"],
    "source_config": {
      "base_url": "https://api.example.com/v1",
      "endpoints": {
        "customers": {
          "path": "/customers",
          "paginate": {
            "type": "cursor",
            "cursor_param": "after"
          }
        }
      }
    }
  }
}
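
With a cursor paginator, each response carries an opaque cursor that is passed back as the after parameter on the next request. A minimal sketch of that loop (the data and next_cursor field names are assumptions; real APIs vary):

import requests

def fetch_all(base_url, path="/customers", cursor_param="after"):
    # Follow the cursor until the API stops returning one.
    params = {}
    while True:
        resp = requests.get(f"{base_url}{path}", params=params, timeout=30)
        resp.raise_for_status()
        body = resp.json()
        yield from body["data"]           # assumed envelope field
        cursor = body.get("next_cursor")  # assumed cursor field
        if not cursor:
            return
        params[cursor_param] = cursor     # e.g. ?after=<cursor>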

Klaviyo

Extract email and SMS marketing data from Klaviyo.

Resources:

| Resource | Incremental | Cursor Field | Description |
|----------|-------------|--------------|-------------|
| profiles | Yes | attributes.updated | Customer profiles and attributes |
| events | Yes | attributes.datetime | Engagement events (opens, clicks, etc.) |
| lists | Yes | attributes.updated | Email/SMS lists |
| campaigns | No | | Email and SMS campaigns |
| metrics | No | | Custom metric definitions |
| flows | No | | Automation flows |
| segments | No | | Dynamic segments |
| tags | No | | Organization tags |

Default Resources: profiles, events, lists

Configuration:

{
  "kind": "dlt_extract",
  "connectionRef": { "slug": "klaviyo" },
  "definition": {
    "source": "klaviyo",
    "resources": ["profiles", "events", "lists"],
    "start_date": "2026-01-01"
  }
}

Credentials Required:

  • api_key — Klaviyo Private API Key (starts with pk_)

Incremental Extraction:

  • Profiles: Uses the attributes.updated cursor for changes
  • Events: Uses the attributes.datetime cursor for new events
  • Lists: Uses the attributes.updated cursor for changes
  • Other resources: Full sync on every run (reference data)

Klaviyo's API has strict rate limits. The connector automatically handles rate limiting with exponential backoff.
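
The pattern is roughly the following, shown as an illustrative sketch rather than the connector's actual code; it honors a Retry-After header when present and falls back to exponential delays otherwise:

import time
import requests

def get_with_backoff(url, headers, max_retries=5):
    # Retry 429 responses, preferring the server's Retry-After hint.
    for attempt in range(max_retries):
        resp = requests.get(url, headers=headers, timeout=30)
        if resp.status_code != 429:
            resp.raise_for_status()
            return resp
        delay = float(resp.headers.get("Retry-After", 2 ** attempt))
        time.sleep(delay)  # 1s, 2s, 4s, ... unless Retry-After says otherwise
    raise RuntimeError(f"still rate limited after {max_retries} attempts: {url}")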

Adding New Sources

VirtuousAI's source registry is extensible. Contact support for custom source integrations or check the API for programmatic source registration.

Extraction Modes

Incremental (Default)

Only fetches new or modified records since the last extraction:

{
  "definition": {
    "source": "shopify",
    "resources": ["orders"],
    "incremental": true
  }
}

How it works:

  1. dlt maintains state with last cursor position
  2. Each run queries only records after the cursor
  3. Dramatically reduces API calls and processing time

Full Resync

Drops existing state and extracts all data:

{
  "definition": {
    "source": "shopify",
    "resources": ["orders"],
    "full_resync": true
  }
}

When to use:

  • Schema changes in source system
  • Data corruption in bronze layer
  • Initial backfill with new date range

Full resync drops the dlt pipeline state. Use sparingly on large datasets.

Output Format

Bronze Layer Structure

Extracted data is written to S3 as Parquet files:

s3://vai-bronze-{env}/
└── {tenant_id}/
    └── {connection_id}/
        └── {source}/
            └── {resource}/
                └── year=2026/
                    └── month=01/
                        └── day=22/
                            └── data.parquet
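
Because the layout is Hive-partitioned, readers can prune on the year/month/day keys. For example, with PyArrow (the bucket and path below are placeholders following the structure above):

import pyarrow.dataset as ds

# "hive" partitioning turns the year=/month=/day= directories into filterable columns.
dataset = ds.dataset(
    "s3://vai-bronze-prod/tenant123/conn456/shopify/orders/",  # placeholder path
    format="parquet",
    partitioning="hive",
)
january = dataset.to_table(filter=(ds.field("year") == 2026) & (ds.field("month") == 1))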

Metrics

Each extraction returns detailed metrics:

{
  "pipeline_name": "bronze__tenant123__conn456__shopify",
  "rows_extracted": 1250,
  "files_written": 3,
  "bytes_written": 2456789,
  "resources_loaded": ["orders", "order_line_items", "products"],
  "per_resource_metrics": {
    "orders": { "rows_extracted": 500, "files_written": 1 },
    "order_line_items": { "rows_extracted": 1500, "files_written": 1 },
    "products": { "rows_extracted": 250, "files_written": 1 }
  },
  "duration_seconds": 45.2,
  "state_fingerprint": "abc123..."
}
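
These metrics lend themselves to simple anomaly checks, e.g. comparing rows_extracted against recent runs (an illustrative check, not a built-in feature):

def looks_anomalous(rows_extracted, recent_counts, tolerance=0.5):
    # Flag a run that deviates more than `tolerance` from the recent average.
    if not recent_counts:
        return False
    baseline = sum(recent_counts) / len(recent_counts)
    return abs(rows_extracted - baseline) > tolerance * baseline

# looks_anomalous(1250, [1200, 1180, 1310]) -> False (within 50% of the baseline)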

Error Handling

Retryable Errors

| Error | Cause | Resolution |
|-------|-------|------------|
| RATE_LIMITED | API rate limit hit | Automatic retry with backoff |
| CONNECTION_ERROR | Network timeout | Automatic retry |
| EXTRACTION_ERROR | Transient failure | Automatic retry |

Non-Retryable Errors

| Error | Cause | Resolution |
|-------|-------|------------|
| AUTH_ERROR | Invalid credentials | Update connection credentials |
| CONFIGURATION_ERROR | Invalid source/resource | Fix action definition |

Resume After Failure

If extraction fails midway, the next run resumes from where it left off:

{
  "result": {
    "resumed_from": ["orders", "order_line_items"],
    "resources_loaded": ["orders", "order_line_items", "products"]
  }
}

Progress is tracked via:

{
  "progress": {
    "completed_resources": ["orders", "order_line_items"],
    "total_resources": 3
  }
}

Heartbeat & Lease

Long-running extractions use the lease system:

| Parameter | Value |
|-----------|-------|
| Lease Duration | 90 seconds |
| Heartbeat | Every 30 seconds during extraction |
| Watchdog Grace | 180 seconds |

The ExtractionHeartbeat context manager extends the lease during pipeline execution:

with ExtractionHeartbeat(run_id, pipeline_name):
    # The heartbeat extends the lease automatically while this block runs.
    pipeline.run(source)  # the dlt pipeline executes here
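
The real ExtractionHeartbeat is internal to VirtuousAI, but a minimal sketch of the pattern, assuming a hypothetical renew_lease(run_id) helper, looks like this:

import threading

class HeartbeatSketch:
    """Renews the lease every `interval` seconds while the `with` body runs."""

    def __init__(self, run_id, interval=30.0):
        self.run_id = run_id
        self.interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._beat, daemon=True)

    def _beat(self):
        # Event.wait returns False on timeout, so this ticks every `interval`
        # seconds until __exit__ sets the stop flag.
        while not self._stop.wait(self.interval):
            renew_lease(self.run_id)  # hypothetical helper that extends the 90s lease

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc):
        self._stop.set()
        self._thread.join()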

Scheduling Extractions

For regular data syncs, create an automation:

vai automations create \
  --name "Daily Shopify Sync" \
  --trigger-type schedule \
  --config '{
    "schedule": "0 6 * * *",
    "timezone": "UTC",
    "action": {
      "kind": "dlt_extract",
      "connectionRef": {"slug": "shopify"},
      "definition": {
        "source": "shopify",
        "resources": ["orders", "products"],
        "incremental": true
      }
    }
  }'

Best Practices

  1. Use incremental mode — Only use full_resync when necessary
  2. Limit resources — Extract only the resources you need
  3. Set date ranges — Use start_date to limit historical data
  4. Monitor metrics — Track rows_extracted to detect anomalies
  5. Handle rate limits — The executor handles this, but schedule during off-peak hours
