Skip to content

Commit 8f1145d

Browse files
committed
untracked files
1 parent 866e5c6 commit 8f1145d

File tree

2 files changed

+75
-0
lines changed

2 files changed

+75
-0
lines changed

api/py/ai/chronon/staging_query.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from ai.chronon.api.ttypes import MetaData, StagingQuery
2+
import inspect
3+
import json
4+
5+
6+
# Takes in an conf object class like GroupBy, Join and StagingQuery
7+
# And returns a function that dispatches the arguments correctly to the object class and inner metadata
8+
# Remaining args will end up in object.metaData.customJson
9+
def _metadata_shim(conf_class):
10+
constructor_params = list(inspect.signature(conf_class.__init__).parameters.keys())
11+
assert (
12+
constructor_params[0] == "self"
13+
), "First param should be 'self', found {}".format(constructor_params[0])
14+
assert (
15+
constructor_params[1] == "metaData"
16+
), "Second param should be 'metaData', found {}".format(constructor_params[1])
17+
outer_params = constructor_params[2:]
18+
metadata_params = list(inspect.signature(MetaData.__init__).parameters.keys())[1:]
19+
intersected_params = set(outer_params) & set(metadata_params)
20+
unioned_params = set(outer_params) | set(metadata_params)
21+
err_msg = "Cannot shim {}, because params: {} are intersecting with MetaData's params".format(
22+
conf_class, intersected_params
23+
)
24+
assert len(intersected_params) == 0, err_msg
25+
26+
def shimmed_func(**kwargs):
27+
meta_kwargs = {
28+
key: value for key, value in kwargs.items() if key in metadata_params
29+
}
30+
outer_kwargs = {
31+
key: value for key, value in kwargs.items() if key in outer_params
32+
}
33+
custom_json_args = {
34+
key: value for key, value in kwargs.items() if key not in unioned_params
35+
}
36+
meta = MetaData(customJson=json.dumps(custom_json_args), **meta_kwargs)
37+
return conf_class(metaData=meta, **outer_kwargs)
38+
39+
return shimmed_func
40+
41+
42+
StagingQuery = _metadata_shim(StagingQuery)
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package ai.chronon.orchestration
2+
3+
object RepoTypes {
4+
5+
case class Name(name: String)
6+
7+
case class Branch(name: String)
8+
9+
case class FileHash(hash: String)
10+
11+
case class LocalHash(hash: String)
12+
13+
case class GlobalHash(hash: String)
14+
15+
case class LocalData(name: Name, fileHash: FileHash, localHash: LocalHash, inputs: Seq[Name], outputs: Seq[Name])
16+
17+
object Branch {
18+
val main: Branch = Branch("main")
19+
}
20+
21+
case class Version(name: String)
22+
23+
case class FileContent(content: String) {
24+
def hash: FileHash = FileHash(content.hashCode().toHexString)
25+
}
26+
27+
case class Table(name: String)
28+
29+
trait ConfProcessor[T] {
30+
def toLocalData(t: T): LocalData
31+
def parse(name: String, fileContent: FileContent): Seq[T]
32+
}
33+
}

0 commit comments

Comments
 (0)