Array Foreign Key - Initialization

Select a stage and create the stored procedure below to enable bulk updates of array foreign key metadata.

1. (Optional) Create a new stage for DataPancake files (or use an existing stage)

CREATE STAGE <DATABASE>.<SCHEMA>.DATAPANCAKE_FILES;

For more details on creating a stage, see https://docs.snowflake.com/en/sql-reference/sql/create-stage
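If the CSV file is on your local machine, it can be uploaded to the stage with a PUT command from SnowSQL or another client that supports PUT. A minimal sketch, assuming a hypothetical local file named array_foreign_keys.csv (AUTO_COMPRESS = FALSE keeps the staged file name unchanged):

PUT file:///tmp/array_foreign_keys.csv @<DATABASE>.<SCHEMA>.DATAPANCAKE_FILES AUTO_COMPRESS = FALSE;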

2. Copy the Array Foreign Key Procedure DDL

CREATE OR REPLACE PROCEDURE <DATABASE_NAME>.<SCHEMA_NAME>.sp_update_datapancake_array_foreign_key_metadata_core(
    FILE_PATH string,
    ARCHIVE_PATH string, 
    USER_NAME string
)
RETURNS string
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python','orjson')
HANDLER = 'update_datapancake_array_foreign_key_metadata'
EXECUTE AS CALLER

AS
$$
import snowflake.snowpark as snowpark
import datetime
import traceback
import orjson

def update_datapancake_array_foreign_key_metadata(session: snowpark.Session, FILE_PATH: str, ARCHIVE_PATH: str, USER_NAME: str):
    try:

        # Initialize the output variable
        
        foreign_key_data = ""

        # Step 1: Read the CSV file from the specified Snowflake stage
        # Set SKIP_HEADER = 0 so we can manually extract and process the first row as the header
        
        df = session.read.options({"FIELD_DELIMITER": ",","TRIM_SPACE":True,"SKIP_HEADER": 0, "FIELD_OPTIONALLY_ENCLOSED_BY":'"'}).csv(FILE_PATH)
        
        # Step 2: Collect all rows from the file
        
        rows = df.collect()
        if not rows:
            return "No rows found."

        # Step 3: Extract column names from the first row (header)
        # Convert column names to uppercase and strip whitespace
        
        header_row = rows[0]
        header_names = [str(getattr(header_row, f"c{i+1}")).strip().upper() for i in range(len(header_row))]

        # Step 4: Extract all remaining data rows
        
        data_rows = rows[1:]
        if not data_rows:
            return "Only header row found."

        # Step 5: Build a list of records (dicts) using the extracted headers
        
        records = []
        for row in data_rows:
            record = {}
            for i in range(len(header_names)):
                value = getattr(row, f"c{i+1}")
                record[header_names[i]] = str(value) if value not in [None, ""] else None
            records.append(record) 
            
        # Step 6: Serialize the records list to a JSON string using orjson
        # default=str handles values such as datetime or decimal types
        
        foreign_key_data = orjson.dumps(records,default=str).decode("utf-8")

        # Step 7: Call the DataPancake procedure to update the array foreign key metadata
        result = session.call("datapancake.core.update_array_foreign_key_metadata_core",USER_NAME, foreign_key_data, FILE_PATH)
        
        # Use substring matching to be more flexible
        if "Success" in str(result):

            #Step 8: Optional: Archive the input file to a new location
            if ARCHIVE_PATH is not None and len(ARCHIVE_PATH) > 0 and ARCHIVE_PATH != FILE_PATH:
                session.sql(f"COPY FILES INTO '{ARCHIVE_PATH}' FROM '{FILE_PATH}'").collect()

            #Step 9: Optional - Remove the source file from the stage
            session.sql(f"REMOVE '{FILE_PATH}'").collect()

            #Step 10: Return message showing the successful result
            return f"The DataPancake array foreign key metadata has been successfully updated and the file has been archived. Processed {len(records)} rows with columns: {', '.join(header_names)}"
        else:
            #Step 10: Return the error message along with the first data row to aid debugging
            return f"Error: {result} and: {rows[1]}"
            
    except Exception as e:
        return f"Error: {str(e)} with a trace of: {traceback.format_exc()}" 
$$;
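
Once the procedure has been created (steps 3-5 below), it is called with a staged file path, an archive path, and a user name. A minimal usage sketch, assuming the stage from step 1 and hypothetical file and archive locations:

CALL <DATABASE_NAME>.<SCHEMA_NAME>.sp_update_datapancake_array_foreign_key_metadata_core(
    '@<DATABASE>.<SCHEMA>.DATAPANCAKE_FILES/array_foreign_keys.csv',   -- FILE_PATH: staged CSV to process
    '@<DATABASE>.<SCHEMA>.DATAPANCAKE_FILES/archive/',                 -- ARCHIVE_PATH: where the file is copied on success
    'MY_USER_NAME'                                                     -- USER_NAME: passed through to DataPancake
);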

3. Paste the DDL into a worksheet and replace <DATABASE_NAME> and <SCHEMA_NAME> with your target database and schema; if your DataPancake application database is not named datapancake, also update the database name in the session.call reference, as shown below
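
For example, with a hypothetical target database MY_DB and schema ADMIN, the procedure header would begin with CREATE OR REPLACE PROCEDURE MY_DB.ADMIN.sp_update_datapancake_array_foreign_key_metadata_core(...), and if the DataPancake application database were installed under a different name (for example DATAPANCAKE_APP), the session.call line inside the body would become:

result = session.call("datapancake_app.core.update_array_foreign_key_metadata_core", USER_NAME, foreign_key_data, FILE_PATH)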

4. Execute the create stored procedure statement

5. Verify the stored procedure was created successfully
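
To confirm the procedure exists, you can list it in the target schema. A quick check, assuming the database and schema used in step 3:

SHOW PROCEDURES LIKE 'SP_UPDATE_DATAPANCAKE_ARRAY%' IN SCHEMA <DATABASE_NAME>.<SCHEMA_NAME>;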
