Procedure DDL - Core + Data Dictionary

Copy this stored procedure DDL and execute it in your Snowflake account to bulk-update DataPancake attribute metadata (including the data dictionary) from .csv files.

-- Bulk-updates DataPancake attribute metadata from a staged .csv file.
CREATE OR REPLACE PROCEDURE <database_name>.<schema_name>.sp_update_datapancake_attribute_metadata_dictionary(
    -- Location of the .csv file to read; this is also the file removed after a successful update.
    FILE_PATH string,
    -- Location the file is copied to after a successful update; the copy is skipped when NULL, empty, or equal to FILE_PATH.
    ARCHIVE_PATH string, 
    -- Passed through as the first argument of DATAPANCAKE.CORE.UPDATE_ATTRIBUTE_METADATA_DICTIONARY.
    USER_NAME string
)
-- The handler returns a status or error message as text.
RETURNS string
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python','orjson')
-- Name of the Python function defined in the procedure body.
HANDLER = 'update_datapancake_attribute_metadata'
EXECUTE AS CALLER

AS
$$
import snowflake.snowpark as snowpark
import datetime
import traceback
import orjson

def _has_unsafe_literal_chars(stage_path: str) -> bool:
    """Return True if stage_path cannot be embedded safely in a single-quoted SQL literal."""
    # A single quote ends the literal early and a backslash begins an escape
    # sequence; either would alter the text of the COPY FILES / REMOVE statement.
    return "'" in stage_path or "\\" in stage_path


def update_datapancake_attribute_metadata(session: snowpark.Session, FILE_PATH: str, ARCHIVE_PATH: str, USER_NAME: str):
    """Load a staged CSV file and send its rows to DataPancake as attribute metadata.

    The first row of the file is treated as the header. Every following row becomes
    a dict keyed by the stripped, upper-cased header names; the list of dicts is
    serialized to JSON and passed to
    DATAPANCAKE.CORE.UPDATE_ATTRIBUTE_METADATA_DICTIONARY. When that call returns
    "Success", the file is optionally copied to ARCHIVE_PATH and then removed from
    FILE_PATH.

    Args:
        session: Snowpark session used to read the file, call the DataPancake
            procedure and run the COPY FILES / REMOVE statements.
        FILE_PATH: Location of the CSV file to read (and to remove on success).
        ARCHIVE_PATH: Location to copy the file to on success. The copy is skipped
            when this is None, empty, or equal to FILE_PATH.
        USER_NAME: Forwarded as the first argument of the DataPancake procedure.

    Returns:
        A human-readable status message. Failures are never raised; they are
        returned as a string beginning with "Error:".
    """
    try:
        # Validate the paths before any work is done: both are interpolated into
        # SQL text further down, and failing there would leave the metadata
        # already updated while the file is neither archived nor removed.
        if not FILE_PATH:
            return "Error: FILE_PATH must be a non-empty stage path."

        archive_requested = bool(ARCHIVE_PATH) and ARCHIVE_PATH != FILE_PATH
        interpolated_paths = [FILE_PATH, ARCHIVE_PATH] if archive_requested else [FILE_PATH]
        for stage_path in interpolated_paths:
            if _has_unsafe_literal_chars(stage_path):
                return f"Error: stage path {stage_path!r} must not contain single quotes or backslashes."

        # Step 1: Read the CSV file from the specified Snowflake stage
        # skip_header=0 keeps the header line in the result so it can be extracted manually below
        # NOTE(review): no FIELD_OPTIONALLY_ENCLOSED_BY option is set, so quoted fields that
        # contain commas would presumably be split - confirm against the export format.
        df = session.read.options({"field_delimiter": ",", "skip_header": 0}).csv(FILE_PATH)

        # Step 2: Collect all rows from the file
        rows = df.collect()
        if not rows:
            return "No rows found."

        # Step 3: Extract column names from the first row (header)
        # Column names are stripped of whitespace and upper-cased
        header_names = [str(cell).strip().upper() for cell in rows[0]]

        # Step 4: Extract all remaining data rows
        data_rows = rows[1:]
        if not data_rows:
            return "Only header row found."

        # Step 5: Build a list of records (dicts) using the extracted headers
        # None and empty strings are both sent as null; other values are stringified and stripped
        records = [
            {
                name: (str(value).strip() if value not in (None, "") else None)
                for name, value in zip(header_names, row)
            }
            for row in data_rows
        ]

        # Step 6: Serialize the records list as a JSON string using orjson
        # default=str is the fallback for values orjson cannot serialize natively
        attribute_data = orjson.dumps(records, default=str).decode("utf-8")

        # Step 7: Call the DataPancake procedure to update the attribute metadata
        result = session.call("DATAPANCAKE.CORE.UPDATE_ATTRIBUTE_METADATA_DICTIONARY", USER_NAME, attribute_data, FILE_PATH)
        if result != "Success":
            # Return the procedure's message together with the first data row for context
            return f"Error: {result} and: {rows[1]}"

        # Step 8: Optional - Archive the input file to a new location
        if archive_requested:
            session.sql(f"COPY FILES INTO '{ARCHIVE_PATH}' FROM '{FILE_PATH}'").collect()

        # Step 9: Optional - Remove the source file from the stage
        # NOTE(review): this also runs when no archive copy was made - confirm that is intended.
        session.sql(f"REMOVE '{FILE_PATH}'").collect()

        # Step 10: Return a message describing what was actually done with the file
        outcome = "the file has been archived" if archive_requested else "the source file has been removed"
        return f"The DataPancake attribute metadata has been successfully updated and {outcome}. Processed {len(records)} rows with columns: {', '.join(header_names)}"

    except Exception as e:
        return f"Error: {str(e)} with a trace of: {traceback.format_exc()}"
$$;

Last updated