# Scan Processing

### Overview

Scan processing is DataPancake's approach to schema discovery using Snowflake compute resources. It uses vertical scaling (multiple threads within a single warehouse) and batch processing to handle large datasets efficiently while respecting Snowflake's 60-minute procedure timeout limit.

During scanning, DataPancake recursively traverses semi-structured data, discovers all attributes at any nesting depth, detects polymorphic variations, and creates comprehensive metadata.

**DataPancake-Specific Capabilities:**

* Vertical scaling within a single warehouse (multiple threads)
* Batch processing with configurable batch sizes
* Multiple procedure calls for datasets exceeding 60-minute timeout
* Progressive attribute discovery as data is processed
* Error resilience (individual record errors don't stop the scan)

***

### Vertical Scaling & Threads

DataPancake uses multiple threads within a single warehouse to maximize compute utilization without requiring multiple warehouses.

**How Threads Work:**

* Each thread processes batches independently in parallel
* Threads use joblib's `Parallel` for parallel execution
* Batches are distributed round-robin across threads: with thread count T, Thread 1 gets batches 1, T+1, 2T+1, …; Thread 2 gets batches 2, T+2, 2T+2, and so on
* Threads share warehouse memory but process different data
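The round-robin distribution can be sketched in plain Python (illustrative only; DataPancake's actual scheduler runs batches through joblib's `Parallel` rather than computing assignments up front):

```python
def batch_assignments(total_batches: int, thread_count: int) -> dict:
    """Round-robin: thread t gets batches t, t + thread_count, t + 2*thread_count, ..."""
    assignments = {t: [] for t in range(1, thread_count + 1)}
    for batch in range(1, total_batches + 1):
        assignments[(batch - 1) % thread_count + 1].append(batch)
    return assignments

# With 4 threads and 10 batches: thread 1 gets 1, 5, 9; thread 2 gets 2, 6, 10; ...
print(batch_assignments(10, 4))
```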

**Thread Configuration:**

* **Default:** Automatically set to `AVAILABLE_CORES` from the warehouse's `compute_resource_size`:
  * X-Small: 8 threads
  * Small: 16 threads
  * Medium Snowpark-Optimized: 32 threads
  * Large Snowpark-Optimized: 64 threads
  * 2X-Large Snowpark-Optimized: 256 threads
  * 3X-Large through 6X-Large: 512-4096 threads
* **Manual Override:** Adjustable in Scan Configuration Vertical Scale Settings
* **Memory Impact:** More threads = more concurrent memory usage but faster processing (up to warehouse max)

**Supported / Not Supported:**

* ✅ Multiple threads within single warehouse
* ✅ Thread count up to warehouse maximum (`AVAILABLE_CORES`)
* ❌ Thread count exceeding warehouse capacity (will fail or be limited)
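The size-to-thread-count mapping above can be expressed as a simple lookup. This is a sketch for planning purposes; `default_thread_count` is an illustrative helper, not a DataPancake API, and the dictionary covers only the sizes listed above:

```python
# Default thread counts by warehouse size, per the mapping above (partial list).
AVAILABLE_CORES = {
    "X-Small": 8,
    "Small": 16,
    "Medium Snowpark-Optimized": 32,
    "Large Snowpark-Optimized": 64,
    "2X-Large Snowpark-Optimized": 256,
}

def default_thread_count(compute_resource_size: str) -> int:
    """Look up the default thread count for a warehouse size."""
    return AVAILABLE_CORES[compute_resource_size]

print(default_thread_count("Medium Snowpark-Optimized"))  # → 32
```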

***

### Batch Processing

**Batch Structure:**

* Records are divided into batches for processing
* Batch size = `thread_process_record_count` (default: 1000 records per thread)
* Each batch is processed by a single thread
* Batches are processed in parallel across threads

**Memory Calculation:**

```
Memory Used = Avg document size (bytes) × 2 × Thread Count × thread_process_record_count
```

**Key Factors:**

* **Average document size** - Larger documents use more memory
* **Thread count** - More threads = more concurrent memory usage
* **thread\_process\_record\_count** - More records per batch = more memory per thread (default: 1000)
* **Multiplier of 2** - Accounts for data structures and processing overhead
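The formula translates directly into a quick estimator. This is a back-of-the-envelope sketch for capacity planning, not part of DataPancake; the document size is an assumption you supply:

```python
def estimate_scan_memory_bytes(avg_doc_bytes: int, thread_count: int,
                               records_per_thread: int = 1000) -> int:
    """Memory Used = avg document size × 2 (overhead) × thread count × batch size."""
    return avg_doc_bytes * 2 * thread_count * records_per_thread

# 5 KB documents on 32 threads at the default batch size → ~320 MB concurrent usage
print(estimate_scan_memory_bytes(5_000, 32) / 1_000_000, "MB")  # → 320.0 MB
```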

**Memory Constraints:**

* Python procedures have significant memory requirements
* Complex nested structures increase memory usage
* Large arrays consume additional memory
* Warehouse size determines available memory

**Optimization:**

* Reduce `thread_process_record_count` if memory errors occur
* Reduce thread count if warehouse memory is limited
* Default (1000) works for most scenarios

***

### Procedure Calls & Timeout Management

#### 60-Minute Timeout Constraint

**Snowflake Native App Limitation:**

* Snowpark procedures in native apps have a 3600-second (60-minute) runtime limit
* This is an organizational-level Snowflake limitation
* Scans exceeding 60 minutes are automatically terminated
* Not a DataPancake limitation; it applies to all Snowflake native apps

**Impact:**

* Very large data sources cannot be scanned in a single procedure call
* Must break scans into multiple procedure calls
* Each procedure call processes a subset of records
* Procedure calls execute sequentially via Snowflake Tasks

#### Multiple Procedure Calls

**When to Use:**

* Data source takes longer than 60 minutes to scan
* Very large datasets (millions of records)
* Complex nested structures requiring more processing time

**Configuration:**

* **Number of Procedure Calls** (`PROCEDURE_INSTANCE_COUNT`) - Total number of procedure calls needed
* **Record Count Per Procedure Call** (`PROCEDURE_INSTANCE_ROW_COUNT`) - Records processed per call
* **Order By** (`SCAN_ORDER_BY`) - Required for multiple calls to ensure consistent partitioning

**How It Works:**

1. Data is divided into N procedure calls based on record count
2. Each procedure call processes its assigned record range using `THREAD_RANGE_CRITERIA`
3. Procedure calls execute sequentially via Snowflake Tasks
4. Each call must complete within 60 minutes
5. Results are consolidated after all calls complete

**Important Constraints:**

* **WHERE clause cannot be used** when `PROCEDURE_INSTANCE_COUNT > 1` (enforced in `Scan_Configuration.py:379-384`)
* **ORDER BY is required** when `PROCEDURE_INSTANCE_COUNT > 1` (enforced in `Scan_Configuration.py:366`)
* **Record count validation** - Must be large enough to process all rows but not exceed total records (validated in `upsert_datasource_scan_configuration.py:139-158`)

**Example:**

* Dataset: 2,000,000 records
* Configuration: 2 procedure calls, 1,000,000 records per call
* Procedure Call 1: Processes records 1-1,000,000 (via `THREAD_RANGE_CRITERIA`)
* Procedure Call 2: Processes records 1,000,001-2,000,000 (via `THREAD_RANGE_CRITERIA`)
* Both calls execute sequentially, each within 60-minute limit
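The partitioning in the example can be sketched as follows. This is illustrative only; the real per-call ranges are generated internally via `THREAD_RANGE_CRITERIA`:

```python
def procedure_call_ranges(total_records: int, rows_per_call: int) -> list:
    """Split a record count into contiguous 1-based ranges, one per procedure call."""
    ranges = []
    start = 1
    while start <= total_records:
        end = min(start + rows_per_call - 1, total_records)
        ranges.append((start, end))
        start = end + 1
    return ranges

print(procedure_call_ranges(2_000_000, 1_000_000))
# → [(1, 1000000), (1000001, 2000000)]
```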

#### Order By Requirement

**When Required:**

* `PROCEDURE_INSTANCE_COUNT > 1` - Ensures consistent record partitioning across calls

**When Optional:**

* `PROCEDURE_INSTANCE_COUNT = 1` and `SCAN_RECORD_LIMIT > 0` - Can improve performance for limited scans

**When Not Required:**

* `PROCEDURE_INSTANCE_COUNT = 1` and `SCAN_RECORD_LIMIT = 0` - Full table scan in single call

**Examples:**

* `json_data:"_id"` - Single attribute
* `customer_id` - Single attribute
* `customer_id, json_data:"_id"` - Multiple attributes (composite)

**Supported / Not Supported:**

* ✅ Single or multiple attributes in ORDER BY
* ✅ JSON path expressions (e.g., `json_data:"_id"`)
* ✅ Composite ORDER BY (multiple attributes)
* ❌ ORDER BY with WHERE clause when `PROCEDURE_INSTANCE_COUNT > 1`

***

### Data Processing

During scanning, each thread recursively traverses its assigned records to discover attributes at any nesting depth. The process handles nested objects, arrays, embedded JSON, and creates all 7 polymorphic versions proactively when attributes are first encountered. Thread results are consolidated after processing, with attribute data merged and duplicate attributes combined.

For detailed information on attribute discovery, polymorphic versions, and the discovery process, see the [Attribute Consolidation](https://docs.datapancake.com/core-concepts/attribute-consolidation) documentation.

**Supported / Not Supported:**

* ✅ Recursive traversal of nested structures (any depth)
* ✅ Polymorphic variation detection
* ✅ Embedded JSON parsing
* ✅ Array handling
* ❌ Structured (non-semi-structured) data sources (use different scan procedure)

***

### Error Handling

Scan processing is designed to be resilient: individual record errors (such as invalid JSON syntax) don't stop the scan; the problematic record is skipped and processing continues. Thread errors are logged but don't halt other threads, so the scan processes as many records as possible.

**Error Tracking:**

* Errors are tracked in the `datasource_scan_error` table
* Can be queried via system views for troubleshooting
* Error types include: JSON parse errors, thread processing errors, validation errors

**Supported / Not Supported:**

* ✅ Individual record errors are skipped (scan continues)
* ✅ Thread errors are logged (other threads continue)
* ✅ Error tracking and querying via views
* ❌ Automatic retry of failed records (manual intervention required)

***

### Performance Characteristics

#### Scan Speed

**Benchmark Performance:**

* Approximately 1,000,000 records per minute on a Medium Snowpark-Optimized warehouse
* Performance varies based on:
  * Data complexity (nesting depth, array sizes)
  * Document size
  * Number of polymorphic variations
  * Embedded JSON content
  * Data source type (Internal Tables vs. External Tables)

**External Tables:**

* Slightly slower than internal tables
* Network latency adds overhead
* Still achieves high throughput on medium warehouses

**Factors Affecting Speed:**

* **Thread count** - More threads = faster (up to warehouse max)
* **Batch size** (`thread_process_record_count`) - Larger batches = more efficient but more memory
* **Data complexity** - Deeper nesting = slower processing
* **Polymorphic variations** - More variations = more processing
* **Embedded JSON** - Stringified JSON adds parsing overhead
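Combining the benchmark throughput with the 60-minute limit gives a rough way to plan procedure calls. This is a back-of-the-envelope sketch; actual throughput varies with the factors above, and the 20% headroom (`safety_factor`) is an assumption, not a DataPancake setting:

```python
import math

def plan_procedure_calls(total_records: int, records_per_minute: int = 1_000_000,
                         timeout_minutes: int = 60, safety_factor: float = 0.8) -> tuple:
    """Estimate procedure calls needed, leaving headroom under the 60-minute limit."""
    records_per_call = int(records_per_minute * timeout_minutes * safety_factor)
    calls = math.ceil(total_records / records_per_call)
    return calls, records_per_call

# 100M records at ~1M records/min with 20% headroom → 3 calls of up to 48M records
print(plan_procedure_calls(100_000_000))  # → (3, 48000000)
```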

#### Optimization Strategies

**For Speed:**

* Use maximum thread count (default)
* Increase `thread_process_record_count` if memory allows
* Use appropriate warehouse size (Medium recommended)
* Use indexed columns in ORDER BY

**For Memory:**

* Reduce `thread_process_record_count` (default: 1000)
* Reduce thread count if needed
* Use smaller warehouse if memory constrained
* Process in smaller procedure calls
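Inverting the memory formula gives a quick way to pick a batch size under a memory budget. A sketch only; the budget and average document size are assumptions you supply:

```python
def max_batch_size(memory_budget_bytes: int, avg_doc_bytes: int, thread_count: int) -> int:
    """Largest thread_process_record_count that fits: budget / (avg size × 2 × threads)."""
    return memory_budget_bytes // (avg_doc_bytes * 2 * thread_count)

# 8 GB budget, 10 KB documents, 32 threads → up to 12,500 records per thread
print(max_batch_size(8_000_000_000, 10_000, 32))  # → 12500
```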

**For Large Datasets:**

* Use multiple procedure calls
* Set appropriate `PROCEDURE_INSTANCE_ROW_COUNT` per call
* Use efficient ORDER BY attributes
* Monitor progress and adjust as needed

***

### Common Scenarios

#### Scenario 1: Standard Full Scan

**Configuration:**

* `PROCEDURE_INSTANCE_COUNT = 1`
* Default thread count (warehouse max)
* `SCAN_RECORD_LIMIT = 0` (no limit)
* No WHERE clause

**Process:**

* All records processed in single call
* Completes within 60 minutes (if dataset allows)
* Full attribute discovery

#### Scenario 2: Large Dataset Scan

**Configuration:**

* `PROCEDURE_INSTANCE_COUNT > 1` (e.g., 4 calls)
* `PROCEDURE_INSTANCE_ROW_COUNT = 1,000,000` records per call
* `SCAN_ORDER_BY` on unique identifier
* Default thread count

**Process:**

* Data divided into N ranges via `THREAD_RANGE_CRITERIA`
* Each call processes its range sequentially via Tasks
* Results consolidated after all calls complete

#### Scenario 3: Incremental Scan

**Configuration:**

* `PROCEDURE_INSTANCE_COUNT = 1`
* `SCAN_WHERE_CLAUSE` with timestamp filter
* `SOURCE_STREAM_LAST_SCANNED_TIMESTAMP` from previous scan

**Process:**

* Only new/changed records scanned
* Faster than full scan
* Maintains schema discovery for new data

**Note:** WHERE clause only works with `PROCEDURE_INSTANCE_COUNT = 1`

#### Scenario 4: Memory-Constrained Scan

**Configuration:**

* Reduced thread count
* Reduced `thread_process_record_count` (below default 1000)
* Smaller warehouse size

**Process:**

* Lower memory usage, slower processing
* Prevents memory errors
* Suitable for complex nested data

***

### Summary

Scan processing is DataPancake's high-performance approach to schema discovery. Key takeaways:

* **Vertical scaling** - Uses multiple threads within a single warehouse for parallel processing
* **Batch processing** - Divides data into batches processed simultaneously across threads (default: 1000 records per thread)
* **Timeout management** - Multiple procedure calls handle datasets exceeding 60-minute limit
* **Memory control** - Batch size (`thread_process_record_count`) and thread count control memory usage
* **Recursive discovery** - Deep traversal discovers all attributes at any nesting depth
* **Progressive processing** - Attributes discovered incrementally as data is processed
* **Error resilience** - Individual errors don't stop the scan, processing continues

**Key Constraints:**

* 60-minute timeout per procedure call (Snowflake limitation)
* WHERE clause cannot be used with multiple procedure calls
* ORDER BY required when `PROCEDURE_INSTANCE_COUNT > 1`
* Thread count limited by warehouse `AVAILABLE_CORES`

By understanding scan processing, you can optimize scan configurations for your specific data sources, balancing performance, memory usage, and timeout constraints.
