
Commit ea35bab

Merge branch 'main' into nikhil/orchestrator_stubs

2 parents 6d44563 + fec5359

82 files changed: +1659 −633 lines


WORKSPACE

Lines changed: 15 additions & 0 deletions
@@ -102,6 +102,21 @@ scalafmt_default_config()

 scalafmt_repositories()

+# For jar jar to help with shading
+load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository")
+
+git_repository(
+    name = "com_github_johnynek_bazel_jar_jar",
+    commit = "352e66efa42434154ff2c0406ffd395efcbec92c",  # Latest commit SHA as of 2024/11/05
+    remote = "https://github.com/johnynek/bazel_jar_jar.git",
+)
+
+load(
+    "@com_github_johnynek_bazel_jar_jar//:jar_jar.bzl",
+    "jar_jar_repositories",
+)
+jar_jar_repositories()
+
 # For Protobuf support
 http_archive(
     name = "rules_proto",

aggregator/BUILD.bazel

Lines changed: 0 additions & 2 deletions
@@ -16,7 +16,6 @@ scala_library(
         maven_artifact("org.apache.datasketches:datasketches-java"),
         maven_artifact("org.apache.commons:commons-lang3"),
         maven_artifact("org.slf4j:slf4j-api"),
-        maven_artifact("org.apache.logging.log4j:log4j-slf4j-impl"),
         maven_artifact_with_suffix("org.scala-lang.modules:scala-collection-compat"),
     ],
 )
@@ -27,7 +26,6 @@ test_deps = [
     "//api:thrift_java",
     # Libraries
     maven_artifact("org.slf4j:slf4j-api"),
-    maven_artifact("org.apache.logging.log4j:log4j-slf4j-impl"),
     maven_artifact("com.google.code.gson:gson"),
     maven_artifact("org.apache.datasketches:datasketches-memory"),
     maven_artifact("org.apache.datasketches:datasketches-java"),

api/BUILD.bazel

Lines changed: 0 additions & 3 deletions
@@ -12,7 +12,6 @@ java_library(
     deps = [
         maven_artifact("javax.annotation:javax.annotation.api"),
         maven_artifact("org.slf4j:slf4j-api"),
-        maven_artifact("org.apache.logging.log4j:log4j-slf4j-impl"),
         maven_artifact("org.apache.commons:commons-lang3"),
         maven_artifact("com.google.code.gson:gson"),
     ],
@@ -35,7 +34,6 @@ scala_library(
         maven_artifact("com.fasterxml.jackson.core:jackson-core"),
         maven_artifact("com.fasterxml.jackson.core:jackson-databind"),
         maven_artifact("org.slf4j:slf4j-api"),
-        maven_artifact("org.apache.logging.log4j:log4j-slf4j-impl"),
         maven_artifact("org.apache.commons:commons-lang3"),
         maven_artifact("com.google.code.gson:gson"),
         maven_artifact_with_suffix("org.scala-lang.modules:scala-collection-compat"),
@@ -51,7 +49,6 @@ test_deps = [
     maven_artifact("com.fasterxml.jackson.core:jackson-core"),
    maven_artifact("com.fasterxml.jackson.core:jackson-databind"),
    maven_artifact("org.slf4j:slf4j-api"),
-   maven_artifact("org.apache.logging.log4j:log4j-slf4j-impl"),
    # Testing
    maven_artifact_with_suffix("org.scala-lang.modules:scala-parser-combinators"),
    maven_artifact_with_suffix("org.scala-lang.modules:scala-collection-compat"),

api/py/ai/chronon/cli/compile/parse_teams.py

Lines changed: 2 additions & 0 deletions
@@ -123,6 +123,7 @@ def _merge_mode_maps(
             result.backfill = _merge_maps(result.common, result.backfill)
             result.upload = _merge_maps(result.common, result.upload)
             result.streaming = _merge_maps(result.common, result.streaming)
+            result.serving = _merge_maps(result.common, result.serving)
             result.common = None
             continue

@@ -135,5 +136,6 @@ def _merge_mode_maps(
         result.streaming = _merge_maps(
             result.streaming, mode_map.common, mode_map.streaming
         )
+        result.serving = _merge_maps(result.serving, mode_map.common, mode_map.serving)

     return result
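
Note: `_merge_maps` itself is outside this diff. A minimal sketch of the semantics the call sites imply — merge left to right, later maps win on key collisions, `None` inputs skipped — might look like the following; this is a hypothetical reconstruction, not the repo's actual helper.

from typing import Dict, Optional

def _merge_maps(*maps: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
    # Merge dicts left to right; keys in later maps override earlier ones.
    # None entries are skipped; if every input is None, return None.
    present = [m for m in maps if m is not None]
    if not present:
        return None
    merged: Dict[str, str] = {}
    for m in present:
        merged.update(m)
    return merged

Under this reading, the added lines give the new `serving` mode the same treatment as `backfill`, `upload`, and `streaming`: team-level `common` settings flow in unless overridden.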

api/py/ai/chronon/repo/aws.py

Lines changed: 7 additions & 6 deletions
@@ -31,14 +31,15 @@
 class AwsRunner(Runner):
     def __init__(self, args):
         aws_jar_path = AwsRunner.download_zipline_aws_jar(
-            ZIPLINE_DIRECTORY, get_customer_id(), ZIPLINE_AWS_JAR_DEFAULT
+            ZIPLINE_DIRECTORY, get_customer_id(), args["version"], ZIPLINE_AWS_JAR_DEFAULT
         )
         service_jar_path = AwsRunner.download_zipline_aws_jar(
-            ZIPLINE_DIRECTORY, get_customer_id(), ZIPLINE_AWS_SERVICE_JAR
+            ZIPLINE_DIRECTORY, get_customer_id(), args["version"], ZIPLINE_AWS_SERVICE_JAR
         )
         jar_path = (
             f"{service_jar_path}:{aws_jar_path}" if args['mode'] == "fetch" else aws_jar_path
         )
+        self.version = args.get("version", "latest")

         super().__init__(args, os.path.expanduser(jar_path))

@@ -58,10 +59,10 @@ def upload_s3_file(
             raise RuntimeError(f"Failed to upload {source_file_name}: {str(e)}")

     @staticmethod
-    def download_zipline_aws_jar(destination_dir: str, customer_id: str, jar_name: str):
+    def download_zipline_aws_jar(destination_dir: str, customer_id: str, version: str, jar_name: str):
         s3_client = boto3.client("s3")
         destination_path = f"{destination_dir}/{jar_name}"
-        source_key_name = f"jars/{jar_name}"
+        source_key_name = f"release/{version}/jars/{jar_name}"
         bucket_name = f"zipline-artifacts-{customer_id}"

         are_identical = (
@@ -163,7 +164,7 @@ def generate_emr_submitter_args(
         # include jar uri. should also already be in the bucket
         jar_uri = (
             f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}"
-            + f"/jars/{ZIPLINE_AWS_JAR_DEFAULT}"
+            + f"/release/{self.version}/jars/{ZIPLINE_AWS_JAR_DEFAULT}"
         )

         final_args = "{user_args} --jar-uri={jar_uri} --job-type={job_type} --main-class={main_class}"
@@ -193,7 +194,7 @@ def generate_emr_submitter_args(
                 job_type=job_type.value,
                 main_class=main_class,
             )
-            + f" --additional-conf-path=additional-confs.yaml --files={s3_file_args}"
+            + f" --additional-conf-path={EMR_MOUNT_FILE_PREFIX}additional-confs.yaml --files={s3_file_args}"
         )
     else:
         raise ValueError(f"Invalid job type: {job_type}")
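
The substance of this change is the artifact key layout: jars move from a single flat `jars/` prefix to a per-release prefix, so multiple versions can coexist in the bucket. Isolated from the class, the path construction reduces to the sketch below (the jar file name is illustrative, not the real constant's value):

def source_key_name(version: str, jar_name: str) -> str:
    # Before this commit the key was f"jars/{jar_name}";
    # now every release is isolated under its own prefix.
    return f"release/{version}/jars/{jar_name}"

assert source_key_name("latest", "zipline_aws.jar") == "release/latest/jars/zipline_aws.jar"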

api/py/ai/chronon/repo/default_runner.py

Lines changed: 2 additions & 0 deletions
@@ -24,6 +24,8 @@ def __init__(self, args, jar_path):
         self.kafka_bootstrap = args.get("kafka_bootstrap")
         self.mock_source = args.get("mock_source")
         self.savepoint_uri = args.get("savepoint_uri")
+        self.validate = args.get("validate")
+        self.validate_rows = args.get("validate_rows")

         valid_jar = args["online_jar"] and os.path.exists(args["online_jar"])
api/py/ai/chronon/repo/gcp.py

Lines changed: 12 additions & 7 deletions
@@ -31,10 +31,10 @@
 class GcpRunner(Runner):
     def __init__(self, args):
         gcp_jar_path = GcpRunner.download_zipline_dataproc_jar(
-            ZIPLINE_DIRECTORY, get_customer_id(), ZIPLINE_GCP_JAR_DEFAULT
+            ZIPLINE_DIRECTORY, get_customer_id(), args["version"], ZIPLINE_GCP_JAR_DEFAULT
         )
         service_jar_path = GcpRunner.download_zipline_dataproc_jar(
-            ZIPLINE_DIRECTORY, get_customer_id(), ZIPLINE_GCP_SERVICE_JAR
+            ZIPLINE_DIRECTORY, get_customer_id(), args["version"], ZIPLINE_GCP_SERVICE_JAR
         )
         jar_path = (
             f"{service_jar_path}:{gcp_jar_path}" if args["mode"] == "fetch" else gcp_jar_path
@@ -156,11 +156,11 @@ def compare_gcs_and_local_file_hashes(

     @staticmethod
     def download_zipline_dataproc_jar(
-        destination_dir: str, customer_id: str, jar_name: str
+        destination_dir: str, customer_id: str, version: str, jar_name: str
     ):
         bucket_name = f"zipline-artifacts-{customer_id}"

-        source_blob_name = f"jars/{jar_name}"
+        source_blob_name = f"release/{version}/jars/{jar_name}"
         destination_path = f"{destination_dir}/{jar_name}"

         are_identical = (
@@ -186,6 +186,7 @@ def generate_dataproc_submitter_args(
         self,
         user_args: str,
         job_type: JobType = JobType.SPARK,
+        version: str = "latest",
         local_files_to_upload: List[str] = [],
     ):
         customer_warehouse_bucket_name = f"zipline-warehouse-{get_customer_id()}"
@@ -215,7 +216,7 @@ def generate_dataproc_submitter_args(
         # include jar uri. should also already be in the bucket
         jar_uri = (
             f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}"
-            + f"/jars/{ZIPLINE_GCP_JAR_DEFAULT}"
+            + f"/release/{version}/jars/{ZIPLINE_GCP_JAR_DEFAULT}"
         )

         final_args = "{user_args} --jar-uri={jar_uri} --job-type={job_type} --main-class={main_class}"
@@ -224,7 +225,7 @@ def generate_dataproc_submitter_args(
             main_class = "ai.chronon.flink.FlinkJob"
             flink_jar_uri = (
                 f"{zipline_artifacts_bucket_prefix}-{get_customer_id()}"
-                + f"/jars/{ZIPLINE_GCP_FLINK_JAR_DEFAULT}"
+                + f"/release/{version}/jars/{ZIPLINE_GCP_FLINK_JAR_DEFAULT}"
             )
             return (
                 final_args.format(
@@ -258,9 +259,13 @@ def run_dataproc_flink_streaming(self):
             "-ZGCP_PROJECT_ID": GcpRunner.get_gcp_project_id(),
             "-ZGCP_BIGTABLE_INSTANCE_ID": GcpRunner.get_gcp_bigtable_instance_id(),
             "--savepoint-uri": self.savepoint_uri,
+            "--validate-rows": self.validate_rows,
         }

-        flag_args = {"--mock-source": self.mock_source}
+        flag_args = {
+            "--mock-source": self.mock_source,
+            "--validate": self.validate
+        }
         flag_args_str = " ".join(key for key, value in flag_args.items() if value)

         user_args_str = " ".join(
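
Note the pattern in the last hunk: valued options such as `--validate-rows` live in one dict and are always emitted, while boolean flags (`--mock-source`, `--validate`) live in `flag_args` and are emitted only when truthy. A self-contained illustration of the filtering (values invented for the example):

# Only flags whose value is truthy make it onto the command line.
flag_args = {"--mock-source": False, "--validate": True}
flag_args_str = " ".join(key for key, value in flag_args.items() if value)
print(flag_args_str)  # prints: --validate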

api/py/ai/chronon/repo/init.py

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
+#!/usr/bin/env python
+
+import shutil
+import os
+from importlib_resources import files
+
+import click
+
+
+@click.command(name="init")
+@click.option(
+    "--chronon_root",
+    envvar="CHRONON_ROOT",
+    help="Path to the root chronon folder",
+    default=os.path.join(os.getcwd(), "zipline"),
+    type=click.Path(file_okay=False, writable=True),
+)
+@click.pass_context
+def main(ctx, chronon_root):
+    try:
+        template_path = files("ai.chronon").joinpath("../../test/sample")
+        print(template_path)
+    except AttributeError:
+        click.echo("Error: Template directory not found in package.", err=True)
+        return
+
+    if not template_path.exists():
+        click.echo("Error: Template directory not found in package.", err=True)
+        return
+
+    target_path = os.path.abspath(chronon_root)
+
+    # Prevent accidental overwrites
+    if os.path.exists(target_path) and os.listdir(target_path):
+        click.confirm(f"Warning: {target_path} is not empty. Proceed?", abort=True)
+
+    click.echo(f"Generating scaffolding at {target_path}...")
+
+    try:
+        shutil.copytree(template_path, target_path, dirs_exist_ok=True)
+        click.echo("Project scaffolding created successfully! 🎉")
+        click.echo(
+            f""""Please copy the following command to your shell config:
+            `export PYTHONPATH={target_path}:$PYTHONPATH`"""
+        )
+    except Exception as e:
+        click.echo(f"Error: {e}", err=True)
+
+
+if __name__ == "__main__":
+    main()
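
The template lookup relies on `importlib_resources.files(...)` returning a filesystem-backed path, which is why the command guards with `exists()` before copying: for a zipped or incomplete install, the relative `joinpath` will not resolve to a real directory. The same check in isolation (package name and relative path reused from the file above; whether they resolve depends on how the distribution was installed):

from importlib_resources import files

template_path = files("ai.chronon").joinpath("../../test/sample")
if template_path.exists():
    print(f"templates at {template_path}")
else:
    print("templates not shipped with this install")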

api/py/ai/chronon/repo/run.py

Lines changed: 8 additions & 2 deletions
@@ -53,7 +53,7 @@ def set_defaults(ctx):
     defaults = {
         "mode": "backfill",
         "dataproc": False,
-        "ds": today,
+        "ds": today,  # TODO: this breaks if the partition column is not the same as yyyy-MM-dd.
         "app_name": os.environ.get("APP_NAME"),
         "online_jar": os.environ.get("CHRONON_ONLINE_JAR"),
         "repo": chronon_repo_path,
@@ -118,7 +118,7 @@ def set_defaults(ctx):
     "--online-class",
     help="Class name of Online Impl. Used for streaming and metadata-upload mode.",
 )
-@click.option("--version", help="Chronon version to use.")
+@click.option("--version", default="latest", help="Chronon version to use.")
 @click.option(
     "--spark-version", default="2.4.0", help="Spark version to use for downloading jar."
 )
@@ -159,6 +159,10 @@ def set_defaults(ctx):
     help="Use a mocked data source instead of a real source for groupby-streaming Flink.",
 )
 @click.option("--savepoint-uri", help="Savepoint URI for Flink streaming job")
+@click.option("--validate", is_flag=True,
+              help="Validate the catalyst util Spark expression evaluation logic")
+@click.option("--validate-rows", default="10000",
+              help="Number of rows to run the validation on")
 @click.pass_context
 def main(
     ctx,
@@ -190,6 +194,8 @@ def main(
     kafka_bootstrap,
     mock_source,
     savepoint_uri,
+    validate,
+    validate_rows
 ):
     unknown_args = ctx.args
     click.echo("Running with args: {}".format(ctx.params))
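
The two new options behave differently at the call site: `--validate` is an `is_flag` option and arrives as a bool (False unless passed), while `--validate-rows` arrives as the string "10000" by default, since no `type=int` is declared. A minimal standalone demonstration of the same declarations:

import click

@click.command()
@click.option("--validate", is_flag=True, help="Boolean flag: False unless passed.")
@click.option("--validate-rows", default="10000", help="Valued option with a string default.")
def demo(validate, validate_rows):
    # validate: bool; validate_rows: str ("10000" unless overridden)
    click.echo(f"validate={validate} validate_rows={validate_rows}")

if __name__ == "__main__":
    demo()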

api/py/ai/chronon/repo/zipline.py

Lines changed: 2 additions & 0 deletions
@@ -2,6 +2,7 @@

 from ai.chronon.repo.compile import extract_and_convert
 from ai.chronon.repo.run import main as run_main
+from ai.chronon.repo.init import main as init_main


 @click.group()
@@ -11,3 +12,4 @@ def zipline():

 zipline.add_command(extract_and_convert)
 zipline.add_command(run_main)
+zipline.add_command(init_main)
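
The wiring follows click's standard group/subcommand pattern; stripped to its shape, the structure is the sketch below (command bodies stubbed for illustration):

import click

@click.group()
def zipline():
    """Top-level CLI; subcommands are registered below."""

@click.command(name="init")
def init_main():
    click.echo("scaffolding...")

zipline.add_command(init_main)  # exposed as `zipline init`

if __name__ == "__main__":
    zipline()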

api/py/requirements/base.in

Lines changed: 2 additions & 1 deletion
@@ -6,4 +6,5 @@ pyspark==3.5.4
 sqlglot
 crcmod==1.7
 glom
-boto3
+boto3
+importlib-resources==6.5.2

api/py/setup.py

Lines changed: 14 additions & 10 deletions
@@ -1,4 +1,3 @@
-
 # Copyright (C) 2023 The Chronon Authors.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,6 +15,7 @@
 import os
 import re
 from setuptools import find_packages, setup
+import glob

 current_dir = os.path.abspath(os.path.dirname(__file__))
 with open(os.path.join(current_dir, "README.md"), "r") as fh:
@@ -26,8 +26,10 @@
     basic_requirements = [line for line in infile]


-__version__ = "local"
+__version__ = "0.0.1"
 __branch__ = "main"
+
+
 def get_version():
     version_str = os.environ.get("VERSION", __version__)
     branch_str = os.environ.get("BRANCH", __branch__)
@@ -36,19 +38,20 @@ def get_version():
     # If the prefix is the branch name, then convert it as suffix after '+' to make it Python PEP440 complaint
     if version_str.startswith(branch_str + "-"):
         version_str = "{}+{}".format(
-            version_str.replace(branch_str + "-", ""),
-            branch_str
+            version_str.replace(branch_str + "-", ""), branch_str
         )

     # Replace multiple continuous '-' or '_' with a single period '.'.
     # In python version string, the label identifier that comes after '+', is all separated by periods '.'
-    version_str = re.sub(r'[-_]+', '.', version_str)
+    version_str = re.sub(r"[-_]+", ".", version_str)

     return version_str

+
+resources = [f for f in glob.glob('test/sample/**/*', recursive=True) if os.path.isfile(f)]
 setup(
     classifiers=[
-        "Programming Language :: Python :: 3.7"
+        "Programming Language :: Python :: 3.11"
     ],
     long_description=long_description,
     long_description_content_type="text/markdown",
@@ -58,16 +61,17 @@ def get_version():
         ]
     },
     description="Zipline python API library",
-    include_package_data=True,
     install_requires=basic_requirements,
     name="zipline-ai",
-    packages=find_packages(),
+    packages=find_packages(include=["ai*"]),
+    include_package_data=True,
+    package_data={"": resources},
     extras_require={
         # Extra requirement to have access to cli commands in python2 environments.
         "pip2compat": ["click<8"]
     },
-    python_requires=">=3.7",
+    python_requires=">=3.11",
     url=None,
     version=get_version(),
-    zip_safe=False
+    zip_safe=False,
 )
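
`get_version` converts a branch-prefixed CI version string into a PEP 440 local version. The transformation is small enough to verify directly; the logic below is copied from the diff, with invented inputs:

import re

def to_pep440(version_str: str, branch_str: str = "main") -> str:
    # "main-0.0.1" -> "0.0.1+main"; runs of '-'/'_' then become '.'
    if version_str.startswith(branch_str + "-"):
        version_str = "{}+{}".format(version_str.replace(branch_str + "-", ""), branch_str)
    return re.sub(r"[-_]+", ".", version_str)

print(to_pep440("main-0.0.1"))  # 0.0.1+main
print(to_pep440("0.0.1_rc1"))   # 0.0.1.rc1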
