Schema Transformation / Consolidation - Initialization

Select a stage and create the stored procedure that enables bulk updates of schema transformation metadata.

1. (Optional) Create a new stage for DataPancake files (or use an existing stage)

CREATE STAGE <DATABASE>.<SCHEMA>.DATAPANCAKE_FILES;

For more details on creating a stage, see https://docs.snowflake.com/en/sql-reference/sql/create-stage
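
Before calling the procedure, upload the CSV file containing the metadata updates to the stage. A minimal upload sketch using the PUT command from SnowSQL or another Snowflake client (the local path and file name are illustrative):

PUT file:///path/to/schema_transformation.csv @<DATABASE>.<SCHEMA>.DATAPANCAKE_FILES AUTO_COMPRESS = FALSE;

Setting AUTO_COMPRESS = FALSE keeps the staged file name unchanged, so the FILE_PATH you later pass to the procedure matches the uploaded file exactly.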

2. Copy the Schema Transformation Procedure DDL

CREATE OR REPLACE PROCEDURE <database_name>.<schema_name>.sp_update_datapancake_datasource_schema_transformation_metadata_core(
    FILE_PATH string,
    ARCHIVE_PATH string, 
    USER_NAME string
)
RETURNS string
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python','orjson')
HANDLER = 'update_datapancake_datasource_schema_transformation_metadata'
EXECUTE AS CALLER

AS
$$
import snowflake.snowpark as snowpark
import datetime
import traceback
import orjson

def update_datapancake_datasource_schema_transformation_metadata(session: snowpark.Session, FILE_PATH: str, ARCHIVE_PATH: str, USER_NAME: str):
    try:

        # Initialize the output variable
        transformation_data = ""

        # Step 1: Read the CSV file from the specified Snowflake stage
        # SKIP_HEADER is set to 0 so the header row is returned with the data
        # and can be extracted manually in Step 3

        df = session.read.options({
            "FIELD_DELIMITER": ",",
            "TRIM_SPACE": True,
            "SKIP_HEADER": 0,
            "FIELD_OPTIONALLY_ENCLOSED_BY": '"'
        }).csv(FILE_PATH)
        
        # Step 2: Collect all rows from the file
        
        rows = df.collect()
        if not rows:
            return "No rows found."

        # Step 3: Extract column names from the first row (header)
        # Convert column names to uppercase and strip whitespace
        
        header_row = rows[0]
        header_names = [str(getattr(header_row, f"c{i+1}")).strip().upper() for i in range(len(header_row))]

        # Step 4: Extract all remaining data rows
        
        data_rows = rows[1:]
        if not data_rows:
            return "Only header row found."

        # Step 5: Build a list of records (dicts) using the extracted headers
        
        records = []
        for row in data_rows:
            record = {}
            for i in range(len(header_names)):
                value = getattr(row, f"c{i+1}")
                record[header_names[i]] = str(value) if value not in [None, ""] else None
            records.append(record) 
            
        # Step 6: Serialize the records list as a JSON string using orjson
        # default=str handles non-JSON-native values such as datetime or Decimal objects

        transformation_data = orjson.dumps(records, default=str).decode("utf-8")

        # Step 7: Call the DataPancake procedure to update the schema transformation metadata
        result = session.call("DATAPANCAKE.CORE.UPDATE_DATASOURCE_SCHEMA_TRANSFORMATION_CORE", USER_NAME, transformation_data, FILE_PATH)
        if result.startswith("Success"):

            # Step 8 (optional): Archive the input file to a new location
            if ARCHIVE_PATH is not None and len(ARCHIVE_PATH) > 0 and ARCHIVE_PATH != FILE_PATH:
                session.sql(f"COPY FILES INTO '{ARCHIVE_PATH}' FROM '{FILE_PATH}'").collect()

            # Step 9: Remove the source file from the stage
            session.sql(f"REMOVE '{FILE_PATH}'").collect()

            # Step 10: Return a message describing the successful result
            return f"The DataPancake schema transformation metadata has been successfully updated and the source file has been removed from the stage. Processed {len(records)} rows with columns: {', '.join(header_names)}"
        else:
            # Step 10: Return the error message
            return f"Error: {result}"
            
    except Exception as e:
        return f"Error: {str(e)} with a trace of: {traceback.format_exc()}"

$$;
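
For reference, the procedure converts each data row into a JSON object keyed by the uppercased, whitespace-stripped CSV headers, with empty values serialized as null. The headers below are purely illustrative; use the columns from your actual metadata file. A hypothetical input of:

COLUMN_A,column_b
value1,
value2,42

would be serialized as:

[{"COLUMN_A": "value1", "COLUMN_B": null}, {"COLUMN_A": "value2", "COLUMN_B": "42"}]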

3. Paste the DDL and modify the database name, schema name, and DataPancake application database name
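
For example, with a hypothetical ANALYTICS database and UTIL schema, the first line of the DDL would read:

CREATE OR REPLACE PROCEDURE ANALYTICS.UTIL.sp_update_datapancake_datasource_schema_transformation_metadata_core(

The DataPancake application database name appears in the session.call in Step 7 of the procedure body (DATAPANCAKE.CORE.UPDATE_DATASOURCE_SCHEMA_TRANSFORMATION_CORE); change the DATAPANCAKE prefix only if your installation uses a different application database name.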

4. Execute the create stored procedure statement

5. Verify the stored procedure was created successfully
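
A quick check, using the same database and schema placeholders as the DDL:

SHOW PROCEDURES LIKE 'sp_update_datapancake_datasource_schema_transformation_metadata_core' IN SCHEMA <database_name>.<schema_name>;

Once the procedure exists, it can be invoked as sketched below; the file name, archive subfolder, and user name are illustrative and assume the file was staged as in step 1:

CALL <database_name>.<schema_name>.sp_update_datapancake_datasource_schema_transformation_metadata_core(
    '@<DATABASE>.<SCHEMA>.DATAPANCAKE_FILES/schema_transformation.csv',
    '@<DATABASE>.<SCHEMA>.DATAPANCAKE_FILES/archive/',
    'JSMITH'
);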
