Scan Processing

Overview

Scan processing is DataPancake's approach to schema discovery using Snowflake compute resources. It uses vertical scaling (multiple threads within a single warehouse) and batch processing to handle large datasets efficiently while respecting Snowflake's 60-minute procedure timeout limit.

During scanning, DataPancake recursively traverses semi-structured data, discovers all attributes at any nesting depth, detects polymorphic variations, and creates comprehensive metadata.

DataPancake-Specific Capabilities:

  • Vertical scaling within a single warehouse (multiple threads)

  • Batch processing with configurable batch sizes

  • Multiple procedure calls for datasets exceeding 60-minute timeout

  • Progressive attribute discovery as data is processed

  • Error resilience (individual record errors don't stop the scan)


Vertical Scaling & Threads

DataPancake uses multiple threads within a single warehouse to maximize compute utilization without requiring multiple warehouses.

How Threads Work:

  • Each thread processes batches independently in parallel

  • Threads use joblib's Parallel for parallel execution

  • Batches are distributed round-robin across threads (Thread 1: batches 1, thread_count+1, 2×thread_count+1, ...; Thread 2: batches 2, thread_count+2, 2×thread_count+2, ...; and so on); see the sketch after this list

  • Threads share warehouse memory but process different data
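
The assignment can be pictured with a short sketch. This is a minimal illustration of round-robin batch distribution using joblib's Parallel, not DataPancake's actual code; process_batch, scan_in_parallel, and the batch list are hypothetical stand-ins.

```python
# Minimal sketch of round-robin batch distribution across threads (illustrative only).
from joblib import Parallel, delayed

def process_batch(batch):
    # Placeholder for the per-batch attribute discovery a single thread performs.
    return {"records_processed": len(batch)}

def scan_in_parallel(batches, thread_count):
    # Thread i (0-based) takes batches i, i + thread_count, i + 2*thread_count, ...
    assignments = [batches[i::thread_count] for i in range(thread_count)]

    def run_thread(thread_batches):
        return [process_batch(b) for b in thread_batches]

    # prefer="threads" keeps the work in one process, sharing warehouse memory.
    return Parallel(n_jobs=thread_count, prefer="threads")(
        delayed(run_thread)(tb) for tb in assignments
    )
```

With thread_count = 4, thread 0 takes batches 0, 4, 8, ... and thread 1 takes batches 1, 5, 9, ..., matching the 1-based description above.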

Thread Configuration:

  • Default: Automatically set to AVAILABLE_CORES from the warehouse's compute_resource_size:

    • X-Small: 8 threads

    • Small: 16 threads

    • Medium Snowpark-Optimized: 32 threads

    • Large Snowpark-Optimized: 64 threads

    • 2X-Large Snowpark-Optimized: 256 threads

    • 3X-Large through 6X-Large: 512-4096 threads

  • Manual Override: Adjustable in Scan Configuration Vertical Scale Settings

  • Memory Impact: More threads = more concurrent memory usage but faster processing (up to warehouse max)

Supported / Not Supported:

  • ✅ Multiple threads within single warehouse

  • ✅ Thread count up to warehouse maximum (AVAILABLE_CORES)

  • ❌ Thread count exceeding warehouse capacity (will fail or be limited)


Batch Processing

Batch Structure:

  • Records are divided into batches for processing

  • Batch size = thread_process_record_count (default: 1000 records per thread)

  • Each batch is processed by a single thread

  • Batches are processed in parallel across threads

Memory Calculation:

Key Factors:

  • Average document size - Larger documents use more memory

  • Thread count - More threads = more concurrent memory usage

  • thread_process_record_count - More records per batch = more memory per thread (default: 1000)

  • Multiplier of 2 - Accounts for data structures and processing overhead
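
These factors imply a rough pre-scan estimate. The sketch below is an approximation built from the factors listed above, not an exact formula from DataPancake; the 10 KB document size in the example is an assumption.

```python
def estimate_peak_memory_bytes(avg_document_bytes, thread_count,
                               thread_process_record_count, overhead_multiplier=2):
    # Documents held concurrently across all threads, doubled for
    # data structures and processing overhead (rough approximation).
    return (avg_document_bytes * thread_count
            * thread_process_record_count * overhead_multiplier)

# Example (assumed values): 10 KB documents, 32 threads, default 1,000 records per batch:
# 10_000 * 32 * 1_000 * 2 = 640,000,000 bytes, roughly 640 MB of working memory.
print(estimate_peak_memory_bytes(10_000, 32, 1_000))
```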

Memory Constraints:

  • Python procedures have significant memory requirements

  • Complex nested structures increase memory usage

  • Large arrays consume additional memory

  • Warehouse size determines available memory

Optimization:

  • Reduce thread_process_record_count if memory errors occur

  • Reduce thread count if warehouse memory is limited

  • Default (1000) works for most scenarios


Procedure Calls & Timeout Management

60-Minute Timeout Constraint

Snowflake Native App Limitation:

  • Snowpark procedures in native apps have a 3600-second (60-minute) runtime limit

  • This is an organizational-level Snowflake limitation

  • Scans exceeding 60 minutes are automatically terminated

  • Not a DataPancake limitation—applies to all Snowflake native apps

Impact:

  • Very large data sources cannot be scanned in a single procedure call

  • Must break scans into multiple procedure calls

  • Each procedure call processes a subset of records

  • Procedure calls execute sequentially via Snowflake Tasks

Multiple Procedure Calls

When to Use:

  • Data source takes longer than 60 minutes to scan

  • Very large datasets (millions of records)

  • Complex nested structures requiring more processing time

Configuration:

  • Number of Procedure Calls (PROCEDURE_INSTANCE_COUNT) - Total number of procedure calls needed

  • Record Count Per Procedure Call (PROCEDURE_INSTANCE_ROW_COUNT) - Records processed per call

  • Order By (SCAN_ORDER_BY) - Required for multiple calls to ensure consistent partitioning

How It Works:

  1. Data is divided into N procedure calls based on record count

  2. Each procedure call processes its assigned record range using THREAD_RANGE_CRITERIA

  3. Procedure calls execute sequentially via Snowflake Tasks

  4. Each call must complete within 60 minutes

  5. Results are consolidated after all calls complete

Important Constraints:

  • WHERE clause cannot be used when PROCEDURE_INSTANCE_COUNT > 1 (enforced in Scan_Configuration.py:379-384)

  • ORDER BY is required when PROCEDURE_INSTANCE_COUNT > 1 (enforced in Scan_Configuration.py:366)

  • Record count validation - The record count per call must be large enough for all calls together to cover every row, without exceeding the total record count (validated in upsert_datasource_scan_configuration.py:139-158)

Example:

  • Dataset: 2,000,000 records

  • Configuration: 2 procedure calls, 1,000,000 records per call

  • Procedure Call 1: Processes records 1-1,000,000 (via THREAD_RANGE_CRITERIA)

  • Procedure Call 2: Processes records 1,000,001-2,000,000 (via THREAD_RANGE_CRITERIA)

  • Both calls execute sequentially, each within 60-minute limit
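
The range split and the coverage check can be sketched as follows. The function below illustrates the constraints described above; it is not DataPancake's internal THREAD_RANGE_CRITERIA logic, and plan_procedure_calls is a hypothetical name.

```python
def plan_procedure_calls(total_records, instance_count, rows_per_instance):
    # Coverage checks mirroring the validation described above (illustrative):
    if instance_count * rows_per_instance < total_records:
        raise ValueError("instance_count * rows_per_instance must cover all records")
    if rows_per_instance > total_records:
        raise ValueError("rows_per_instance exceeds the total record count")

    # 1-based, inclusive record ranges, one per procedure call.
    ranges = []
    for i in range(instance_count):
        start = i * rows_per_instance + 1
        end = min((i + 1) * rows_per_instance, total_records)
        ranges.append((start, end))
    return ranges

# The example above: 2,000,000 records across 2 calls of 1,000,000 rows each
# yields [(1, 1000000), (1000001, 2000000)].
print(plan_procedure_calls(2_000_000, 2, 1_000_000))
```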

Order By Requirement

When Required:

  • PROCEDURE_INSTANCE_COUNT > 1 - Ensures consistent record partitioning across calls

When Optional:

  • PROCEDURE_INSTANCE_COUNT = 1 and SCAN_RECORD_LIMIT > 0 - Can improve performance for limited scans

When Not Required:

  • PROCEDURE_INSTANCE_COUNT = 1 and SCAN_RECORD_LIMIT = 0 - Full table scan in single call

Examples:

  • json_data:"_id" - Single attribute

  • customer_id - Single attribute

  • customer_id, json_data:"_id" - Multiple attributes (composite)
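
To see why a stable ORDER BY matters, consider a hypothetical range query over a deterministic row numbering. The table name and SQL below are examples only, not DataPancake's actual scan query.

```python
# Hypothetical illustration: deterministic row numbers make per-call ranges stable.
def build_range_query(table, order_by, start_row, end_row):
    return f"""
        SELECT *
        FROM (
            SELECT t.*, ROW_NUMBER() OVER (ORDER BY {order_by}) AS rn
            FROM {table} t
        )
        WHERE rn BETWEEN {start_row} AND {end_row}
    """

# Without a deterministic ORDER BY, two procedure calls could see overlapping or
# missing rows; with one (e.g. json_data:"_id"), each call's range is consistent.
print(build_range_query("my_db.my_schema.events", 'json_data:"_id"', 1, 1_000_000))
```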

Supported / Not Supported:

  • ✅ Single or multiple attributes in ORDER BY

  • ✅ JSON path expressions (e.g., json_data:"_id")

  • ✅ Composite ORDER BY (multiple attributes)

  • ❌ ORDER BY with WHERE clause when PROCEDURE_INSTANCE_COUNT > 1


Data Processing

During scanning, each thread recursively traverses its assigned records to discover attributes at any nesting depth. The process handles nested objects, arrays, and embedded JSON, and proactively creates all 7 polymorphic versions when an attribute is first encountered. Thread results are consolidated after processing, with attribute data merged and duplicate attributes combined.
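
As a simplified picture of this traversal, the sketch below collects attribute paths and observed types from one record, including arrays and embedded (stringified) JSON. It is an illustration only; the real discovery process also records polymorphic versions and other metadata.

```python
import json

def discover_attributes(value, path="", found=None):
    # Collect (path, type_name) pairs for every attribute at any depth (simplified).
    if found is None:
        found = set()
    if isinstance(value, dict):
        for key, child in value.items():
            discover_attributes(child, f"{path}.{key}" if path else key, found)
    elif isinstance(value, list):
        for item in value:                      # arrays: inspect each element
            discover_attributes(item, f"{path}[]", found)
    elif isinstance(value, str):
        try:                                    # embedded (stringified) JSON
            discover_attributes(json.loads(value), path, found)
        except ValueError:
            found.add((path, "string"))
    else:
        found.add((path, type(value).__name__))
    return found

record = {"_id": 1, "tags": ["a", "b"], "payload": '{"nested": {"score": 9.5}}'}
print(sorted(discover_attributes(record)))
# [('_id', 'int'), ('payload.nested.score', 'float'), ('tags[]', 'string')]
```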

For detailed information on attribute discovery, polymorphic versions, and the discovery process, see the Attribute Consolidation documentation.

Supported / Not Supported:

  • ✅ Recursive traversal of nested structures (any depth)

  • ✅ Polymorphic variation detection

  • ✅ Embedded JSON parsing

  • ✅ Array handling

  • ❌ Structured (non-semi-structured) data sources (use different scan procedure)


Error Handling

Scan processing is designed to be resilient: individual record errors (such as invalid JSON syntax) don't stop the scan—the problematic record is skipped and processing continues. Thread errors are logged but don't stop other threads, ensuring the scan processes as many records as possible.
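
A minimal sketch of this per-record resilience (illustrative; in DataPancake the captured errors land in the datasource_scan_error table described below):

```python
import json

def process_records(records, scan_errors):
    # Skip bad records and keep scanning; collect errors for later review.
    processed = 0
    for record_id, raw in records:
        try:
            document = json.loads(raw)          # may raise on invalid JSON
            # ... attribute discovery on `document` would happen here ...
            processed += 1
        except ValueError as exc:               # individual record error
            scan_errors.append({
                "record_id": record_id,
                "error_type": "JSON parse error",
                "message": str(exc),
            })
            continue                            # the scan keeps going

    return processed

errors = []
count = process_records([(1, '{"ok": true}'), (2, "{not valid json")], errors)
print(count, errors)   # 1 record processed, 1 error captured
```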

Error Tracking:

  • Errors are tracked in the datasource_scan_error table

  • Can be queried via system views for troubleshooting

  • Error types include: JSON parse errors, thread processing errors, validation errors

Supported / Not Supported:

  • ✅ Individual record errors are skipped (scan continues)

  • ✅ Thread errors are logged (other threads continue)

  • ✅ Error tracking and querying via views

  • ❌ Automatic retry of failed records (manual intervention required)


Performance Characteristics

Scan Speed

Benchmark Performance:

  • Approximately 1,000,000 records per minute on a Medium Snowpark-Optimized warehouse

  • Performance varies based on:

    • Data complexity (nesting depth, array sizes)

    • Document size

    • Number of polymorphic variations

    • Embedded JSON content

    • Data source type (Internal Tables vs. External Tables)

External Tables:

  • Slightly slower than internal tables

  • Network latency adds overhead

  • Still achieves high throughput on medium warehouses

Factors Affecting Speed:

  • Thread count - More threads = faster (up to warehouse max)

  • Batch size (thread_process_record_count) - Larger batches = more efficient but more memory

  • Data complexity - Deeper nesting = slower processing

  • Polymorphic variations - More variations = more processing

  • Embedded JSON - Stringified JSON adds parsing overhead

Optimization Strategies

For Speed:

  • Use maximum thread count (default)

  • Increase thread_process_record_count if memory allows

  • Use appropriate warehouse size (Medium recommended)

  • Use indexed columns in ORDER BY

For Memory:

  • Reduce thread_process_record_count (default: 1000)

  • Reduce thread count if needed

  • Use smaller warehouse if memory constrained

  • Process in smaller procedure calls

For Large Datasets:

  • Use multiple procedure calls

  • Set appropriate PROCEDURE_INSTANCE_ROW_COUNT per call

  • Use efficient ORDER BY attributes

  • Monitor progress and adjust as needed
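
As a rough planning aid, you can estimate how many procedure calls a large dataset needs from the benchmark rate above. The throughput figure and safety factor below are assumptions; adjust them for your data's complexity.

```python
import math

def plan_large_scan(total_records, records_per_minute=1_000_000, safety_factor=0.75):
    # Estimate how many procedure calls keep each call under the 60-minute limit.
    # records_per_minute is the benchmark figure above; safety_factor leaves
    # headroom for complex documents (both are assumptions to tune).
    max_rows_per_call = int(60 * records_per_minute * safety_factor)
    instance_count = max(1, math.ceil(total_records / max_rows_per_call))
    rows_per_call = math.ceil(total_records / instance_count)
    return instance_count, rows_per_call

# Example: 120,000,000 records -> 3 calls of 40,000,000 rows each,
# roughly 40 minutes per call at the benchmark rate.
print(plan_large_scan(120_000_000))
```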


Common Scenarios

Scenario 1: Standard Full Scan

Configuration:

  • PROCEDURE_INSTANCE_COUNT = 1

  • Default thread count (warehouse max)

  • SCAN_RECORD_LIMIT = 0 (no limit)

  • No WHERE clause

Process:

  • All records processed in single call

  • Completes within 60 minutes (if dataset allows)

  • Full attribute discovery

Scenario 2: Large Dataset Scan

Configuration:

  • PROCEDURE_INSTANCE_COUNT > 1 (e.g., 4 calls)

  • PROCEDURE_INSTANCE_ROW_COUNT = 1,000,000 records per call

  • SCAN_ORDER_BY on unique identifier

  • Default thread count

Process:

  • Data divided into N ranges via THREAD_RANGE_CRITERIA

  • Each call processes its range sequentially via Tasks

  • Results consolidated after all calls complete

Scenario 3: Incremental Scan

Configuration:

  • PROCEDURE_INSTANCE_COUNT = 1

  • SCAN_WHERE_CLAUSE with timestamp filter

  • SOURCE_STREAM_LAST_SCANNED_TIMESTAMP from previous scan

Process:

  • Only new/changed records scanned

  • Faster than full scan

  • Maintains schema discovery for new data

Note: WHERE clause only works with PROCEDURE_INSTANCE_COUNT = 1
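
A hypothetical filter of this kind can be built from the previous scan's timestamp; the attribute path "updated_at" and the formats shown are examples, not a required syntax.

```python
# Hypothetical incremental-scan filter; adjust the attribute path and format
# to match your data. last_scanned stands in for SOURCE_STREAM_LAST_SCANNED_TIMESTAMP.
last_scanned = "2024-01-15 09:30:00"

scan_where_clause = (
    f'json_data:"updated_at"::timestamp_ntz > \'{last_scanned}\'::timestamp_ntz'
)
print(scan_where_clause)
```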

Scenario 4: Memory-Constrained Scan

Configuration:

  • Reduced thread count

  • Reduced thread_process_record_count (below default 1000)

  • Smaller warehouse size

Process:

  • Lower memory usage, slower processing

  • Prevents memory errors

  • Suitable for complex nested data


Summary

Scan processing is DataPancake's high-performance approach to schema discovery. Key takeaways:

  • Vertical scaling - Uses multiple threads within a single warehouse for parallel processing

  • Batch processing - Divides data into batches processed simultaneously across threads (default: 1000 records per thread)

  • Timeout management - Multiple procedure calls handle datasets exceeding 60-minute limit

  • Memory control - Batch size (thread_process_record_count) and thread count control memory usage

  • Recursive discovery - Deep traversal discovers all attributes at any nesting depth

  • Progressive processing - Attributes discovered incrementally as data is processed

  • Error resilience - Individual errors don't stop the scan; processing continues

Key Constraints:

  • 60-minute timeout per procedure call (Snowflake limitation)

  • WHERE clause cannot be used with multiple procedure calls

  • ORDER BY required when PROCEDURE_INSTANCE_COUNT > 1

  • Thread count limited by warehouse AVAILABLE_CORES

By understanding scan processing, you can optimize scan configurations for your specific data sources, balancing performance, memory usage, and timeout constraints.
