Attribute Metadata Bulk Updates (Security) - Procedure DDL
Copy and execute this stored procedure DDL in your Snowflake account to perform bulk updates on DataPancake attribute metadata (including security) from .csv files staged in Snowflake. A sample invocation follows the procedure definition.
CREATE OR REPLACE PROCEDURE <database_name>.<schema_name>.sp_update_datapancake_attribute_metadata_security(
FILE_PATH string,
ARCHIVE_PATH string,
USER_NAME string
)
RETURNS string
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python','orjson')
HANDLER = 'update_datapancake_attribute_metadata'
EXECUTE AS CALLER
AS
$$
import snowflake.snowpark as snowpark
import datetime
import traceback
import orjson
def update_datapancake_attribute_metadata(session: snowpark.Session, FILE_PATH: str, ARCHIVE_PATH: str, USER_NAME: str):
    try:
        # Initialize the output variable
        attribute_data = ""

        # Step 1: Read the CSV file from the specified Snowflake stage
        # Set skip_header=0 so we can manually extract and process the first row as the header
        df = session.read.options({"field_delimiter": ",", "skip_header": 0}).csv(FILE_PATH)

        # Step 2: Collect all rows from the file
        rows = df.collect()
        if not rows:
            return "No rows found."

        # Step 3: Extract column names from the first row (header)
        # Convert column names to uppercase and strip whitespace
        header_row = rows[0]
        header_names = [str(getattr(header_row, f"c{i+1}")).strip().upper() for i in range(len(header_row))]

        # Step 4: Extract all remaining data rows
        data_rows = rows[1:]
        if not data_rows:
            return "Only header row found."

        # Step 5: Build a list of records (dicts) using the extracted headers
        records = []
        for row in data_rows:
            record = {}
            for i in range(len(header_names)):
                value = getattr(row, f"c{i+1}")
                record[header_names[i]] = str(value).strip() if value not in [None, ""] else None
            records.append(record)

        # Step 6: Serialize the records list as a JSON string using orjson
        # default=str handles values such as datetime or decimal types
        attribute_data = orjson.dumps(records, default=str).decode("utf-8")

        # Step 7: Call the DataPancake procedure to update the attribute metadata
        result = session.call("DATAPANCAKE.CORE.UPDATE_ATTRIBUTE_METADATA_SECURITY", USER_NAME, attribute_data, FILE_PATH)
if result == "Success":
#Step 8: Optional: Archive the input file to a new location
if ARCHIVE_PATH is not None and len(ARCHIVE_PATH) > 0 and ARCHIVE_PATH != FILE_PATH:
session.sql(f"COPY FILES INTO '{ARCHIVE_PATH}' FROM '{FILE_PATH}'").collect()
#Step 9: Optional - Remove the source file from the stage
session.sql(f"REMOVE '{FILE_PATH}'").collect()
#Step 10: Return message showing the successful result
return f"The DataPancake attribute metadata has been successfully updated and the file has been archived. Processed {len(records)} rows with columns: {', '.join(header_names)}"
else:
#Step 10: Return the error message
return f"Error: {result} and: {rows[1]}"
except Exception as e:
return f"Error: {str(e)} with a trace of: {traceback.format_exc()}"
$$;
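Once the procedure is created, it can be invoked with the stage path of the .csv file, an optional archive path, and the user name to record against the update. The following is a minimal sketch of one way to stage a file and call the procedure; the stage, file, and user names are illustrative placeholders, so substitute the locations used in your account.

-- Upload the .csv file to an internal stage (placeholder stage and file names)
-- AUTO_COMPRESS = FALSE keeps the staged file name as .csv so it matches the FILE_PATH argument
PUT file:///tmp/attribute_metadata_security.csv @<database_name>.<schema_name>.metadata_stage AUTO_COMPRESS = FALSE;

-- Run the bulk update and archive the processed file to a separate stage location
CALL <database_name>.<schema_name>.sp_update_datapancake_attribute_metadata_security(
    '@<database_name>.<schema_name>.metadata_stage/attribute_metadata_security.csv',
    '@<database_name>.<schema_name>.metadata_stage/archive/',
    'my_user_name'
);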