Procedure DDL - Core + Data Dictionary
Copy and execute this stored procedure DDL in your Snowflake account to perform bulk updates of DataPancake attribute metadata (including the data dictionary) from .csv files staged in Snowflake. A hypothetical invocation example follows the procedure definition.
CREATE OR REPLACE PROCEDURE <database_name>.<schema_name>.sp_update_datapancake_attribute_metadata_dictionary(
    FILE_PATH string,
    ARCHIVE_PATH string,
    USER_NAME string
)
RETURNS string
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python','orjson')
HANDLER = 'update_datapancake_attribute_metadata'
EXECUTE AS CALLER
AS
$$
import snowflake.snowpark as snowpark
import datetime
import traceback
import orjson
def update_datapancake_attribute_metadata(session: snowpark.Session, FILE_PATH: str, ARCHIVE_PATH: str, USER_NAME: str):
    try:
        # Initialize the output variable
        attribute_data = ""

        # Step 1: Read the CSV file from the specified Snowflake stage.
        # skip_header=0 keeps the header row so it can be extracted and processed manually below.
        df = session.read.options({"field_delimiter": ",", "skip_header": 0}).csv(FILE_PATH)

        # Step 2: Collect all rows from the file
        rows = df.collect()
        if not rows:
            return "No rows found."

        # Step 3: Extract column names from the first row (header),
        # uppercased and stripped of surrounding whitespace
        header_row = rows[0]
        header_names = [str(getattr(header_row, f"c{i+1}")).strip().upper() for i in range(len(header_row))]

        # Step 4: Extract the remaining data rows
        data_rows = rows[1:]
        if not data_rows:
            return "Only header row found."

        # Step 5: Build a list of records (dicts) keyed by the extracted headers;
        # empty values are stored as None so they serialize to JSON null
        records = []
        for row in data_rows:
            record = {}
            for i in range(len(header_names)):
                value = getattr(row, f"c{i+1}")
                record[header_names[i]] = str(value).strip() if value not in [None, ""] else None
            records.append(record)

        # Step 6: Serialize the records list to a JSON string using orjson.
        # default=str handles non-JSON-native values such as datetime or Decimal.
        attribute_data = orjson.dumps(records, default=str).decode("utf-8")
        # Step 7: Call the DataPancake procedure to update the attribute metadata
        result = session.call("DATAPANCAKE.CORE.UPDATE_ATTRIBUTE_METADATA_DICTIONARY", USER_NAME, attribute_data, FILE_PATH)

        if result == "Success":
            # Step 8 (optional): Archive the input file to a new location
            if ARCHIVE_PATH is not None and len(ARCHIVE_PATH) > 0 and ARCHIVE_PATH != FILE_PATH:
                session.sql(f"COPY FILES INTO '{ARCHIVE_PATH}' FROM '{FILE_PATH}'").collect()
                # Step 9 (optional): Remove the source file from the stage once it has been archived
                session.sql(f"REMOVE '{FILE_PATH}'").collect()
            # Step 10: Return a message describing the successful result
            return f"The DataPancake attribute metadata has been successfully updated and the file has been archived. Processed {len(records)} rows with columns: {', '.join(header_names)}"
        else:
            # Step 10: Return the error message along with the first data row to aid debugging
            return f"Error: {result} with first data row: {rows[1]}"
    except Exception as e:
        return f"Error: {str(e)} with a trace of: {traceback.format_exc()}"
$$;
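Once the procedure exists, it can be called from a worksheet. The stage, file, and user names in this sketch are hypothetical placeholders, not values defined by DataPancake; substitute your own stage path and, optionally, an archive location (an empty ARCHIVE_PATH skips the archive-and-remove step):

CALL <database_name>.<schema_name>.sp_update_datapancake_attribute_metadata_dictionary(
    '@<database_name>.<schema_name>.metadata_stage/attribute_metadata.csv',  -- FILE_PATH (hypothetical stage and file)
    '@<database_name>.<schema_name>.metadata_stage/archive/',                -- ARCHIVE_PATH (hypothetical; pass '' to skip archiving)
    'JSMITH'                                                                 -- USER_NAME
);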
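For a sense of the payload the procedure hands to DataPancake, here is a standalone Python sketch of the record-building logic from Steps 3 through 6. The column names and values are hypothetical examples; the real columns come from your .csv header:

import orjson

# Hypothetical file contents, as parsed rows (first row is the header)
rows = [
    ["attribute_name", "description"],
    ["customer_id", "Unique customer key"],
    ["order_total", ""],  # empty values become JSON null
]

# Headers are uppercased and stripped, mirroring Step 3
header_names = [h.strip().upper() for h in rows[0]]

# Each data row becomes a dict keyed by the headers, mirroring Step 5
records = [
    {header_names[i]: (str(v).strip() if v not in [None, ""] else None)
     for i, v in enumerate(row)}
    for row in rows[1:]
]

print(orjson.dumps(records, default=str).decode("utf-8"))
# [{"ATTRIBUTE_NAME":"customer_id","DESCRIPTION":"Unique customer key"},{"ATTRIBUTE_NAME":"order_total","DESCRIPTION":null}]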