
Commit dba3332

Merge pull request #20 from zipline-ai/docker-init
Docker init for POC
2 parents 65a5e10 + 8550c70 commit dba3332

File tree

7 files changed: +355 -0 lines changed

- .gitignore
- docker-init/Dockerfile
- docker-init/README.md
- docker-init/compose.yaml
- docker-init/generate_anomalous_data.py
- docker-init/requirements.txt
- docker-init/start.sh

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@
 *.ear
 *.logs
 *.iml
+*.db
 .idea/
 .eclipse
 **/.vscode/

docker-init/Dockerfile

Lines changed: 19 additions & 0 deletions
FROM amazoncorretto:17

# Install python deps
COPY requirements.txt .
RUN yum install -y python3
RUN pip3 install --upgrade pip; pip3 install -r requirements.txt

RUN mkdir -p /app
COPY generate_anomalous_data.py /app/
COPY start.sh /start.sh
RUN chmod +x /start.sh
WORKDIR /app

# Default endpoint for DynamoDB Local; compose overrides this to http://dynamo:8000
ENV DYNAMO_ENDPOINT="http://localhost:8000"
# Placeholder region/credentials: DynamoDB Local accepts any values
ENV AWS_DEFAULT_REGION="fakeregion"
ENV AWS_ACCESS_KEY_ID="fakeaccesskey"
ENV AWS_SECRET_ACCESS_KEY="fakesecretkey"

ENTRYPOINT ["/start.sh"]

docker-init/README.md

Lines changed: 11 additions & 0 deletions
# Initialize Demo Data

This directory holds code to set up Docker containers for DynamoDB, a Spark master, and a Spark worker. It also creates a container holding a Parquet table of example data containing anomalies. To start, run:

```docker-compose up```

To access the Parquet table, run from another terminal:

```docker-compose exec app bash```

The Parquet table is available as `data.parquet`.
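For a quick look at the generated table, here is a hypothetical snippet (not part of this commit) that can be run with `python3` inside the `app` container:

```python
from pyspark.sql import SparkSession

# Hypothetical inspection sketch: read back the table the init script wrote.
# Note: generate_anomalous_data.py writes its output to the directory "data" under /app.
spark = SparkSession.builder.appName("InspectDemoData").getOrCreate()
df = spark.read.parquet("data")
df.printSchema()
df.show(5, truncate=False)
```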

docker-init/compose.yaml

Lines changed: 47 additions & 0 deletions
version: '3.8'
services:
  dynamo:
    command: "-jar DynamoDBLocal.jar -sharedDb -dbPath ./data"
    image: "amazon/dynamodb-local:2.5.2"
    container_name: dynamodb-local
    ports:
      - "8000:8000"
    volumes:
      - "./docker/dynamodb:/home/dynamodblocal/data"
    working_dir: /home/dynamodblocal
    user: dynamodblocal

  spark:
    image: bitnami/spark:3.5.2
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=yes
      - SPARK_SSL_ENABLED=no
      - SPARK_USER=spark
    ports:
      - '8080:8080'
      - '7077:7077'

  spark-worker:
    image: bitnami/spark:3.5.2
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=${SPARK_WORKER_MEMORY:-1G}
      - SPARK_WORKER_CORES=${SPARK_WORKER_CORES:-1}
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=yes
      - SPARK_SSL_ENABLED=no
      - SPARK_USER=spark

  app:
    build: .
    command: tail -F nothing
    environment:
      - DYNAMO_ENDPOINT=http://dynamo:8000
      - AWS_DEFAULT_REGION=fakeregion
      - AWS_ACCESS_KEY_ID=fakeaccesskey
      - AWS_SECRET_ACCESS_KEY=fakesecretkey
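DynamoDB Local publishes port 8000 to the host and accepts any region and credentials, which is why the fake values above suffice. A hypothetical smoke test from the host (not part of this commit):

```python
import boto3

# Hypothetical connectivity check against the dynamo service
client = boto3.client(
    "dynamodb",
    endpoint_url="http://localhost:8000",
    region_name="fakeregion",
    aws_access_key_id="fakeaccesskey",
    aws_secret_access_key="fakesecretkey",
)
print(client.list_tables()["TableNames"])  # [] on a fresh container
```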
docker-init/generate_anomalous_data.py

Lines changed: 264 additions & 0 deletions
import random
from datetime import datetime, timedelta
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType, TimestampType, BooleanType
import boto3
import awswrangler as wr
import os

# Initialize Spark session
spark = SparkSession.builder.appName("FraudClassificationSchema").getOrCreate()

ENDPOINT_URL = os.environ.get("DYNAMO_ENDPOINT") if os.environ.get("DYNAMO_ENDPOINT") is not None else 'http://localhost:8000'

wr.config.dynamodb_endpoint_url = ENDPOINT_URL
dynamodb = boto3.client('dynamodb', endpoint_url=ENDPOINT_URL)


def time_to_value(t, base_value, amplitude, noise_level, scale=1):
    # Daily seasonality: map time-of-day onto one period of sin(x) + sin(2x),
    # then add Gaussian noise; scale=None marks a "null" anomaly sample.
    if scale is None:
        return None
    hours = t.hour + t.minute / 60 + t.second / 3600
    x = hours / 24 * 2 * np.pi
    y = (np.sin(x) + np.sin(2 * x)) / 2
    value = base_value + amplitude * y + np.random.normal(0, noise_level)
    return float(max(0, value * scale))
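For intuition, a hypothetical snippet (not in the commit) that samples the curve with noise disabled, making the output deterministic; it assumes the function above is in scope:

```python
from datetime import datetime

# The double-sine day cycle yields 100.0, 125.0, 100.0, 75.0 at these hours
for h in (0, 6, 12, 18):
    t = datetime(2023, 1, 1, h)
    print(t.strftime("%H:%M"), time_to_value(t, base_value=100, amplitude=50, noise_level=0))
```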
def generate_non_overlapping_windows(start_date, end_date, num_windows):
    # Pick num_windows random spans of 3-7 days, separated by a random 7-30 day gap.
    total_days = (end_date - start_date).days
    window_lengths = [random.randint(3, 7) for _ in range(num_windows)]
    gap_days = random.randint(7, 30)
    gap = timedelta(days=gap_days)
    windows = []
    current_start = start_date + timedelta(days=random.randint(0, total_days - sum(window_lengths) - gap_days))
    for length in window_lengths:
        window_end = current_start + timedelta(days=length)
        if window_end > end_date:
            break
        windows.append((current_start, window_end))
        current_start = window_end + gap
        if current_start >= end_date:
            break

    return windows

def generate_timeseries_with_anomalies(num_samples=1000, base_value=100, amplitude=50, noise_level=10):
    start_date = datetime(2023, 1, 1)
    end_date = datetime(2023, 12, 31)

    anomaly_windows = generate_non_overlapping_windows(start_date, end_date, 2)
    null_window, spike_window = anomaly_windows

    data = []
    time_delta = (end_date - start_date) / num_samples

    for i in range(num_samples):
        transaction_time = start_date + i * time_delta

        # Determine if we're in an anomaly window
        if null_window[0] <= transaction_time <= null_window[1]:
            scale = None
        elif spike_window[0] <= transaction_time <= spike_window[1]:
            scale = 5  # Spike multiplier
        else:
            scale = 1

        value = time_to_value(transaction_time, base_value=base_value, amplitude=amplitude, noise_level=noise_level, scale=scale)

        data.append((transaction_time, value))

    return data, {'null': null_window, 'spike': spike_window}
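Since the second return value maps window labels to their date spans, a hypothetical caller (not in the commit) can locate the injected anomalies directly:

```python
series, windows = generate_timeseries_with_anomalies(num_samples=1000)
print("null window:", windows["null"])    # samples in this span are None
print("spike window:", windows["spike"])  # samples in this span are scaled 5x
print(sum(1 for _, v in series if v is None), "null samples")
```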
fraud_fields = [
    # join.source - txn_events
    StructField("user_id", IntegerType(), True),
    StructField("merchant_id", IntegerType(), True),

    # Contextual - 3
    StructField("transaction_amount", DoubleType(), True),
    StructField("transaction_time", TimestampType(), True),
    StructField("transaction_type", StringType(), True),

    # Transactions agg’d by user - 7 (txn_events)
    StructField("user_average_transaction_amount", DoubleType(), True),
    StructField("user_transactions_last_hour", IntegerType(), True),
    StructField("user_transactions_last_day", IntegerType(), True),
    StructField("user_transactions_last_week", IntegerType(), True),
    StructField("user_transactions_last_month", IntegerType(), True),
    StructField("user_transactions_last_year", IntegerType(), True),
    StructField("user_amount_last_hour", DoubleType(), True),

    # Transactions agg’d by merchant - 7 (txn_events)
    StructField("merchant_average_transaction_amount", DoubleType(), True),
    StructField("merchant_transactions_last_hour", IntegerType(), True),
    StructField("merchant_transactions_last_day", IntegerType(), True),
    StructField("merchant_transactions_last_week", IntegerType(), True),
    StructField("merchant_transactions_last_month", IntegerType(), True),
    StructField("merchant_transactions_last_year", IntegerType(), True),
    StructField("merchant_amount_last_hour", DoubleType(), True),

    # User features (dim_user) – 7
    StructField("user_account_age", IntegerType(), True),
    StructField("account_balance", DoubleType(), True),
    StructField("credit_score", IntegerType(), True),
    StructField("number_of_devices", IntegerType(), True),
    StructField("user_country", StringType(), True),
    StructField("user_account_type", IntegerType(), True),
    StructField("user_preferred_language", StringType(), True),

    # merchant features (dim_merchant) – 6
    StructField("merchant_account_age", IntegerType(), True),
    StructField("zipcode", IntegerType(), True),
    # set to true for 100 merchant_ids
    StructField("is_big_merchant", BooleanType(), True),
    StructField("merchant_country", StringType(), True),
    StructField("merchant_account_type", IntegerType(), True),
    StructField("merchant_preferred_language", StringType(), True),

    # derived features - transactions_last_year / account_age - 1
    StructField("transaction_frequency_last_year", DoubleType(), True),
]

fraud_schema = StructType(fraud_fields)
def generate_fraud_sample_data(num_samples=10000):
    start_date = datetime(2023, 1, 1)
    end_date = datetime(2023, 12, 31)

    data = []
    time_delta = (end_date - start_date) / num_samples

    anomaly_windows = generate_non_overlapping_windows(start_date, end_date, 2)

    # Generate base values
    transaction_amount, _ = generate_timeseries_with_anomalies(num_samples=num_samples, base_value=100, amplitude=50, noise_level=10)
    account_balance, _ = generate_timeseries_with_anomalies(num_samples=num_samples, base_value=5000, amplitude=2000, noise_level=500)
    user_average_transaction_amount, _ = generate_timeseries_with_anomalies(num_samples=num_samples, base_value=80, amplitude=30, noise_level=5)
    merchant_average_transaction_amount, _ = generate_timeseries_with_anomalies(num_samples=num_samples, base_value=80, amplitude=30, noise_level=5)
    user_last_hour_list, _ = generate_timeseries_with_anomalies(num_samples=num_samples, base_value=5, amplitude=3, noise_level=1)
    merchant_last_hour_list, _ = generate_timeseries_with_anomalies(num_samples=num_samples, base_value=5, amplitude=3, noise_level=1)

    # print(len(transaction_amount), len(transaction_frequency), len(average_transaction_amount), len(account_balance))
    for i in range(num_samples):
        transaction_time = start_date + i * time_delta
        merchant_id = random.randint(1, 250)
        if user_last_hour_list[i][1] is None:
            user_last_hour = user_last_hour_list[i][1]
            user_last_day = None
            user_last_week = None
            user_last_month = None
            user_last_year = None
        else:
            user_last_hour = int(user_last_hour_list[i][1])
            user_last_day = random.randint(user_last_hour, 100)
            user_last_week = random.randint(user_last_day, 500)
            user_last_month = random.randint(user_last_week, 1000)
            user_last_year = random.randint(user_last_month, 10000)
        user_account_age = random.randint(1, 3650)

        if merchant_last_hour_list[i][1] is None:
            merchant_last_hour = merchant_last_hour_list[i][1]
            merchant_last_day = None
            merchant_last_week = None
            merchant_last_month = None
            merchant_last_year = None
        else:
            merchant_last_hour = int(merchant_last_hour_list[i][1])
            merchant_last_day = random.randint(merchant_last_hour, 100)
            merchant_last_week = random.randint(merchant_last_day, 500)
            merchant_last_month = random.randint(merchant_last_week, 1000)
            merchant_last_year = random.randint(merchant_last_month, 10000)

        # Generate other features
        is_fast_drift = transaction_time > anomaly_windows[0][0] and transaction_time < anomaly_windows[0][1]
        is_slow_drift = transaction_time > anomaly_windows[1][0] and transaction_time < anomaly_windows[1][1]

        if is_fast_drift and user_last_hour is not None:
            user_last_hour *= 10
            user_last_day *= 10
            user_last_week *= 10
            user_last_month *= 10
            user_last_year *= 10

        if is_fast_drift and merchant_last_hour is not None:
            merchant_last_hour *= 10
            merchant_last_day *= 10
            merchant_last_week *= 10
            merchant_last_month *= 10
            merchant_last_year *= 10

        if is_slow_drift and user_last_hour is not None:
            user_last_hour = int(user_last_hour * (1.05)**((transaction_time-anomaly_windows[1][0])).days)
            user_last_day = int(user_last_day * (1.05)**((transaction_time-anomaly_windows[1][0])).days)
            user_last_week = int(user_last_week * (1.05)**((transaction_time-anomaly_windows[1][0])).days)
            user_last_month = int(user_last_month * (1.05)**((transaction_time-anomaly_windows[1][0])).days)
            user_last_year = int(user_last_year * (1.05)**((transaction_time-anomaly_windows[1][0])).days)

        if is_slow_drift and merchant_last_hour is not None:
            merchant_last_hour = int(merchant_last_hour * (1.05)**((transaction_time-anomaly_windows[1][0])).days)
            merchant_last_day = int(merchant_last_day * (1.05)**((transaction_time-anomaly_windows[1][0])).days)
            merchant_last_week = int(merchant_last_week * (1.05)**((transaction_time-anomaly_windows[1][0])).days)
            merchant_last_month = int(merchant_last_month * (1.05)**((transaction_time-anomaly_windows[1][0])).days)
            merchant_last_year = int(merchant_last_year * (1.05)**((transaction_time-anomaly_windows[1][0])).days)

        row = [
            # join.source - txn_events
            random.randint(1, 100),
            merchant_id,

            # Contextual - 3
            transaction_amount[i][1],
            transaction_time,
            random.choice(['purchase', 'withdrawal', 'transfer']),

            # Transactions agg’d by user - 7 (txn_events)
            user_average_transaction_amount[i][1],
            user_last_hour,
            user_last_day,
            user_last_week,
            user_last_month,
            user_last_year,
            random.uniform(0, 100.0),

            # Transactions agg’d by merchant - 7 (txn_events)
            merchant_average_transaction_amount[i][1],
            merchant_last_hour,
            merchant_last_day,
            merchant_last_week,
            merchant_last_month,
            merchant_last_year,
            random.uniform(0, 1000.0),

            # User features (dim_user) – 7
            user_account_age,
            account_balance[i][1],
            random.randint(300, 850),
            random.randint(1, 5),
            random.choice(['US', 'UK', 'CA', 'AU', 'DE', 'FR']) if not is_fast_drift else random.choice(['US', 'UK', 'CA', 'BR', 'ET', 'GE']),
            random.randint(0, 100),
            random.choice(['en-US', 'es-ES', 'fr-FR', 'de-DE', 'zh-CN']),

            # merchant features (dim_merchant) – 6
            random.randint(1, 3650),
            random.randint(10000, 99999),
            merchant_id < 100,
            random.choice(['US', 'UK', 'CA', 'AU', 'DE', 'FR']) if not is_fast_drift else random.choice(['US', 'UK', 'CA', 'BR', 'ET', 'GE']),
            random.randint(0, 100),
            random.choice(['en-US', 'es-ES', 'fr-FR', 'de-DE', 'zh-CN']),

            # derived features - transactions_last_year / account_age - 1
            user_last_year/user_account_age if user_last_year is not None else None,
        ]

        data.append(tuple(row))
    return data

fraud_data = generate_fraud_sample_data(20000)
fraud_df = spark.createDataFrame(fraud_data, schema=fraud_schema)

fraud_df.write.mode("overwrite").parquet("data")
print("Successfully wrote user data to parquet")

docker-init/requirements.txt

Lines changed: 7 additions & 0 deletions
awswrangler==2.20.1
boto3==1.28.62
boto3-stubs[dynamodb]==1.28.62
chronon-ai==0.0.82
numpy==1.21.6
parquet-tools==0.2.0
pyspark==3.4.3

docker-init/start.sh

Lines changed: 6 additions & 0 deletions
#!/bin/bash
# Generate the demo dataset; fail fast if the generator errors out.
if ! python3 generate_anomalous_data.py; then
    echo "Error: Failed to generate anomalous data" >&2
    exit 1
fi
# Hand off to the container command (compose passes "tail -F nothing" to keep the container alive).
exec "$@"
