Attribute Metadata Bulk Updates (Security) - Procedure DDL

Copy and execute this stored procedure DDL in your Snowflake account to perform bulk updates on DataPancake attribute metadata (including security metadata) from staged CSV files.

CREATE OR REPLACE PROCEDURE <database_name>.<schema_name>.sp_update_datapancake_attribute_metadata_security(
    FILE_PATH string,
    ARCHIVE_PATH string, 
    USER_NAME string
)
RETURNS string
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python','orjson')
HANDLER = 'update_datapancake_attribute_metadata'
EXECUTE AS CALLER

AS
$$
import snowflake.snowpark as snowpark
import datetime
import traceback
import orjson

def update_datapancake_attribute_metadata(session: snowpark.Session, FILE_PATH: str, ARCHIVE_PATH: str, USER_NAME: str) -> str:
    """Bulk-update DataPancake attribute (security) metadata from a staged CSV file.

    Reads the CSV at FILE_PATH from a Snowflake stage, builds a JSON array of
    records keyed by the uppercased header row, and passes that JSON to
    DATAPANCAKE.CORE.UPDATE_ATTRIBUTE_METADATA_SECURITY. On success, the source
    file is optionally copied to ARCHIVE_PATH and then removed from the stage.

    Args:
        session: Snowpark session supplied by Snowflake at call time.
        FILE_PATH: Stage path of the input CSV (e.g. '@my_stage/file.csv').
        ARCHIVE_PATH: Optional stage path to copy the file to before removal;
            pass NULL or '' to skip archiving.
        USER_NAME: User name forwarded to the DataPancake update procedure.

    Returns:
        A status message describing the outcome (success detail or error text).
    """
    try:
        # The stage paths are interpolated into SQL text below because
        # COPY FILES / REMOVE cannot take bind variables. Reject single
        # quotes up front so a malicious or malformed path cannot break
        # out of the quoted literal (SQL injection guard).
        for path in (FILE_PATH, ARCHIVE_PATH):
            if path and "'" in path:
                return "Error: stage paths must not contain single quotes."

        # Step 1: Read the CSV file from the specified Snowflake stage.
        # skip_header=0 so the first row can be processed manually as the header.
        df = session.read.options({"field_delimiter": ",", "skip_header": 0}).csv(FILE_PATH)

        # Step 2: Collect all rows from the file.
        rows = df.collect()
        if not rows:
            return "No rows found."

        # Step 3: Extract column names from the first row (header),
        # uppercased and stripped of surrounding whitespace.
        header_row = rows[0]
        header_names = [str(getattr(header_row, f"c{i+1}")).strip().upper() for i in range(len(header_row))]

        # Step 4: All remaining rows are data rows.
        data_rows = rows[1:]
        if not data_rows:
            return "Only header row found."

        # Step 5: Build one dict per data row keyed by the header names.
        # NULLs and empty strings are normalized to None (JSON null).
        records = []
        for row in data_rows:
            record = {}
            for i, name in enumerate(header_names):
                value = getattr(row, f"c{i+1}")
                record[name] = str(value).strip() if value not in (None, "") else None
            records.append(record)

        # Step 6: Serialize the records as a JSON string; default=str handles
        # non-JSON-native values such as datetime or decimal.
        attribute_data = orjson.dumps(records, default=str).decode("utf-8")

        # Step 7: Hand the JSON payload to the DataPancake update procedure.
        result = session.call("DATAPANCAKE.CORE.UPDATE_ATTRIBUTE_METADATA_SECURITY", USER_NAME, attribute_data, FILE_PATH)
        if result == "Success":
            # Step 8 (optional): Archive the input file to a new location.
            archived = False
            if ARCHIVE_PATH is not None and len(ARCHIVE_PATH) > 0 and ARCHIVE_PATH != FILE_PATH:
                session.sql(f"COPY FILES INTO '{ARCHIVE_PATH}' FROM '{FILE_PATH}'").collect()
                archived = True

            # Step 9: Remove the source file from the stage.
            session.sql(f"REMOVE '{FILE_PATH}'").collect()

            # Step 10: Report what actually happened. (Bug fix: the previous
            # message claimed the file was archived even when no ARCHIVE_PATH
            # was supplied and only the REMOVE ran.)
            file_disposition = "archived" if archived else "removed from the stage"
            return f"The DataPancake attribute metadata has been successfully updated and the file has been {file_disposition}. Processed {len(records)} rows with columns: {', '.join(header_names)}"
        else:
            # Step 10: Propagate the DataPancake error, including the first
            # data row (guaranteed to exist at this point) to aid debugging.
            return f"Error: {result} and: {rows[1]}"

    except Exception as e:
        # Return the full traceback in the result string, since procedure
        # internals are not always visible to the caller.
        return f"Error: {str(e)} with a trace of: {traceback.format_exc()}"
$$;

Last updated