Datasource Metadata - Initialization
Select a stage and create the stored procedure below to enable bulk updates for datasource metadata.
1. (Optional) Create a new stage for DataPancake files (or use an existing stage)
CREATE STAGE <DATABASE>.<SCHEMA>.DATAPANCAKE_FILES;
For more details on creating a stage, see https://docs.snowflake.com/en/sql-reference/sql/create-stage
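The datasource metadata CSV files read by the procedure can be uploaded to this stage ahead of time. As an illustrative sketch, a file could be uploaded from SnowSQL with PUT (the local path and file name below are placeholders):
PUT file:///local/path/datasource_metadata.csv @<DATABASE>.<SCHEMA>.DATAPANCAKE_FILES;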

2. Copy the Datasource Metadata Procedure DDL
CREATE OR REPLACE PROCEDURE <DATABASE_NAME>.<SCHEMA_NAME>.sp_update_datapancake_datasource_metadata(
FILE_PATH string,
ARCHIVE_PATH string,
USER_NAME string
)
RETURNS string
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python','orjson')
HANDLER = 'update_datapancake_datasource_metadata'
EXECUTE AS CALLER
AS
$$
import snowflake.snowpark as snowpark
import datetime
import traceback
import orjson
def update_datapancake_datasource_metadata(session: snowpark.Session, FILE_PATH: str, ARCHIVE_PATH: str, USER_NAME: str):
    try:
        # Initialize the output variable
        datasource_data = ""
        # Step 1: Read the CSV file from the specified Snowflake stage
        # Set skip_header=0 so we can manually extract and process the first row as the header
        df = session.read.options({"field_delimiter": ",", "skip_header": 0}).csv(FILE_PATH)
        # Step 2: Collect all rows from the file
        rows = df.collect()
        if not rows:
            return "No rows found."
        # Step 3: Extract column names from the first row (header)
        # Convert column names to uppercase and strip whitespace
        header_row = rows[0]
        header_names = [str(getattr(header_row, f"c{i+1}")).strip().upper() for i in range(len(header_row))]
        # Step 4: Extract all remaining data rows
        data_rows = rows[1:]
        if not data_rows:
            return "Only header row found."
        # Step 5: Build a list of records (dicts) using the extracted headers
        records = []
        for row in data_rows:
            record = {}
            for i in range(len(header_names)):
                value = getattr(row, f"c{i+1}")
                record[header_names[i]] = str(value).strip() if value not in [None, ""] else None
            records.append(record)
        # Step 6: Serialize the records list as a JSON string using orjson
        # default=str handles values such as datetime or decimal types
        datasource_data = orjson.dumps(records, default=str).decode("utf-8")
        # Step 7: Call the DataPancake procedure to update the datasource metadata
        result = session.call("DATAPANCAKE.CORE.UPDATE_DATAPANCAKE_DATASOURCE_METADATA_CORE", USER_NAME, datasource_data, FILE_PATH)
        if result.startswith("Successfully updated"):
            # Step 8: Optional - Archive the input file to a new location
            if ARCHIVE_PATH is not None and len(ARCHIVE_PATH) > 0 and ARCHIVE_PATH != FILE_PATH:
                session.sql(f"COPY FILES INTO '{ARCHIVE_PATH}' FROM '{FILE_PATH}'").collect()
                # Step 9: Optional - Remove the source file from the stage
                session.sql(f"REMOVE '{FILE_PATH}'").collect()
            # Step 10: Return a message showing the successful result
            return f"The DataPancake datasource metadata has been successfully updated and the file has been archived. Processed {len(records)} rows with columns: {', '.join(header_names)}"
        else:
            # Step 10: Return the error message
            return f"Error: {result}"
    except Exception as e:
        return f"Error: {str(e)} with a trace of: {traceback.format_exc()}"
$$;
3. Paste the DDL and modify the database name, schema name, and DataPancake application database name
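For example, assuming the procedure is created in a placeholder database and schema named ADMIN_DB and UTIL, the first line of the DDL would become:
CREATE OR REPLACE PROCEDURE ADMIN_DB.UTIL.sp_update_datapancake_datasource_metadata(
If the DataPancake application database in your account is named something other than DATAPANCAKE, also update that database name in the Step 7 session.call inside the procedure body.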


4. Execute the CREATE PROCEDURE statement
5. Verify the stored procedure was created successfully
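For example, the following statements can confirm that the procedure exists and illustrate how it might later be invoked; the stage paths, file name, and user name below are placeholders:
SHOW PROCEDURES LIKE 'SP_UPDATE_DATAPANCAKE_DATASOURCE_METADATA' IN SCHEMA <DATABASE_NAME>.<SCHEMA_NAME>;
CALL <DATABASE_NAME>.<SCHEMA_NAME>.sp_update_datapancake_datasource_metadata(
  '@<DATABASE>.<SCHEMA>.DATAPANCAKE_FILES/datasource_metadata.csv',
  '@<DATABASE>.<SCHEMA>.DATAPANCAKE_FILES/archive/',
  'MY_USER_NAME'
);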
