chore: Pull schema stuff into its own online lib #593
Walkthrough
This PR reorganizes the code by updating and consolidating import paths across multiple modules (cloud, flink, online, spark) to use the new
Changes
Sequence Diagram(s)
sequenceDiagram
participant C as Client
participant A as AvroSerde
C->>A: toBytes(Mutation)
A->>A: Serialize using AvroCodec
A-->>C: Return byte[]
C->>A: fromBytes(byte[])
A->>A: Deserialize into Mutation
A-->>C: Return Mutation
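The round trip in the diagram (toBytes followed by fromBytes returning an equivalent Mutation) could be sketched roughly as below. This is only an illustration: ToyMutation and ToySerde are hypothetical names, rows are plain string sequences, and java.io data streams stand in for the AvroCodec that the PR actually uses.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical, simplified stand-in for the PR's Mutation: rows are string sequences here.
final case class ToyMutation(before: Option[Seq[String]], after: Option[Seq[String]])

// Hypothetical serde exposing the two operations from the diagram.
// It uses java.io data streams, NOT Avro, so the byte layout is illustrative only.
object ToySerde {
  private def writeRow(out: DataOutputStream, row: Option[Seq[String]]): Unit = row match {
    case Some(values) =>
      out.writeInt(values.length)
      values.foreach(v => out.writeUTF(v))
    case None =>
      out.writeInt(-1) // -1 marks an absent row
  }

  private def readRow(in: DataInputStream): Option[Seq[String]] = {
    val n = in.readInt()
    if (n < 0) None else Some(Vector.fill(n)(in.readUTF()))
  }

  // Client -> serde: Mutation to bytes.
  def toBytes(m: ToyMutation): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    writeRow(out, m.before)
    writeRow(out, m.after)
    out.flush()
    buffer.toByteArray
  }

  // Client -> serde: bytes back to a Mutation.
  def fromBytes(bytes: Array[Byte]): ToyMutation = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val before = readRow(in)
    val after = readRow(in)
    ToyMutation(before, after)
  }
}
```

The property worth checking for any real implementation is the same one this sketch satisfies: fromBytes(toBytes(m)) should equal m for both inserts (only after set) and updates (both rows set).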
Actionable comments posted: 0
🧹 Nitpick comments (6)
online/src/main/scala/ai/chronon/online/JoinCodec.scala (1)
30-30: Review: Wildcard Import Change
The change to use a wildcard import (import ai.chronon.online.serde._) fits the restructuring. Verify that it brings in only the required classes to avoid namespace clutter.
spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1)
25-25: Import Path Updated.
The SparkConversions import now correctly references ai.chronon.online.serde.SparkConversions.
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (1)
5-5: Import Refactored.
Using ai.chronon.online.serde.SparkConversions is correct per the refactor.
online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (1)
28-28: Wildcard Serde Import.
The wildcard import from ai.chronon.online.serde._ now makes all serde members available.
online/src/test/scala/ai/chronon/online/test/DataStreamBuilderTest.scala (1)
27-27: Correct Import Update.
SparkConversions is imported from ai.chronon.online.serde, matching the new structure.
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1)
11-11: Import Updated.
The update to ai.chronon.online.serde.SparkConversions aligns with the schema refactoring.
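To make the wildcard-import nitpick on JoinCodec.scala concrete, here is a small sketch of how a wildcard import and a selective import differ in what they bring into scope. The objects serdeLike and otherLib are hypothetical stand-ins (not the real ai.chronon.online.serde package), and both deliberately expose a member named Mutation.

```scala
// Hypothetical stand-in for a package such as ai.chronon.online.serde.
object serdeLike {
  object SparkConversions { val origin: String = "serdeLike" }
  object Mutation { val origin: String = "serdeLike" }
}

// Hypothetical second library that also happens to define a Mutation.
object otherLib {
  object Mutation { val origin: String = "otherLib" }
}

object WildcardConsumer {
  import serdeLike._ // every member of serdeLike is now in scope

  def conversions: String = SparkConversions.origin
  // Mutation resolves to serdeLike.Mutation even though this file never named it.
  def mutation: String = Mutation.origin
}

object SelectiveConsumer {
  import serdeLike.SparkConversions // only the member that is actually needed
  import otherLib.Mutation          // the origin of Mutation is explicit at the import site

  def conversions: String = SparkConversions.origin
  def mutation: String = Mutation.origin
}
```

With the selective form a reader can tell from the import lines alone where each name comes from, which is the kind of check the reviewer is asking for.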
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (49)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala (1 hunks)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1 hunks)
flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1 hunks)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1 hunks)
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (1 hunks)
flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala (1 hunks)
flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceProjectionDeSerializationSupportSpec.scala (1 hunks)
online/BUILD.bazel (1 hunks)
online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
online/src/main/scala/ai/chronon/online/CatalystUtil.scala (1 hunks)
online/src/main/scala/ai/chronon/online/Extensions.scala (1 hunks)
online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (1 hunks)
online/src/main/scala/ai/chronon/online/JoinCodec.scala (1 hunks)
online/src/main/scala/ai/chronon/online/TileCodec.scala (1 hunks)
online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2 hunks)
online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (1 hunks)
online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (1 hunks)
online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala (0 hunks)
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (3 hunks)
online/src/main/scala/ai/chronon/online/serde/AvroSerde.scala (1 hunks)
online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1 hunks)
online/src/test/scala/ai/chronon/online/test/CatalystUtilComplexAvroTest.scala (1 hunks)
online/src/test/scala/ai/chronon/online/test/DataStreamBuilderTest.scala (1 hunks)
spark/BUILD.bazel (1 hunks)
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/Extensions.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/GroupBy.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/Join.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/KvRdd.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/batch/BootstrapJob.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/stats/CompareBaseJob.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/stats/StatsCompute.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/StatsComputeTest.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala (1 hunks)
💤 Files with no reviewable changes (1)
- online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala
🧰 Additional context used
🧬 Code Definitions (31)
spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (2)
  online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2): online (89-89), online (132-136)
  online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (4): online (61-82), online (95-117), online (160-165), online (167-177)
spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (4)
  online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (2): online (269-298), online (324-337)
  online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2): online (89-89), online (132-136)
  online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (4): online (61-82), online (95-117), online (160-165), online (167-177)
  online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1): AvroConversions (31-201)
spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (4)
  online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (2): online (269-298), online (324-337)
  online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2): online (89-89), online (132-136)
  online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (4): online (61-82), online (95-117), online (160-165), online (167-177)
  online/src/main/scala/ai/chronon/online/OnlineDerivationUtil.scala (1): online (23-31)
spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (2)
  online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2): online (89-89), online (132-136)
  online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (4): online (61-82), online (95-117), online (160-165), online (167-177)
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
spark/src/main/scala/ai/chronon/spark/stats/CompareBaseJob.scala (2)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
  spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1): sql (298-326)
spark/src/test/scala/ai/chronon/spark/test/StatsComputeTest.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (2): SparkConversions (56-159), toChrononSchema (112-117)
online/src/main/scala/ai/chronon/online/JoinCodec.scala (3)
  online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (2): online (269-298), online (324-337)
  online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2): online (89-89), online (132-136)
  online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (4): online (61-82), online (95-117), online (160-165), online (167-177)
spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
online/src/main/scala/ai/chronon/online/CatalystUtil.scala (2)
  api/src/main/scala/ai/chronon/api/DataType.scala (1): DataType (25-136)
  online/src/main/scala/ai/chronon/online/Extensions.scala (2): Extensions (24-54), StructTypeOps (32-53)
spark/src/main/scala/ai/chronon/spark/batch/BootstrapJob.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
spark/src/main/scala/ai/chronon/spark/stats/StatsCompute.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (3)
  api/src/main/scala/ai/chronon/api/Constants.scala (1): Constants (23-100)
  online/src/main/scala/ai/chronon/online/Extensions.scala (1): Extensions (24-54)
  online/src/main/scala/ai/chronon/online/OnlineDerivationUtil.scala (2): OnlineDerivationUtil (13-124), buildDerivationFunction (49-62)
spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
spark/src/main/scala/ai/chronon/spark/Join.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala (2)
  online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (2): online (269-298), online (324-337)
  online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2): online (89-89), online (132-136)
spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala (2)
  spark/src/main/scala/ai/chronon/spark/Extensions.scala (2): Extensions (36-298), StructTypeOps (38-56)
  spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1): sql (298-326)
online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (3)
  online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (4): online (61-82), online (95-117), online (160-165), online (167-177)
  online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1): AvroConversions (31-201)
  online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (1): GroupByServingInfoParsed (31-137)
online/src/test/scala/ai/chronon/online/test/DataStreamBuilderTest.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (3)
  spark/src/main/scala/ai/chronon/spark/Extensions.scala (1): Extensions (36-298)
  online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (5): online (89-89), online (132-136), Fetcher (44-87), Fetcher (92-484), Request (48-51)
  online/src/main/scala/ai/chronon/online/Api.scala (2): KVStore (34-51), GetRequest (38-41)
spark/src/main/scala/ai/chronon/spark/KvRdd.scala (2)
  online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1): AvroConversions (31-201)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceProjectionDeSerializationSupportSpec.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1): SparkConversions (56-159)
spark/src/main/scala/ai/chronon/spark/GroupBy.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (2): RowWrapper (29-54), SparkConversions (56-159)
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1)
  online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (2): fromChrononSchema (126-131), fromChrononSchema (131-136)
online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2)
  online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (4): online (61-82), online (95-117), online (160-165), online (167-177)
  online/src/main/scala/ai/chronon/online/serde/AvroSerde.scala (1): schema (80-80)
⏰ Context from checks skipped due to timeout of 90000ms (17)
- GitHub Check: spark_tests
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: spark_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: batch_tests
- GitHub Check: join_tests
- GitHub Check: non_spark_tests
- GitHub Check: analyzer_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: fetcher_tests
- GitHub Check: non_spark_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (68)
online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (4)
20-20: Consolidated Constants import.
21-21: Combined Extensions imports for clarity.
24-24: Grouped OnlineDerivationUtil imports.
25-25: Wildcard serde import reduces redundancy.
spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (1)
27-27: New serde import added.
This aligns with moving schema-related components to the online lib.
online/src/main/scala/ai/chronon/online/CatalystUtil.scala (5)
19-19: Clean API import consolidation.
Streamlined import of DataType and StructType from ai.chronon.api.
20-20: Explicit self-import.
Brings PoolKey and poolMap into scope for later usage.
22-22: New Serde import.
Imports all members from ai.chronon.online.serde per the new package structure.
25-25: Spark import consolidation.
Combined SparkSession and types enhances readability.
28-28: Unified Java concurrency imports.
Grouped ArrayBlockingQueue and ConcurrentHashMap for cleaner code.
online/src/main/scala/ai/chronon/online/TileCodec.scala (1)
28-28: Consolidated import.
Wildcard import now covers all serde members; aligns with refactor.
online/src/main/scala/ai/chronon/online/Extensions.scala (1)
20-20: LGTM! Importing serialization functionality.
The import gives access to serde utilities, needed for serialization/deserialization.
spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (1)
28-28: LGTM! Updated import path.
Import path correctly reflects AvroConversions relocation to the serde package.
online/src/test/scala/ai/chronon/online/test/CatalystUtilComplexAvroTest.scala (1)
4-5: LGTM! Properly separated imports.
Imports now correctly reflect the package reorganization with each class in its proper location.
online/src/main/scala/ai/chronon/online/Api.scala (2)
24-25: LGTM! Added serde package import.
Imports the relocated serialization functionality from the new serde package.
30-32: LGTM! Consolidated scala imports.
More organized import structure improves readability.
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (5)
17-17: Package structure updated correctly.
Package declaration changed to align with new structure.
20-23: Import statements organized.
Consolidated Avro imports for better readability.
29-29: Scala collection imports consolidated.
Combined AbstractIterator and mutable imports.
183-183: Removed redundant prefix.
Removed serde. prefix since class is now in the same package.
193-193: Removed redundant prefix.
Removed serde. prefix since class is now in the same package.
cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala (1)
6-6: Added serde import.
Import added to access schema functionality moved to new package.
spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala (1)
25-25: Added serde import.
Import added to access serialization utilities moved to the dedicated package. Needed for SparkConversions and AvroConversions usage.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1)
10-10: Updated import path.
Updated Serde import to reflect new package location.
spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala (1)
23-23: Import Updated.
New SparkConversions import reflects the new structure.
spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (1)
25-25: Serde Import Added.
Added wildcard import from ai.chronon.online.serde is correct.
spark/src/main/scala/ai/chronon/spark/Join.scala (1)
24-24: SparkConversions Import Updated.
Import now uses ai.chronon.online.serde.SparkConversions.
spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1)
25-25: Wildcard Import for Serde.
Switching to import ai.chronon.online.serde._ is acceptable.
spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (1)
26-26: SparkConversions Import Revised.
Updated to use the new serde package path.
flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1)
13-13: Updated import path
Import path updated to use the new serde package.
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (1)
25-25: Updated import path
Import path now uses the new serde package.
spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala (1)
38-38: Updated import path
Import path now references the new serde package.
online/BUILD.bazel (1)
19-33: Added new serde_lib
New library created for schema serialization/deserialization functionality.
spark/src/main/scala/ai/chronon/spark/batch/BootstrapJob.scala (1)
6-6: Import Updated: Changed to use ai.chronon.online.serde.SparkConversions.
spark/src/main/scala/ai/chronon/spark/stats/CompareBaseJob.scala (1)
19-23: Import Update: Revised SparkConversions import to the new serde package.
spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala (1)
25-25: Import Updated: Using ai.chronon.online.serde.SparkConversions in tests.
spark/src/main/scala/ai/chronon/spark/stats/StatsCompute.scala (1)
23-23: Import Updated: Now importing SparkConversions from ai.chronon.online.serde.
spark/src/test/scala/ai/chronon/spark/test/StatsComputeTest.scala (1)
20-22: Import Updated: Adjusted to import toChrononSchema from the new serde path.
spark/BUILD.bazel (1)
65-65: Updated dependency to use the new serde_lib.
Dependency correctly updated to reference the newly created serialization/deserialization library.
spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala (1)
22-23: Import paths updated for relocated classes.
Updated import paths to reflect the moved serialization classes.
spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala (4)
27-27: Added import for serde package.
Added wildcard import for the new serialization/deserialization package.
28-28: Updated Extensions import with explicit StructTypeOps.
Modified to explicitly import StructTypeOps alongside wildcard import.
32-33: Consolidated Spark SQL and slf4j imports.
Grouped related imports for better organization.
36-37: Consolidated Scala collection and util imports.
Grouped related imports for improved readability.
spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala (1)
35-36: Updated imports for RowWrapper and SparkConversions.
Import paths updated to reflect the moved classes in the new serde package.
spark/src/main/scala/ai/chronon/spark/Extensions.scala (1)
23-23: Import updated to use the new serde package.
The import path has been updated to reference classes from the dedicated serialization/deserialization package, aligning with the PR's goal of reorganizing schema components.
flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceProjectionDeSerializationSupportSpec.scala (1)
5-5: Updated import path for SparkConversions.
Import path changed to use the new serde package location.
spark/src/main/scala/ai/chronon/spark/GroupBy.scala (1)
38-39: Imports updated to reference classes from the new serde package.
Both RowWrapper and SparkConversions imports now point to the dedicated serialization package.
spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (2)
22-23: Updated and consolidated imports for better organization.
Imports have been reorganized to group related classes and update the serde package reference.
27-31: Grouped related imports for cleaner code organization.
Import statements have been consolidated to reduce verbosity.
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (2)
22-23: Removed unused GroupByOps import.
The GroupByOps import has been removed as it's no longer needed in this file.
29-29: Added wildcard import for the serde package.
New import provides access to all serialization utilities from the dedicated package.
spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (1)
26-26: Import Update: Changed to ai.chronon.online.serde.SparkConversions.
spark/src/main/scala/ai/chronon/spark/KvRdd.scala (1)
20-21: Import Update: Now using ai.chronon.online.serde.AvroConversions and ai.chronon.online.serde.SparkConversions.
online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (1)
7-8: Import Update: AvroConversions is now imported from ai.chronon.online.serde; GroupByServingInfoParsed is directly imported.
flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala (1)
9-10: Import Update: Updated to import Api and GroupByServingInfoParsed from ai.chronon.online and SparkConversions from ai.chronon.online.serde.
online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (2)
17-17: Package Update: Package changed to ai.chronon.online.serde.
25-26: Import Consolidation: Merged Scala collection imports into one line.
online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (3)
28-28: Adjusted import to use new serde package.
Direct import from new serde package aligns with PR objective to separate schema components.
224-226: Updated codec parameter type to AvroCodec.
Simplified type reference by using direct import from serde package.
229-231: Updated codec parameter type in encodeOnce method.
Consistent type change to align with encode method signature.
online/src/main/scala/ai/chronon/online/serde/AvroSerde.scala (7)
3-3: Updated import statement for Constants and StructType.
Required for Serde abstract class implementation.
6-10: Reorganized imports for Avro and Java I/O classes.
Better organization of dependencies.
11-18: Added Serde abstract class.
New serialization interface with default toBytes implementation that throws exception.
20-51: Added comprehensive Mutation documentation and case class.
Well-documented explanation of mutations vs events and type conversions.
53-53: Updated AvroSerde to extend Serde.
Part of refactoring to standardize serialization interfaces.
64-77: Implemented fromBytes method.
Converts byte array to Mutation object with proper handling of reversals.
80-80: Implemented schema method.
Simple implementation returning inputSchema as required by Serde interface.
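The comment on fromBytes mentions "proper handling of reversals" without showing what that means. One plausible reading, sketched below purely as an assumption, is that a decoded row carrying a boolean reversal flag is placed in the before slot of the Mutation, and any other row in the after slot. RowMutation, ReversalSketch, and the column name "is_before" are hypothetical choices for this sketch; the actual logic and constant should be checked in AvroSerde.scala and ai.chronon.api.Constants.

```scala
// Hypothetical stand-in for the PR's Mutation (which also uses null for absent rows).
final case class RowMutation(before: Array[Any], after: Array[Any])

object ReversalSketch {
  // Assumed column name for this sketch only.
  val ReversalColumn: String = "is_before"

  // Turns an already-decoded row into a mutation.
  // Assumption: a true reversal flag means the row describes the state *before* the change.
  def toMutation(fieldNames: Seq[String], row: Array[Any]): RowMutation = {
    val reversalIndex = fieldNames.indexOf(ReversalColumn)
    val isReversal = reversalIndex >= 0 && (row(reversalIndex) match {
      case flag: Boolean => flag
      case _             => false // missing or non-boolean flag is treated as "not a reversal"
    })
    if (isReversal) RowMutation(before = row, after = null)
    else RowMutation(before = null, after = row)
  }
}
```

A schema without the reversal column falls through to the after branch, so plain event rows are treated as inserts under this reading.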
Co-authored-by: Thomas Chow <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (2)
online/src/main/scala/ai/chronon/online/serde/AvroSerde.scala (2)
11-18: New Serde abstract class accurately defines the interface.
Consider making toBytes abstract rather than throwing an exception if implementations are expected to provide it.
 abstract class Serde extends Serializable {
   def fromBytes(bytes: Array[Byte]): Mutation
   def schema: StructType
-  def toBytes(mutation: Mutation): Array[Byte] = {
-    // not implemented
-    throw new UnsupportedOperationException("toBytes not implemented")
-  }
+  def toBytes(mutation: Mutation): Array[Byte]
 }
51-51: Mutation case class definition is clean.
Consider using Option types instead of nulls for better type safety.
-case class Mutation(schema: StructType = null, before: Array[Any] = null, after: Array[Any] = null)
+case class Mutation(schema: Option[StructType] = None, before: Option[Array[Any]] = None, after: Option[Array[Any]] = None)
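As a rough illustration of what the two suggestions on AvroSerde.scala would look like together, the sketch below defines a serde base class with an abstract toBytes and a mutation type built on Option. All names here (SchemaStub, OptMutation, StrictSerde, FirstFieldSerde) are hypothetical; SchemaStub stands in for ai.chronon.api.StructType, and the toy encoding (UTF-8 of the first after field) is not what the PR does.

```scala
// Hypothetical stand-in for ai.chronon.api.StructType.
final case class SchemaStub(fieldNames: Seq[String])

// Mutation variant using Option instead of null defaults, as the reviewer suggests.
final case class OptMutation(
    schema: Option[SchemaStub] = None,
    before: Option[Array[Any]] = None,
    after: Option[Array[Any]] = None)

// Serde variant where toBytes is abstract, so a subclass that forgets it fails to compile
// instead of throwing UnsupportedOperationException at runtime.
abstract class StrictSerde extends Serializable {
  def fromBytes(bytes: Array[Byte]): OptMutation
  def schema: SchemaStub
  def toBytes(mutation: OptMutation): Array[Byte]
}

// Toy implementation: encodes only the first field of `after` as UTF-8 text.
class FirstFieldSerde(val schema: SchemaStub) extends StrictSerde {
  override def toBytes(mutation: OptMutation): Array[Byte] =
    mutation.after
      .flatMap(_.headOption)
      .map(_.toString.getBytes("UTF-8"))
      .getOrElse(Array.emptyByteArray)

  override def fromBytes(bytes: Array[Byte]): OptMutation =
    if (bytes.isEmpty) OptMutation(schema = Some(schema))
    else OptMutation(schema = Some(schema), after = Some(Array[Any](new String(bytes, "UTF-8"))))
}
```

The trade-off is that every existing Serde subclass would then have to supply toBytes, and every call site reading before or after would have to handle None explicitly, which is a wider change than this PR's import reshuffle.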
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (51)
cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala (1 hunks)
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1 hunks)
flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1 hunks)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1 hunks)
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (1 hunks)
flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala (1 hunks)
flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceProjectionDeSerializationSupportSpec.scala (1 hunks)
online/BUILD.bazel (1 hunks)
online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
online/src/main/scala/ai/chronon/online/CatalystUtil.scala (1 hunks)
online/src/main/scala/ai/chronon/online/Extensions.scala (1 hunks)
online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (1 hunks)
online/src/main/scala/ai/chronon/online/JoinCodec.scala (1 hunks)
online/src/main/scala/ai/chronon/online/TileCodec.scala (1 hunks)
online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2 hunks)
online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (1 hunks)
online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (1 hunks)
online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala (0 hunks)
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (3 hunks)
online/src/main/scala/ai/chronon/online/serde/AvroSerde.scala (1 hunks)
online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1 hunks)
online/src/test/scala/ai/chronon/online/test/CatalystUtilComplexAvroTest.scala (1 hunks)
online/src/test/scala/ai/chronon/online/test/DataStreamBuilderTest.scala (1 hunks)
spark/BUILD.bazel (1 hunks)
spark/src/main/scala/ai/chronon/spark/Analyzer.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/Extensions.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/GroupBy.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/Join.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/KvRdd.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/batch/BootstrapJob.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/stats/CompareBaseJob.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/stats/StatsCompute.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/StatsComputeTest.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala (1 hunks)
💤 Files with no reviewable changes (1)
- online/src/main/scala/ai/chronon/online/serde/AvroCodec.scala
✅ Files skipped from review due to trivial changes (1)
- spark/src/main/scala/ai/chronon/spark/batch/MergeJob.scala
🚧 Files skipped from review as they are similar to previous changes (47)
- cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala
- spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala
- spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala
- spark/src/main/scala/ai/chronon/spark/Analyzer.scala
- online/src/test/scala/ai/chronon/online/test/CatalystUtilComplexAvroTest.scala
- flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala
- spark/src/main/scala/ai/chronon/spark/utils/InMemoryStream.scala
- online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala
- spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala
- spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala
- spark/src/test/scala/ai/chronon/spark/test/OnlineUtils.scala
- spark/src/test/scala/ai/chronon/spark/test/DataFrameGen.scala
- online/src/main/scala/ai/chronon/online/TileCodec.scala
- online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala
- spark/src/test/scala/ai/chronon/spark/test/StatsComputeTest.scala
- spark/src/main/scala/ai/chronon/spark/batch/BootstrapJob.scala
- spark/src/main/scala/ai/chronon/spark/KvRdd.scala
- spark/src/main/scala/ai/chronon/spark/stats/CompareBaseJob.scala
- online/src/main/scala/ai/chronon/online/JoinCodec.scala
- flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala
- online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala
- online/src/main/scala/ai/chronon/online/CatalystUtil.scala
- spark/src/main/scala/ai/chronon/spark/stats/StatsCompute.scala
- spark/src/main/scala/ai/chronon/spark/Extensions.scala
- spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala
- spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala
- spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala
- spark/BUILD.bazel
- spark/src/main/scala/ai/chronon/spark/ChrononKryoRegistrator.scala
- online/src/main/scala/ai/chronon/online/Extensions.scala
- online/BUILD.bazel
- online/src/test/scala/ai/chronon/online/test/DataStreamBuilderTest.scala
- spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala
- spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala
- flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceProjectionDeSerializationSupportSpec.scala
- spark/src/main/scala/ai/chronon/spark/Join.scala
- spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala
- spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala
- spark/src/test/scala/ai/chronon/spark/test/groupby/GroupByTest.scala
- spark/src/main/scala/ai/chronon/spark/GroupBy.scala
- spark/src/test/scala/ai/chronon/spark/test/fetcher/ChainingFetcherTest.scala
- online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala
- online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala
- online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala
🧰 Additional context used
🧬 Code Graph Analysis (2)
online/src/main/scala/ai/chronon/online/Api.scala (4)
- online/src/main/scala/ai/chronon/online/fetcher/GroupByResponseHandler.scala (2)
  - online (269-298)
  - online (324-337)
- online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2)
  - online (89-89)
  - online (132-136)
- online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala (4)
  - online (61-82)
  - online (95-117)
  - online (160-165)
  - online (167-177)
- online/src/main/scala/ai/chronon/online/OnlineDerivationUtil.scala (1)
  - online (23-31)

online/src/main/scala/ai/chronon/online/serde/AvroSerde.scala (1)
- api/src/main/scala/ai/chronon/api/Constants.scala (1)
  - Constants (23-100)
⏰ Context from checks skipped due to timeout of 90000ms (16)
- GitHub Check: spark_tests
- GitHub Check: streaming_tests
- GitHub Check: streaming_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: groupby_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: analyzer_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: batch_tests
- GitHub Check: join_tests
- GitHub Check: non_spark_tests
- GitHub Check: non_spark_tests
🔇 Additional comments (6)
online/src/main/scala/ai/chronon/online/Api.scala (3)
23-25: Import reorganization looks good.
Consolidated import statements and added a new import for the serde package.

30-32: Scala imports reorganized appropriately.
Imports are now organized more logically.

181-181: ✅ Verification successful

Verify that all Serde implementations are compatible with the new structure.
The `streamDecoder` method still returns a `Serde` type, which now refers to the class moved to the serde package.

🏁 Script executed:

    #!/bin/bash
    # Check for any custom implementations of Serde that might need updating
    rg "extends\s+Serde" --type scala

Length of output: 154

All Serde implementations work with the new structure.
- `AvroSerde` in `online/src/main/scala/ai/chronon/online/serde/AvroSerde.scala` correctly extends the updated `Serde`.
- `streamDecoder` still returns a valid `Serde`.

online/src/main/scala/ai/chronon/online/serde/AvroSerde.scala (3)

20-50: Excellent documentation.
Clear explanation of mutations vs. events, time assumptions, and type conversions.

64-78: Implementation of fromBytes is correct.
Properly converts from bytes to Mutation objects based on the reversal flag.

80-80: Simple schema implementation.
Correctly returns the inputSchema.
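
To make the `fromBytes` comment above concrete, here is a minimal, self-contained sketch of a serde that routes a decoded row into the "before" or "after" side of a mutation depending on a reversal flag. Everything in it is an assumption for illustration: `SketchMutation`, `SketchSerde`, `CsvSketchSerde`, the `is_before` column name, and the comma-separated encoding are hypothetical stand-ins and not the classes or the Avro encoding used in this PR.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for a mutation: a reversal carries only `before`,
// a regular write carries only `after`.
final case class SketchMutation(before: Option[Seq[String]], after: Option[Seq[String]])

// Hypothetical serde interface with the three operations the review mentions.
trait SketchSerde {
  def schema: Seq[String]
  def toBytes(m: SketchMutation): Array[Byte]
  def fromBytes(bytes: Array[Byte]): SketchMutation
}

// Toy comma-separated encoding used in place of Avro to keep the sketch runnable.
final class CsvSketchSerde(val schema: Seq[String], reversalColumn: String) extends SketchSerde {
  // Position of the reversal flag in the row, or -1 if the schema has none.
  private val reversalIndex: Int = schema.indexOf(reversalColumn)

  // Encodes whichever side of the mutation is present (single-row mutations only).
  def toBytes(m: SketchMutation): Array[Byte] = {
    val row = m.before.orElse(m.after).getOrElse(Seq.empty)
    row.mkString(",").getBytes(UTF_8)
  }

  // Decodes a row and places it in `before` when the reversal flag is "true".
  def fromBytes(bytes: Array[Byte]): SketchMutation = {
    val row = new String(bytes, UTF_8).split(",", -1).toSeq
    val isReversal =
      reversalIndex >= 0 && reversalIndex < row.length && row(reversalIndex) == "true"
    if (isReversal) SketchMutation(before = Some(row), after = None)
    else SketchMutation(before = None, after = Some(row))
  }
}
```

With a schema of `Seq("user", "is_before")`, decoding a row whose flag is `true` populates only `before`, and any other row populates only `after`. A real implementation would decode with the Avro codec and the actual reversal column constant rather than string splitting.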
## Summary

- Slim down the batch module, to not drag all of online lib with it.

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Introduced an extensible serialization interface to enhance data conversion support.
- **Refactor**
  - Reorganized internal dependencies by consolidating import statements into a dedicated serialization package.
  - Updated import paths for several classes to reflect their new locations within the serialization package.
- **Chores**
  - Updated build configurations to incorporate the new serialization library.
- **Tests**
  - Adjusted test setups to reflect the updated package structure.

---------

Co-authored-by: Thomas Chow <[email protected]>