# Schema Transformation / Consolidation - Initialization

### (Optional) Create a new stage for DataPancake files (or use an existing stage)

```sql
CREATE STAGE <DATABASE>.<SCHEMA>.DATAPANCAKE_FILES;
```

For more details on creating a stage, see <https://docs.snowflake.com/en/sql-reference/sql/create-stage>

![](https://colony-recorder.s3.amazonaws.com/files/2026-01-19/149fecfb-e559-4576-a13d-88e6b9d9798a/ascreenshot_e2dfe86c61b247be9cccc7da03b6f2a9_text_export.jpeg)

### Copy the Schema Transformation Procedure DDL

```sql
CREATE OR REPLACE PROCEDURE <database_name>.<schema_name>.sp_update_datapancake_datasource_schema_transformation_metadata_core(
    FILE_PATH string,
    ARCHIVE_PATH string, 
    USER_NAME string
)
RETURNS string
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python','orjson')
HANDLER = 'update_datapancake_datasource_schema_transformation_metadata'
EXECUTE AS CALLER

AS
$$
import snowflake.snowpark as snowpark
import datetime
import traceback
import orjson

def update_datapancake_datasource_schema_transformation_metadata(session: snowpark.Session, FILE_PATH: str, ARCHIVE_PATH: str, USER_NAME: str):
    try:

        # Initialize the output variable
        transformation_data = ""

        # Step 1: Read the CSV file from the specified Snowflake stage
        # Set skip_header=0 so we can manually extract and process the first row as the header
        
        df = session.read.options({"FIELD_DELIMITER": ",", "TRIM_SPACE": True, "SKIP_HEADER": 0, "FIELD_OPTIONALLY_ENCLOSED_BY": '"'}).csv(FILE_PATH)
        
        # Step 2: Collect all rows from the file
        
        rows = df.collect()
        if not rows:
            return "No rows found."

        # Step 3: Extract column names from the first row (header)
        # Convert column names to uppercase and strip whitespace
        
        header_row = rows[0]
        header_names = [str(getattr(header_row, f"c{i+1}")).strip().upper() for i in range(len(header_row))]

        # Step 4: Extract all remaining data rows
        
        data_rows = rows[1:]
        if not data_rows:
            return "Only header row found."

        # Step 5: Build a list of records (dicts) using the extracted headers
        
        records = []
        for row in data_rows:
            record = {}
            for i in range(len(header_names)):
                value = getattr(row, f"c{i+1}")
                record[header_names[i]] = str(value) if value not in [None, ""] else None
            records.append(record) 
            
        # Step 6: Serialize the records list as a JSON string using orjson
        # default=str handles non-string values such as datetime or Decimal
        
        transformation_data = orjson.dumps(records,default=str).decode("utf-8")

        # Step 7: Call the DataPancake procedure to update the schema transformation metadata
        result = session.call("DATAPANCAKE.CORE.UPDATE_DATASOURCE_SCHEMA_TRANSFORMATION_CORE",USER_NAME, transformation_data, FILE_PATH)
        if result.startswith("Success"):

            # Step 8 (optional): Archive the input file to a new location
            if ARCHIVE_PATH is not None and len(ARCHIVE_PATH) > 0 and ARCHIVE_PATH != FILE_PATH:
                session.sql(f"COPY FILES INTO '{ARCHIVE_PATH}' FROM '{FILE_PATH}'").collect()

            # Step 9: Remove the source file from the stage
            session.sql(f"REMOVE '{FILE_PATH}'").collect()

            # Step 10: Return a message describing the successful result
            return f"The DataPancake schema transformation metadata has been successfully updated and the file has been archived. Processed {len(records)} rows with columns: {', '.join(header_names)}"
        else:
            # Step 10: Return the error message
            return f"Error: {result}"
            
    except Exception as e:
        return f"Error: {str(e)} with a trace of: {traceback.format_exc()}"

$$;
```
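The header and record construction in steps 3–5 can be sketched in plain Python, independent of Snowpark. Here `rows` is a simple list of lists standing in for the collected `Row` objects, and stdlib `json` stands in for `orjson` (both accept a `default=str` fallback; the CSV column names are made up for illustration):

```python
import json

def build_records(rows):
    """Mimic steps 3-5: first row is the header, remaining rows become dicts."""
    if not rows:
        return [], []
    # Uppercase and strip the header names, as the procedure does
    header = [str(h).strip().upper() for h in rows[0]]
    records = []
    for row in rows[1:]:
        record = {}
        for i, name in enumerate(header):
            value = row[i] if i < len(row) else None
            # Empty strings and None both become None (NULL-like)
            record[name] = str(value) if value not in (None, "") else None
        records.append(record)
    return header, records

# Illustrative sample data (column names are hypothetical)
rows = [
    ["Column_Name", "Target_Type"],
    ["order_id", "NUMBER"],
    ["order_date", ""],
]
header, records = build_records(rows)
# header → ['COLUMN_NAME', 'TARGET_TYPE']
# records[1]['TARGET_TYPE'] → None

# Step 6 equivalent: serialize to a JSON string
payload = json.dumps(records, default=str)
```

Note that `orjson.dumps` returns `bytes` (hence the `.decode("utf-8")` in the procedure), while stdlib `json.dumps` returns `str` directly.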

### Paste the DDL and modify the database and schema name

{% hint style="info" %}
If you changed the DataPancake application database name during installation, update the procedure body to reference that name instead of `DATAPANCAKE`.
{% endhint %}

![](https://colony-recorder.s3.amazonaws.com/files/2026-01-19/16ab4f1b-5fc4-40de-a8a6-3dee0c3b79d9/ascreenshot_3c7475f9b6ac40d5a850f128b7af6278_text_export.jpeg)

![](https://colony-recorder.s3.amazonaws.com/files/2026-01-19/7d860f93-eb68-4fdb-96c7-ecbfc2f3ad66/ascreenshot_f81ff070201c4db2b2f9ae494e019c71_text_export.jpeg)

### Execute the create stored procedure statement

![](https://colony-recorder.s3.amazonaws.com/files/2026-01-19/1fcaf425-5241-480e-ad51-2b7b64a83b67/ascreenshot_e34c0377277e4e5ca7f052878a903bb2_text_export.jpeg)

### Verify the stored procedure was created successfully

![](https://colony-recorder.s3.amazonaws.com/files/2026-01-19/3e518af4-76e6-4fd5-b259-623394824462/ascreenshot_3a969c0b03784ef9b5a345100d14a6e2_text_export.jpeg)
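Once the procedure exists, it can be invoked with a `CALL` along these lines (the database, schema, stage, and file names below are placeholders; adjust them to your environment):

```sql
-- Placeholder names: adjust the database, schema, and stage paths to match your setup
CALL MY_DB.MY_SCHEMA.sp_update_datapancake_datasource_schema_transformation_metadata_core(
    '@MY_DB.MY_SCHEMA.DATAPANCAKE_FILES/schema_transformations.csv',  -- FILE_PATH
    '@MY_DB.MY_SCHEMA.DATAPANCAKE_FILES/archive/',                    -- ARCHIVE_PATH (pass '' to skip archiving)
    CURRENT_USER()                                                    -- USER_NAME
);
```

Note that the procedure removes the source file from the stage after a successful update even when archiving is skipped.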
