Skip to content

Commit 7abc9d7

Browse files
committed
Adds code for snowflake container example running the UI
1 parent 1e1fb44 commit 7abc9d7

File tree

6 files changed

+369
-0
lines changed

6 files changed

+369
-0
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
FROM python:3.13.1-slim

# Use apt-get (the stable scripting interface, unlike `apt`) and make sure the
# sqlite3 package is present at its latest patched version; clean the package
# lists afterwards to keep the image small. The original `apt upgrade sqlite3 -y`
# is not valid usage: `apt upgrade` does not take a package argument.
RUN apt-get update && \
    apt-get install -y --no-install-recommends sqlite3 && \
    rm -rf /var/lib/apt/lists/*

# Single pip layer for both runtime dependencies.
RUN pip install --no-cache-dir "sf-hamilton[ui,sdk]" flask

# Hosts under snowflakecomputing.app are allowed to reach the Hamilton UI.
ENV HAMILTON_ALLOWED_HOSTS=".snowflakecomputing.app"
# Port the Flask pipeline endpoint binds to (read by pipeline_endpoint.py).
ENV SERVER_PORT=8001

COPY pipeline_endpoint.py /pipeline_endpoint.py
COPY my_functions.py /my_functions.py

# Start the Hamilton UI in the background, then the Flask endpoint in the foreground.
ENTRYPOINT /bin/bash -c "(hamilton ui --base-dir /hamilton-basedir &) && python /pipeline_endpoint.py"
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Running Hamilton & the Hamilton UI in Snowflake
2+
3+
This example contains the code for the ["TODO" post]() (link to be added).
4+
5+
Here we show the code that needs to be packaged up for
use on Snowflake.
7+
8+
TODO:
9+
- cut & paste instructions from blog post?
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import pandas as pd
2+
3+
4+
def spend_mean(spend: pd.Series) -> float:
    """Produce a scalar from a column: the average of the entire spend series."""
    column_average = spend.mean()
    return column_average
7+
8+
9+
def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
    """Center the spend column by subtracting its precomputed scalar mean."""
    centered = spend - spend_mean
    return centered
12+
13+
14+
def spend_std_dev(spend: pd.Series) -> float:
    """Produce the (sample) standard deviation of the spend column as a scalar."""
    dispersion = spend.std()
    return dispersion
17+
18+
19+
def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:
    """Standardize spend: scale the already-centered series by its standard deviation."""
    standardized = spend_zero_mean / spend_std_dev
    return standardized
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import logging
2+
import os
3+
import sys
4+
5+
import pandas as pd
6+
from flask import Flask, make_response, request
7+
8+
from hamilton import registry
9+
10+
registry.disable_autoload()
11+
import my_functions # we import the module here!
12+
13+
from hamilton import driver
14+
from hamilton_sdk import adapters
15+
16+
# WRAPER CODE FOR SNOWFLAKE FUNCTION ######
17+
18+
# Bind address/port for the Flask service; overridable via environment
# (the Dockerfile sets SERVER_PORT=8001).
SERVICE_HOST = os.getenv("SERVER_HOST", "0.0.0.0")
SERVICE_PORT = os.getenv("SERVER_PORT", 8080)
# NOTE(review): CHARACTER_NAME appears unused in this module — confirm before removing.
CHARACTER_NAME = os.getenv("CHARACTER_NAME", "I")
21+
22+
23+
def get_logger(logger_name):
    """Build and return a DEBUG-level logger that writes to stdout."""
    log = logging.getLogger(logger_name)
    log.setLevel(logging.DEBUG)
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setLevel(logging.DEBUG)
    fmt = "%(name)s [%(asctime)s] [%(levelname)s] %(message)s"
    stream_handler.setFormatter(logging.Formatter(fmt))
    log.addHandler(stream_handler)
    return log
31+
32+
33+
# Module-level logger and Flask app shared by the route handlers below.
logger = get_logger("echo-service")

app = Flask(__name__)
36+
37+
38+
@app.get("/healthcheck")
def readiness_probe():
    """Readiness/health probe endpoint: always reports the service as healthy."""
    status = "OK"
    return status
41+
42+
43+
@app.post("/echo")
def echo():
    """Handle a batch of rows posted by Snowflake: run the pipeline per row.

    Each input row is [row_index, prj_id, arg2, arg3, output_columns]; the
    response pairs each row_index with the pipeline result.
    """
    payload = request.json
    logger.debug(f"Received request: {payload}")

    if payload is None or not payload["data"]:
        logger.info("Received empty message")
        return {}

    rows_in = payload["data"]
    logger.info(f"Received {len(rows_in)} rows")

    rows_out = []
    for record in rows_in:
        row_index = record[0]
        rows_out.append([row_index, get_response(record[1], record[2], record[3], record[4])])
    logger.info(f"Produced {len(rows_out)} rows")

    reply = make_response({"data": rows_out})
    reply.headers["Content-type"] = "application/json"
    logger.debug(f"Sending response: {reply.json}")
    return reply
62+
63+
64+
# END OF WRAPER CODE FOR SNOWFLAKE FUNCTION ######
65+
66+
67+
def get_response(prj_id, signups, spend, output_columns):
    """Execute the Hamilton dataflow for one Snowflake row.

    Arguments arrive positionally in the order declared by the Snowflake
    external function: project id, signups values, spend values, and the
    list of output columns to compute.

    Returns:
        dict keyed by output column name; pandas Series values are converted
        to plain dicts so the result is JSON-serializable.
    """
    # NOTE(review): the original signature was (prj_id, spend, signups, ...)
    # with the dict below swapping them back; the parameters are now named to
    # match the actual positional data order and mapped directly — the
    # execution trace for the positional caller is unchanged.
    tracker = adapters.HamiltonTracker(
        project_id=prj_id,
        username="admin",
        dag_name="MYDAG",
        tags={"environment": "R&D", "team": "MY_TEAM", "version": "Beta"},
    )
    initial_columns = {  # initial data we use as input to the dataflow
        "signups": pd.Series(signups),
        "spend": pd.Series(spend),
    }
    dr = (
        driver.Builder()
        .with_config({})  # no configuration or invariant data for this example
        .with_modules(
            my_functions
        )  # tells Hamilton where to load function definitions from
        .with_adapters(tracker)  # sends run telemetry to the Hamilton UI
        .build()
    )

    df = dr.execute(output_columns, inputs=initial_columns)

    # Convert pandas Series into plain dicts so the payload is JSON-serializable.
    serializable_df = {}
    for key, value in df.items():
        if isinstance(value, pd.Series):
            serializable_df[key] = {str(k): v for k, v in value.to_dict().items()}
        else:
            # Pass other values (scalars) through as-is.
            serializable_df[key] = value

    return serializable_df
101+
102+
103+
if __name__ == "__main__":
    # Start the Flask service; host/port come from SERVER_HOST/SERVER_PORT env vars.
    app.run(host=SERVICE_HOST, port=SERVICE_PORT)
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import json
2+
import logging
3+
import os
4+
import sys
5+
6+
from flask import Flask, make_response, request
7+
8+
# Bind address/port for the Flask service; overridable via environment
# (the Dockerfile sets SERVER_PORT=8001).
SERVICE_HOST = os.getenv("SERVER_HOST", "0.0.0.0")
SERVICE_PORT = os.getenv("SERVER_PORT", 8080)
# NOTE(review): CHARACTER_NAME appears unused in this module — confirm before removing.
CHARACTER_NAME = os.getenv("CHARACTER_NAME", "I")
11+
12+
13+
def get_logger(logger_name):
    """Return a logger named *logger_name* emitting DEBUG records to stdout."""
    configured = logging.getLogger(logger_name)
    out = logging.StreamHandler(sys.stdout)
    out.setLevel(logging.DEBUG)
    out.setFormatter(
        logging.Formatter("%(name)s [%(asctime)s] [%(levelname)s] %(message)s")
    )
    configured.setLevel(logging.DEBUG)
    configured.addHandler(out)
    return configured
21+
22+
23+
# Module-level logger and Flask app shared by the route handlers below.
logger = get_logger("echo-service")

app = Flask(__name__)
26+
27+
28+
@app.get("/healthcheck")
def readiness_probe():
    """Health-check endpoint: unconditionally report the service as up."""
    healthy = "OK"
    return healthy
31+
32+
33+
@app.post("/echo")
def echo():
    """
    Main handler for input data sent by Snowflake.
    """
    payload = request.json
    logger.debug(f"Received request: {payload}")

    if payload is None or not payload["data"]:
        logger.info("Received empty message")
        return {}

    # input format:
    # {"data": [
    #    [row_index, column_1_value, column_2_value, ...],
    #    ...
    # ]}
    rows_in = payload["data"]
    logger.info(f"Received {len(rows_in)} rows")

    # output format:
    # {"data": [
    #    [row_index, result_value],
    #    ...
    # ]}
    rows_out = []
    for record in rows_in:
        rows_out.append([record[0], get_response(record[1], record[2])])
    logger.info(f"Produced {len(rows_out)} rows")

    reply = make_response({"data": rows_out})
    reply.headers["Content-type"] = "application/json"
    logger.debug(f"Sending response: {reply.json}")
    return reply
65+
66+
67+
import pandas as pd
68+
69+
# We add this to speed up running things if you have a lot in your python environment.
70+
from hamilton import registry
71+
72+
registry.disable_autoload()
73+
import my_functions # we import the module here!
74+
75+
from hamilton import driver
76+
from hamilton_sdk import adapters
77+
78+
79+
def get_response(prj_id, output_columns):
    """Run the demo Hamilton dataflow against fixed sample data and return
    a JSON-serializable dict of results, tracked in the Hamilton UI.

    NOTE(review): `output_columns` is currently ignored — it is overwritten
    with a hard-coded list below. Confirm whether the caller-supplied value
    should be honored instead.
    """
    tracker = adapters.HamiltonTracker(
        project_id=prj_id,
        username="admin",
        dag_name="MYDAG",
        tags={"environment": "DEV", "team": "MY_TEAM", "version": "X"},
    )
    # Instantiate a common spine (weekly date index) for the sample series.
    index = pd.date_range("2022-01-01", periods=6, freq="w")
    initial_columns = {  # load from actuals or wherever -- this is our initial data we use as input.
        # Note: these do not have to be all series, they could be scalar inputs.
        "signups": pd.Series([1, 10, 50, 100, 200, 400], index=index),
        "spend": pd.Series([10, 10, 20, 40, 40, 50], index=index),
    }
    dr = (
        driver.Builder()
        .with_config({})  # we don't have any configuration or invariant data for this example.
        .with_modules(
            my_functions
        )  # we need to tell hamilton where to load function definitions from
        .with_adapters(tracker)  # sends run telemetry to the Hamilton UI
        .build()
    )
    # Hard-coded outputs (these could be function pointers); see NOTE above —
    # this shadows the caller-supplied `output_columns` parameter.
    output_columns = ["spend", "signups", "spend_std_dev", "spend_zero_mean_unit_variance"]
    df = dr.execute(output_columns, inputs=initial_columns)
    # If you `pip install sf-hamilton[visualization]` you can also do:
    # dr.visualize_execution(output_columns, './my_dag.png', {})
    logger.debug(f"Executed dataflow: {df}")  # was a bare print()

    # Convert pandas Series into plain dicts so the payload is JSON-serializable.
    serializable_df = {}
    for key, value in df.items():
        if isinstance(value, pd.Series):
            serializable_df[key] = {str(k): v for k, v in value.to_dict().items()}
        else:
            # Pass other values (scalars) through as-is.
            serializable_df[key] = value

    logger.debug(f"Serialized result: {serializable_df}")  # was a bare print()
    # Removed two unreachable trailing `return` statements (dead code after
    # this return in the original).
    return serializable_df
125+
126+
127+
if __name__ == "__main__":
    # Start the Flask service; host/port come from SERVER_HOST/SERVER_PORT env vars.
    app.run(host=SERVICE_HOST, port=SERVICE_PORT)
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
-- Based on:
-- https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635

-- Create the service running the Hamilton UI + pipeline endpoint container.
CREATE SERVICE public.hamilton_ui
   IN COMPUTE POOL TEST_POOL
   FROM SPECIFICATION $$
   spec:
     containers:
     - name: hamiltonui
       image: <account-url-registry-host>/<db-name>/<schema-name>/<repo-name>/snowflake-hamilton-ui
       volumeMounts:
       - name: hamilton-basedir
         mountPath: /hamilton-basedir
     endpoints:
     - name: entrypoint
       port: 8001
     - name: hamilton
       port: 8241
       public: true
     volumes:
     - name: hamilton-basedir
       source: "@<db-name>.<schema-name>.hamilton_base"
   $$
   QUERY_WAREHOUSE = <warehouse-name>
;

CALL SYSTEM$GET_SERVICE_STATUS('<db-name>.<schema>.hamilton_ui');

-- Container name must match the spec above ('hamiltonui'; the original had a
-- misspelled 'hammiltonui', which would fail to find the container).
CALL SYSTEM$GET_SERVICE_LOGS('<db-name>.<schema>.hamilton_ui', '0', 'hamiltonui', 1000);

SHOW ENDPOINTS IN SERVICE public.hamilton_ui;

-- External function routing calls to the container's /echo endpoint.
-- Argument order here (prj_id, signups, spend, output_columns) defines the
-- positional row layout the Flask endpoint receives.
CREATE OR REPLACE FUNCTION public.hamilton_pipeline (prj_id number, signups variant, spend variant, output_columns variant)
RETURNS VARIANT
SERVICE=public.hamilton_ui
ENDPOINT=entrypoint
AS '/echo';

-- Simple invocation returning the whole result object.
SELECT
  public.hamilton_pipeline (
    1,
    [1, 10, 50, 100, 200, 400],
    [10, 10, 20, 40, 40, 50],
    [ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ]
  ) as data;

-- Flatten the result object into one row per output column.
-- NOTE(review): FLATTEN is usually written LATERAL FLATTEN(input => expr);
-- confirm the positional-argument form below parses on your account.
WITH input_data AS (
  SELECT
    public.hamilton_pipeline (
      1,
      [1, 10, 50, 100, 200, 400],
      [10, 10, 20, 40, 40, 50],
      [ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ]
    ) as data
),
flattened AS (
  SELECT
    key AS metric_key,
    value AS metric_value
  FROM
    input_data
    left join LATERAL FLATTEN(input_data.data)
)
SELECT
  *
FROM
  flattened f;

-- Flatten one level deeper: per-index values of a single output column.
WITH input_data AS (
  SELECT
    public.hamilton_pipeline (
      1,
      [1, 10, 50, 100, 200, 400],
      [10, 10, 20, 40, 40, 50],
      [ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ]
    ) as data
),
flattened AS (
  SELECT
    key AS metric_key,
    value AS metric_value
  FROM
    input_data
    left join LATERAL FLATTEN(input_data.data)
)
SELECT
  f2.key,
  f2.value
FROM
  flattened f
  left join lateral flatten(metric_value) f2
where
  metric_key = 'spend_zero_mean_unit_variance';

0 commit comments

Comments
 (0)