Skip to content

Commit c3183cc

Browse files
authored
Adds code for snowflake container example running the UI (#1257)
* Adds code for snowflake container example running the UI Note: this is a toy example. For real production needs, you'd need to modify a few things: 1. not use SQLLITE, and instead postgresql, or implement django-snowflake connection. 2. likely not use the Hamilton code within the flask container, instead package up properly and define a UDF or UDTF. 3. could use snowpark dataframes or have hamilton code do other things, e.g. LLM calls.. See https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635 for the write up.
1 parent 1e1fb44 commit c3183cc

File tree

7 files changed

+271
-0
lines changed

7 files changed

+271
-0
lines changed

docs/hamilton-ui/ui.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,12 @@ More extensive self-hosting documentation is in the works, e.g. Snowflake, Datab
123123
chart contribution!
124124

125125

126+
Running on Snowflake
127+
---------------------
128+
You can run the Hamilton UI on Snowflake Container Services. For a detailed guide, see the blog post
129+
`Observability of Python code and application logic with Hamilton UI on Snowflake Container Services <https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635>`_ by
130+
`Greg Kantyka <https://medium.com/@pkantyka>`_ and the `Hamilton Snowflake Example <https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/snowflake/hamilton_ui>`_.
131+
126132
-----------
127133
Get started
128134
-----------
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
FROM python:3.13.1-slim
2+
3+
RUN apt update
4+
RUN apt upgrade sqlite3 -y
5+
RUN pip install "sf-hamilton[ui,sdk]"
6+
RUN pip install flask
7+
8+
ENV HAMILTON_ALLOWED_HOSTS=".snowflakecomputing.app"
9+
ENV SERVER_PORT=8001
10+
11+
COPY pipeline_endpoint.py /pipeline_endpoint.py
12+
COPY my_functions.py /my_functions.py
13+
14+
ENTRYPOINT /bin/bash -c "(hamilton ui --base-dir /hamilton-basedir &) && python /pipeline_endpoint.py"
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Running the Hamilton & the Hamilton UI in Snowflake
2+
3+
This example is code for the ["Observability of Python code and application logic with Hamilton UI on Snowflake Container Services" post](https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635) by
4+
[Greg Kantyka](https://medium.com/@pkantyka).
5+
6+
Here we show the code required to be packaged up for use on Snowflake:
7+
8+
1. Docker file that runs the Hamilton UI and a flask endpoint to exercise Hamilton code
9+
2. my_functions.py - the Hamilton code that is exercised by the flask endpoint
10+
3. pipeline_endpoint.py - the flask endpoint that exercises the Hamilton code
11+
12+
To run see:
13+
- snowflake.sql that contains all the SQL to create the necessary objects in Snowflake and exercise things.
14+
15+
For more details see ["Observability of Python code and application logic with Hamilton UI on Snowflake Container Services" post](https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635) by
16+
[Greg Kantyka](https://medium.com/@pkantyka).
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import pandas as pd
2+
3+
4+
def spend_mean(spend: pd.Series) -> float:
5+
"""Shows function creating a scalar. In this case it computes the mean of the entire column."""
6+
return spend.mean()
7+
8+
9+
def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
10+
"""Shows function that takes a scalar. In this case to zero mean spend."""
11+
return spend - spend_mean
12+
13+
14+
def spend_std_dev(spend: pd.Series) -> float:
15+
"""Function that computes the standard deviation of the spend column."""
16+
return spend.std()
17+
18+
19+
def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:
20+
"""Function showing one way to make spend have zero mean and unit variance."""
21+
return spend_zero_mean / spend_std_dev
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
"""
2+
This module:
3+
- Defines a flask app that listens for POST requests on /echo
4+
- the /echo command is invoked from a Snowflake SQL query
5+
- the /echo command calls a function get_response that is defined in this module
6+
- get_response uses Hamilton to execute a pipeline defined in my_functions.py
7+
- my_functions.py contains the functions that are used in the pipeline
8+
- the pipeline is executed with the input data from the Snowflake query
9+
- the output of the pipeline is returned to Snowflake
10+
- the Hamilton UI tracker is used to track the execution of the pipeline
11+
"""
12+
13+
import logging
14+
import os
15+
import sys
16+
17+
import pandas as pd
18+
from flask import Flask, make_response, request
19+
20+
from hamilton import registry
21+
22+
registry.disable_autoload()
23+
import my_functions # we import the module here!
24+
25+
from hamilton import driver
26+
from hamilton_sdk import adapters
27+
28+
# WRAPPER CODE FOR SNOWFLAKE FUNCTION ######
29+
30+
SERVICE_HOST = os.getenv("SERVER_HOST", "0.0.0.0")
31+
SERVICE_PORT = os.getenv("SERVER_PORT", 8080)
32+
CHARACTER_NAME = os.getenv("CHARACTER_NAME", "I")
33+
34+
35+
def get_logger(logger_name):
36+
logger = logging.getLogger(logger_name)
37+
logger.setLevel(logging.DEBUG)
38+
handler = logging.StreamHandler(sys.stdout)
39+
handler.setLevel(logging.DEBUG)
40+
handler.setFormatter(logging.Formatter("%(name)s [%(asctime)s] [%(levelname)s] %(message)s"))
41+
logger.addHandler(handler)
42+
return logger
43+
44+
45+
logger = get_logger("echo-service")
46+
47+
app = Flask(__name__)
48+
49+
50+
@app.get("/healthcheck")
51+
def readiness_probe():
52+
return "OK"
53+
54+
55+
@app.post("/echo")
56+
def echo():
57+
"""This is the endpoint that Snowflake will call to run Hamilton code."""
58+
message = request.json
59+
logger.debug(f"Received request: {message}")
60+
61+
if message is None or not message["data"]:
62+
logger.info("Received empty message")
63+
return {}
64+
65+
input_rows = message["data"]
66+
logger.info(f"Received {len(input_rows)} rows")
67+
68+
output_rows = [[row[0], get_response(row[1], row[2], row[3], row[4])] for row in input_rows]
69+
logger.info(f"Produced {len(output_rows)} rows")
70+
71+
response = make_response({"data": output_rows})
72+
response.headers["Content-type"] = "application/json"
73+
logger.debug(f"Sending response: {response.json}")
74+
return response
75+
76+
77+
# END OF WRAPPER CODE FOR SNOWFLAKE FUNCTION ######
78+
79+
80+
def get_response(prj_id, spend, signups, output_columns):
81+
"""The function that is called from SQL on Snowflake."""
82+
tracker = adapters.HamiltonTracker(
83+
project_id=prj_id,
84+
username="admin",
85+
dag_name="MYDAG",
86+
tags={"environment": "R&D", "team": "MY_TEAM", "version": "Beta"},
87+
)
88+
input_columns = {
89+
"signups": pd.Series(spend),
90+
"spend": pd.Series(signups),
91+
}
92+
dr = (
93+
driver.Builder()
94+
.with_config({}) # we don't have any configuration or invariant data for this example.
95+
.with_modules(
96+
my_functions
97+
) # we need to tell hamilton where to load function definitions from
98+
.with_adapters(tracker) # we add the Hamilton UI tracker
99+
.build()
100+
)
101+
102+
df = dr.execute(output_columns, inputs=input_columns)
103+
104+
serializable_df = {}
105+
106+
for key, value in df.items():
107+
if isinstance(value, pd.Series):
108+
# Convert Series to dict (or .tolist() for just values)
109+
serializable_df[key] = {str(k): v for k, v in value.to_dict().items()}
110+
else:
111+
# Pass other values as is
112+
serializable_df[key] = value
113+
114+
return serializable_df
115+
116+
117+
if __name__ == "__main__":
118+
app.run(host=SERVICE_HOST, port=SERVICE_PORT)
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
flask
2+
sf-hamilton[ui,sdk]
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
-- For more details visit:
2+
-- https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635
3+
4+
CREATE SERVICE public.hamilton_ui
5+
IN COMPUTE POOL TEST_POOL
6+
FROM SPECIFICATION $$
7+
spec:
8+
containers:
9+
- name: hamiltonui
10+
image: <account-url-registry-host>/<db-name>/<schema-name>/<repo-name>/snowflake-hamilton-ui
11+
volumeMounts:
12+
- name: hamilton-basedir
13+
mountPath: /hamilton-basedir
14+
endpoints:
15+
- name: entrypoint
16+
port: 8001
17+
- name: hamilton
18+
port: 8241
19+
public: true
20+
volumes:
21+
- name: hamilton-basedir
22+
source: "@<db-name>.<schema-name>.hamilton_base"
23+
$$
24+
QUERY_WAREHOUSE = <warehause-name>
25+
;
26+
27+
CALL SYSTEM$GET_SERVICE_STATUS('<db-name>.<schema>.hamilton_ui');
28+
29+
CALL SYSTEM$GET_SERVICE_LOGS('<db-name>.<schema>.hamilton_ui', '0', 'hammiltonui', 1000);
30+
31+
SHOW ENDPOINTS IN SERVICE public.hamilton_ui;
32+
33+
CREATE OR REPLACE FUNCTION public.hamilton_pipeline (prj_id number, signups variant, spend variant, output_columns variant)
34+
RETURNS VARIANT
35+
SERVICE=public.hamilton_ui
36+
ENDPOINT=entrypoint
37+
AS '/echo';
38+
39+
40+
SELECT
41+
public.hamilton_pipeline (
42+
1,
43+
[1, 10, 50, 100, 200, 400],
44+
[10, 10, 20, 40, 40, 50],
45+
[ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ]
46+
) as data;
47+
48+
WITH input_data AS (
49+
SELECT
50+
public.hamilton_pipeline (
51+
1,
52+
[1, 10, 50, 100, 200, 400],
53+
[10, 10, 20, 40, 40, 50],
54+
[ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ]
55+
) as data
56+
),
57+
flattened AS (
58+
SELECT
59+
key AS metric_key,
60+
value AS metric_value
61+
FROM
62+
input_data
63+
left join LATERAL FLATTEN(input_data.data)
64+
)
65+
SELECT
66+
*
67+
FROM
68+
flattened f;
69+
70+
WITH input_data AS (
71+
SELECT
72+
public.hamilton_pipeline (
73+
1,
74+
[1, 10, 50, 100, 200, 400],
75+
[10, 10, 20, 40, 40, 50],
76+
[ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ]
77+
) as data
78+
),
79+
flattened AS (
80+
SELECT
81+
key AS metric_key,
82+
value AS metric_value
83+
FROM
84+
input_data
85+
left join LATERAL FLATTEN(input_data.data)
86+
)
87+
SELECT
88+
f2.key,
89+
f2.value
90+
FROM
91+
flattened f
92+
left join lateral flatten(metric_value) f2
93+
where
94+
metric_key = 'spend_zero_mean_unit_variance';

0 commit comments

Comments
 (0)