-
Notifications
You must be signed in to change notification settings - Fork 0
/
streamlit_app.py
129 lines (107 loc) · 4.76 KB
/
streamlit_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import snowflake.connector
import streamlit as st
import pandas as pd
import requests
import json
import re
from urllib.error import URLError
# Streamlit secrets
streamlit_secrets = st.secrets["snowflake"]
# Snowflake connection credentials
snowflake_username = streamlit_secrets["user"]
snowflake_password = streamlit_secrets["password"]
snowflake_account = streamlit_secrets["account"]
snowflake_warehouse = streamlit_secrets["warehouse"]
snowflake_database = streamlit_secrets["database"]
snowflake_schema = streamlit_secrets["schema"]
# Connect to Snowflake
conn = snowflake.connector.connect(
user=snowflake_username,
password=snowflake_password,
account=snowflake_account,
warehouse=snowflake_warehouse,
database=snowflake_database,
schema=snowflake_schema
)
# Step 1: List entries in the Snowflake stage
stage_name = 'evt_2_sf'
stage_entries = conn.cursor().execute(f"LIST @evt_2_sf").fetchall()
# Streamlit UI
st.title("Snowflake Stage File Analysis")
st.write("Select a file from the stage and choose a mapping")
# Display stage entries to the user
st.write("Stage Entries:")
selected_entry = st.selectbox("Select an entry", [entry[0] for entry in stage_entries])
# Extract the filename from the selected entry
selected_entry = selected_entry.split("/")[-1].strip()
# File formats
file_formats = ["JSON", "CSV", "UNKNOWN", "AVRO", "XML", "PARQUET", "CUSTOM"]
# File format selection
selected_file_format = st.selectbox("Select a file format", file_formats)
# Initialize 'result' to None
result = None
if selected_file_format == "CUSTOM":
regex_pattern = st.text_input("Enter your regex pattern")
try:
if selected_file_format != "CUSTOM":
# Step 2: Retrieve structure of the selected file
query = f"SELECT $1 FROM @{stage_name}/{selected_entry} (file_format => {selected_file_format}) LIMIT 1"
result = conn.cursor().execute(query).fetchone()
else:
query = f"SELECT $1 FROM @{stage_name}/{selected_entry}"
result = conn.cursor().execute(query).fetchone()
except snowflake.connector.errors.ProgrammingError as e:
st.error("Not the parser, try another.")
st.error(str(e))
# If 'result' is None, print error message and skip the rest of the script
if result is None:
st.error("No result could be retrieved from the database.")
else:
# Display result
st.write("Result:")
st.write(result)
# Get the structure as a string
structure_string = result[0]
if selected_file_format == "JSON":
try:
# Parse JSON structure into a dictionary
structure_dict = json.loads(structure_string)
except json.JSONDecodeError:
st.error("The result could not be parsed as a JSON string.")
structure_dict = None
elif selected_file_format == "CUSTOM":
# Use regex to find matches in the string
regex_matches = re.finditer(regex_pattern, structure_string)
if regex_matches:
# Create an empty list to store the field mappings# Create an empty list to store the field mappings
field_mapping_data = []
# Iterate over the regex matches and prompt for field names
for i, match in enumerate(regex_matches, start=1):
if match.groups():
for j, group in enumerate(match.groups(), start=1):
field_name = st.text_input(f"Enter field name for Token {i} Group {j} (Value: {group})", key=f"field_name_{i}_{j}", max_chars=20)
regex_pattern = f"(?P<{field_name}>{re.escape(group)})" if field_name else None
field_mapping_data.append({"Field Name": field_name, "Value": group, "Regex Pattern": regex_pattern})
else:
field_mapping_data.append({"Field Name": "", "Value": match.group(), "Regex Pattern": None})
# Create a DataFrame from the field mappings
field_mapping_df = pd.DataFrame(field_mapping_data)
# Display the field mappings table
st.write("Field Mappings:")
if not field_mapping_df.empty:
st.table(field_mapping_df)
# Get selected field names for query
selected_fields = field_mapping_df[field_mapping_df["Field Name"] != ""]["Field Name"].tolist()
# Generate SQL statement
if st.button("Generate SQL") and selected_fields:
select_statement = "SELECT " + ", ".join(selected_fields) + f" FROM @{stage_name}/{selected_entry} (file_format => {selected_file_format})"
st.write("Generated Select Statement:")
st.code(select_statement)
if st.button("Generate Regex") and selected_regex_patterns:
combined_regex = "|".join(selected_regex_patterns)
st.write("Generated Regex:")
st.code(combined_regex)
else:
st.error("No matches found in the result using the provided regex pattern.")
else:
st.error("Invalid file format selected.")