Add a Flink canary app that can be run on demand #762
Conversation
Walkthrough

This update introduces a new Python canary group-by test for GCP item events, refactors the Flink Kafka event driver and schema to focus on item events, updates related Scala and Python test configurations, and adds a mock custom schema provider for deserialization. Environment variables and job parameters are revised to match the new item event flow.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Test as Canary Python Test
    participant Source as BigQuery Source
    participant Kafka as GCP Kafka
    participant Flink as FlinkKafkaItemEventDriver
    participant SerDe as MockCustomSchemaProvider
    Test->>Source: Query item events (BigQuery)
    Test->>Kafka: Connect with TLS/OAuth, subscribe to topic
    Kafka->>Flink: Stream item event messages
    Flink->>SerDe: Deserialize Avro bytes to Mutation
    Flink->>Test: Aggregate and output group-by results
```
Actionable comments posted: 4
🧹 Nitpick comments (10)
flink/src/test/scala/ai/chronon/flink/test/deser/SourceIdentityDeSerializationSupportSpec.scala (3)

32-32: Consider adding a null/empty check before accessing head. This could throw if deserialization fails.

```diff
- val row = resultList.toScala.head
+ assert(resultList.nonEmpty, "Deserialized result list shouldn't be empty")
+ val row = resultList.toScala.head
```

43-46: Add assertion messages for easier debugging. These assertions lack descriptive messages.

```diff
- assert(schema.fieldNames sameElements avroSchemaFields.map(_.name()))
- schema.fieldNames.foreach(name => assert(schema(name).dataType == avroSchemaToSparkDataTypes(name)))
- // spot check a couple of fields
- assert(row.get(0) == 12345)
- assert(row.getString(1) == "johndoe")
+ assert(schema.fieldNames sameElements avroSchemaFields.map(_.name()), "Field names should match")
+ schema.fieldNames.foreach(name => assert(schema(name).dataType == avroSchemaToSparkDataTypes(name), s"Data type mismatch for field $name"))
+ // spot check a couple of fields
+ assert(row.get(0) == 12345, "ID field should match expected value")
+ assert(row.getString(1) == "johndoe", "Username field should match expected value")
```

65-65: Use a consistent assertion style. Switch from JUnit to ScalaTest assertions.

```diff
- assertTrue(resultList.isEmpty)
+ assert(resultList.isEmpty, "Result list should be empty for corrupted data")
```

flink/src/main/scala/ai/chronon/flink/deser/CustomSchemaSerDe.scala (2)
28-30: Hard-coded schema guard limits reuse. `require(schemaName == "beacon")` blocks other schemas. Consider accepting any name or validating against an allow-list.

31-40: Redundant schema conversion per JVM. Both `chrononSchema` and `avroSerDe` are recomputed on every class-loader instantiation. Cache them in a companion object, or pass a prebuilt schema if many topics share it; a sketch of this idea follows below.
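A minimal sketch of the allow-list plus companion-object caching idea. The object, config keys, and schema names here are hypothetical stand-ins, not the actual CustomSchemaSerDe code:

```scala
import org.apache.avro.Schema

object CustomSchemaCache {
  // accept a small, explicit allow-list instead of hard-coding a single schema name
  private val allowedSchemas = Set("beacon", "item_event")

  // parse/convert once per JVM and share the result across serde instances
  private val parsedSchemas = new java.util.concurrent.ConcurrentHashMap[String, Schema]()

  def schemaFor(name: String, schemaJson: String): Schema = {
    require(allowedSchemas.contains(name), s"Unsupported schema: $name")
    parsedSchemas.computeIfAbsent(name, _ => new Schema.Parser().parse(schemaJson))
  }
}
```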
api/python/test/canary/group_bys/gcp/beacon_canary.py (1)

37-40: String-built URI is fragile. Manual `/` and `&`/`;` concatenation is error-prone and obscures param parsing. Prefer `urllib.parse` or a helper builder to encode/escape values.

```python
from urllib.parse import quote_plus

kafka_topic = (
    f"kafka://test-beacon-main/"
    f"{quote_plus(schema_provider_cfgs)}/"
    f"{quote_plus(google_kafka_cfgs)}"
)
```

flink/src/main/scala/ai/chronon/flink/deser/SchemaRegistrySerDe.scala (1)
49-69: Network call executed during a lazy val may block job start. `retrieveTopicSchema` hits the registry during object construction. Consider an asynchronous fetch or retry back-off to avoid long startup stalls.

flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (1)
29-38: Consider making the builder return `ChrononDeserializationSchema[_]` instead of concrete types. Returning the generic parent type allows for future polymorphism (e.g., passing either schema to a common method) without an explicit cast; see the sketch below.
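A rough sketch of the suggestion under simplifying assumptions — the trait and class names mirror the review comment, but the bodies are stand-ins rather than the actual Chronon classes:

```scala
trait ChrononDeserializationSchema[T]

class SourceIdentityDeserializationSchema extends ChrononDeserializationSchema[Array[Any]]
class SourceProjectionDeserializationSchema extends ChrononDeserializationSchema[Array[Any]]

object DeserializationSchemaBuilder {
  // returning the parent type keeps call sites polymorphic and avoids later asInstanceOf casts
  def build(projectionEnabled: Boolean): ChrononDeserializationSchema[_] =
    if (projectionEnabled) new SourceProjectionDeserializationSchema
    else new SourceIdentityDeserializationSchema
}
```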
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)

71-75: `rowSerializer` is set but never used. Dead code adds cognitive load; remove it unless a follow-up change will reference it.

```diff
- @transient private var rowSerializer: ExpressionEncoder.Serializer[Row] = _
  ...
- rowSerializer = eventExprEncoder.createSerializer()
```

77-82: Avoid recreating a heavy `SparkExpressionEval` on every call. `projectedSchema` instantiates a new evaluator on each invocation; cache the result to save driver & TM memory/CPU.

```diff
-private def projectedSchemaLazy =
-  new SparkExpressionEval[Row](sourceEventEncoder, groupBy).getOutputSchema
-override def projectedSchema = projectedSchemaLazy.fields.map( ... )
+lazy val outputSchema = new SparkExpressionEval[Row](sourceEventEncoder, groupBy).getOutputSchema
+override def projectedSchema: Array[(String, DataType)] =
+  outputSchema.fields.map(f => (f.name, SparkConversions.toChrononType(f.name, f.dataType)))
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (33)
- api/python/test/canary/group_bys/gcp/beacon_canary.py (1 hunks)
- api/python/test/canary/teams.py (1 hunks)
- cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala (1 hunks)
- cloud_gcp/BUILD.bazel (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (2 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkKafkaBeaconEventDriver.scala (3 hunks)
- flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (0 hunks)
- flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (0 hunks)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/CustomSchemaSerDe.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/FlinkSerDeProvider.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/SchemaRegistrySerDe.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (3 hunks)
- flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (0 hunks)
- flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala (0 hunks)
- flink/src/test/scala/ai/chronon/flink/test/deser/AvroDeSerTestUtils.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink/test/deser/SchemaRegistryDeSerSchemaProviderSpec.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/test/deser/SourceIdentityDeSerializationSupportSpec.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/test/deser/SourceProjectionDeSerializationSupportSpec.scala (4 hunks)
- flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceIdentityDeSerializationSupportSpec.scala (0 hunks)
- online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/CatalystUtil.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/metrics/Metrics.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/serde/AvroSerDe.scala (1 hunks)
- online/src/main/scala/ai/chronon/online/serde/SerDe.scala (2 hunks)
- service_commons/src/main/java/ai/chronon/service/ChrononServiceLauncher.java (1 hunks)
- spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1 hunks)
💤 Files with no reviewable changes (5)
- flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala
- flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceIdentityDeSerializationSupportSpec.scala
- flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala
- flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala
- flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala
🧰 Additional context used
🧬 Code Graph Analysis (10)
flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)
- flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (1)
  - ChrononDeserializationSchema (16-20)

online/src/main/scala/ai/chronon/online/Api.scala (3)
- cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala (1)
  - streamDecoder (45-45)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1)
  - streamDecoder (42-45)
- spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1)
  - streamDecoder (106-111)

cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala (2)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (1)
  - streamDecoder (42-45)
- spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1)
  - streamDecoder (106-111)

spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (4)
- online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (4)
  - get (32-32)
  - SparkConversions (56-159)
  - fromChrononSchema (126-131)
  - fromChrononSchema (131-136)
- online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (2)
  - fromChrononSchema (82-134)
  - AvroConversions (32-274)
- online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (1)
  - streamChrononSchema (140-145)
- online/src/main/scala/ai/chronon/online/Extensions.scala (1)
  - catalogString (27-29)

online/src/main/scala/ai/chronon/online/CatalystUtil.scala (1)
- online/src/main/scala/ai/chronon/online/SparkInternalRowConversions.scala (1)
  - SparkInternalRowConversions (32-184)

online/src/main/scala/ai/chronon/online/metrics/Metrics.scala (1)
- online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala (4)
  - OtelMetricsReporter (19-79)
  - OtelMetricsReporter (81-149)
  - buildOtelMetricReader (97-118)
  - buildOpenTelemetryClient (120-148)

online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala (1)
- api/src/main/java/ai/chronon/api/thrift/Option.java (2)
  - Option (25-143)
  - Some (93-111)

flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1)
- online/src/main/scala/ai/chronon/online/CatalystUtil.scala (4)
  - performSql (97-99)
  - performSql (131-136)
  - performSql (138-141)
  - performSql (143-147)

flink/src/main/scala/ai/chronon/flink/deser/SchemaRegistrySerDe.scala (3)
- online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1)
  - AvroConversions (32-274)
- online/src/main/scala/ai/chronon/online/serde/AvroSerDe.scala (3)
  - AvroSerDe (11-38)
  - schema (37-37)
  - fromBytes (24-35)
- online/src/main/scala/ai/chronon/online/serde/SerDe.scala (2)
  - Mutation (51-51)
  - SerDe (11-18)

flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (3)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (5)
  - SparkExpressionEval (37-191)
  - getOutputSchema (116-118)
  - initialize (65-84)
  - performSql (86-93)
  - performSql (95-98)
- online/src/main/scala/ai/chronon/online/serde/SerDe.scala (2)
  - Mutation (51-51)
  - SerDe (11-18)
- online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (3)
  - SparkConversions (56-159)
  - toSparkRow (136-158)
  - toChrononType (61-87)
🔇 Additional comments (38)
service_commons/src/main/java/ai/chronon/service/ChrononServiceLauncher.java (3)

30-30: Improved defensive check. Adding URL validation prevents metric initialization with invalid configuration.

32-32: Enhanced condition for metrics initialization. Now properly checks both that metrics are enabled and that the URL is defined.

39-39: Safe access to the optional URL. This change properly accesses the URL using `.get()` after verification, avoiding a potential NPE.

online/src/main/scala/ai/chronon/online/metrics/Metrics.scala (1)

138-143: Improved handling of the optional metric reader. Now safely handles cases when the metric reader isn't available by:

- Using `map` for the transformation
- Falling back to a no-op client

This makes metrics initialization more robust against missing configurations; a sketch of the pattern follows below.
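A minimal sketch of that Option-based flow under stated assumptions: the helper name is illustrative rather than the actual Metrics.scala/OtelMetricsReporter API; `OpenTelemetry.noop()` is a real OpenTelemetry call.

```scala
import io.opentelemetry.api.OpenTelemetry

object MetricsWiringSketch {
  // hypothetical builder standing in for OtelMetricsReporter.buildOpenTelemetryClient
  def buildClient(exporterUrl: String): OpenTelemetry = ???

  def metricsClient(maybeExporterUrl: Option[String]): OpenTelemetry =
    maybeExporterUrl
      .map(buildClient)                // build a real client only when an exporter URL is configured
      .getOrElse(OpenTelemetry.noop()) // otherwise fall back to a no-op client
}
```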
online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala (4)

93-95: Better exporter URL handling. Returns `Option[String]` for safer null handling.

97-97: Enhanced return type safety. The `Option[MetricReader]` return type propagates configuration optionality up the call stack.

101-106: Conditional metric reader creation. Creates the metric reader only when the URL is defined, using a functional `map` approach.

111-114: Consistent option wrapping. The Prometheus server is properly wrapped in `Some()` to match the return type.

cloud_gcp/BUILD.bazel (1)

38-38: Added Avro dependency correctly. The Maven artifact for Apache Avro is properly inserted in alphabetical order.
flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)

3-3: Added necessary import. The import aligns with existing class usage in KafkaFlinkSource and ProjectedKafkaFlinkSource.

api/python/test/canary/teams.py (1)

28-28: Added Flink state URI configuration. The GCS path for Flink state storage supports the canary app requirements.

flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1)

95-98: Good method overload implementation. Cleanly converts an Array[Any] to an InternalRow before delegating to the existing method, as sketched below.
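A minimal sketch of that overload shape, assuming an existing `performSql(InternalRow)` method; the object name and return type are placeholders, not the actual SparkExpressionEval signature:

```scala
import org.apache.spark.sql.catalyst.InternalRow

object EvalSketch {
  // assumed pre-existing overload that does the actual Spark SQL evaluation
  def performSql(row: InternalRow): Seq[Map[String, Any]] = ???

  // new overload: wrap the plain array in an InternalRow, then delegate to the existing code path
  def performSql(values: Array[Any]): Seq[Map[String, Any]] =
    performSql(InternalRow.fromSeq(values.toSeq))
}
```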
online/src/main/scala/ai/chronon/online/Api.scala (1)

175-175: Method return type updated to SerDe. The method signature now returns `SerDe` instead of `Serde`, aligning with similar changes in the implementing classes.

cloud_aws/src/main/scala/ai/chronon/integrations/aws/AwsApiImpl.scala (1)

45-45: Return type updated to match the parent class. Changed the return type from `Serde` to `SerDe` to match the abstract method in Api.scala.

online/src/main/scala/ai/chronon/online/CatalystUtil.scala (1)

122-122: Made inputArrEncoder public with an explicit type. Changed from a private val to a public val with an explicit `Any => Any` type, enabling access from Flink deserialization schemas.

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (2)

79-80: Updated DataprocSubmitter initialization. Now uses an explicit SubmitterConf with canary project parameters instead of the default constructor.

95-96: Updated test parameters for the Flink canary app. Changed the data file path to the canary events location and added an event delay parameter, supporting the new Flink canary application.

spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala (1)

106-110: Updated SerDe interface. Correctly updates the return type and implementation to match the renamed SerDe interface and the AvroSerDe implementation with explicit schema conversion.
flink/src/test/scala/ai/chronon/flink/test/deser/AvroDeSerTestUtils.scala (2)

1-5: Relocated package and updated imports. The package changed from Apache Spark's namespace to Chronon's, with appropriate imports.

22-28: Added SerDe test implementation. The new test utility class properly implements the SerDe interface by delegating to AvroSerDe.

flink/src/main/scala/ai/chronon/flink/deser/FlinkSerDeProvider.scala (1)

9-26: Good factory pattern implementation. This provider centralizes SerDe creation logic with proper validation for mutually exclusive configuration options; a rough sketch of the idea follows below.
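A rough sketch of such a factory, assuming hypothetical config keys, constructors, and stand-in types — the real FlinkSerDeProvider/TopicInfo APIs may differ:

```scala
// stand-ins for the real Chronon types
trait SerDe
class SchemaRegistrySerDe(registryUrl: String) extends SerDe
class CustomSchemaSerDe(providerClass: String) extends SerDe

case class TopicInfo(params: Map[String, String])

object SerDeProviderSketch {
  def build(topic: TopicInfo): SerDe = {
    val registryUrl    = topic.params.get("registry_url")
    val customProvider = topic.params.get("custom_schema_provider")
    // the two configuration styles are mutually exclusive
    require(!(registryUrl.isDefined && customProvider.isDefined),
            "registry_url and custom_schema_provider cannot both be set")
    (registryUrl, customProvider) match {
      case (Some(url), _) => new SchemaRegistrySerDe(url)
      case (_, Some(cls)) => new CustomSchemaSerDe(cls)
      case _              => throw new IllegalArgumentException("no schema provider configured for topic")
    }
  }
}
```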
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpApiImpl.scala (2)

10-10: Updated import statement. Consolidated imports for serialization-related classes.

42-43: Updated SerDe interface implementation. Similar to the changes in MockApi, correctly implements the renamed SerDe interface with explicit schema conversion.

flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (3)

21-21: Clean import for the new SerDe abstractions. Appropriate import for the refactored deserialization components.

142-142: Better abstraction with FlinkSerDeProvider. Centralizes schema provider creation logic, improving maintainability.

144-145: Simplified deserialization schema creation. Uses the builder pattern for deserialization schema creation, making the code more readable.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (3)

11-11: Updated imports for the new deserialization framework. Adds the required imports for the refactored components.

315-315: Improved schema provider creation. Uses the centralized provider for consistent schema handling.

317-318: Standardized deserialization schema creation. Uses the builder pattern for projection-based deserialization, maintaining consistency with ValidationFlinkJob.
flink/src/main/scala/ai/chronon/flink/FlinkKafkaBeaconEventDriver.scala (3)

101-101: Improved efficiency with a lazy val. Avoids repeated schema parsing by caching the parsed schema (see the sketch after this group).

105-107: Updated schema identity. Changed the record name and namespace for better organization.

108-120: Enhanced schema definition. Updated the field definitions to support the canary app requirements.
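Illustrating the lazy-parsing pattern from the 101-101 comment with a made-up schema; the real driver's record name and fields differ:

```scala
import org.apache.avro.Schema

object ItemEventSchemaSketch {
  val schemaJson: String =
    """{"type":"record","name":"ItemEvent","namespace":"ai.chronon.canary","fields":[
      |  {"name":"event_type","type":"string"},
      |  {"name":"timestamp","type":"long"}
      |]}""".stripMargin

  // parsed once on first access instead of on every produced record
  lazy val parsed: Schema = new Schema.Parser().parse(schemaJson)
}
```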
online/src/main/scala/ai/chronon/online/serde/AvroSerDe.scala (1)

11-38: Well-designed SerDe implementation. A clean implementation with:

- Lazy initialization for expensive conversions
- Clear separation of concerns
- Proper Mutation object construction

The class properly handles reversal column checks for before/after state determination; a rough sketch of that flow is below.
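A simplified sketch of that before/after flow. The Mutation shape, class name, and reversal-column handling here are illustrative, not the actual ai.chronon.online.serde.AvroSerDe code:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import scala.jdk.CollectionConverters._

// stand-in for the real Mutation type
case class Mutation(before: Array[Any], after: Array[Any])

class AvroSerDeSketch(schema: Schema, reversalColumn: String = "is_before") {
  // computed once per instance; schema-derived conversions can be expensive
  private lazy val fieldNames: Seq[String] = schema.getFields.asScala.map(_.name).toSeq

  def fromRecord(record: GenericRecord): Mutation = {
    val values = fieldNames.map(name => record.get(name)).toArray[Any]
    // a record flagged as a reversal is treated as the before-image, otherwise as the after-image
    val isReversal = java.lang.Boolean.TRUE.equals(record.get(reversalColumn))
    if (isReversal) Mutation(before = values, after = null)
    else Mutation(before = null, after = values)
  }
}
```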
online/src/main/scala/ai/chronon/online/serde/SerDe.scala (1)

5-11: Rename looks good. The renaming and improved documentation clarify the role of SerDe implementations.

flink/src/test/scala/ai/chronon/flink/test/deser/SourceProjectionDeSerializationSupportSpec.scala (1)

26-28: Builder pattern adoption looks good. Clean transition to the new deserialization schema builder.

flink/src/test/scala/ai/chronon/flink/test/deser/SchemaRegistryDeSerSchemaProviderSpec.scala (2)

14-20: Good mock design. Clean mock implementation for testing.

62-73: Test failure path for unsupported schema types. Good coverage of the expected failure case.
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)

54-57: Verify the `toSparkRow` signature. `toSparkRow` is invoked with a `StructType`; earlier code shows it requires an `api.DataType`. Confirm an overload exists; otherwise the call will not compile.
```diff
  val recordBytes = createDummyRecordBytes(schemaStr)
  deserSchema.deserialize(recordBytes, listCollector)

  // sanity check projected schemas is what we expect
- val projectedSchema = deserSchema.projectedSchema
+ val projectedSchema = deserSchema.asInstanceOf[SourceProjectionDeserializationSchema].projectedSchema
```
🛠️ Refactor suggestion
Avoid unsafe cast.
This cast could fail if the implementation changes.
```diff
- val projectedSchema = deserSchema.asInstanceOf[SourceProjectionDeserializationSchema].projectedSchema
+ deserSchema match {
+   case schema: SourceProjectionDeserializationSchema =>
+     val projectedSchema = schema.projectedSchema
+     // rest of the code
+   case _ => fail("Expected a SourceProjectionDeserializationSchema")
+ }
```
flink/src/main/scala/ai/chronon/flink/deser/CustomSchemaSerDe.scala (outdated comment, resolved)
```scala
override def fromBytes(message: Array[Byte]): Mutation = {
  val messageBytes =
    if (schemaRegistryWireFormat) {
      // schema id is set, we skip the first byte and read the schema id based on the wire format:
      // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#messages-wire-format
      // unfortunately we need to drop the first 5 bytes (and thus copy the rest of the byte array) as the AvroDataToCatalyst
      // interface takes a byte array and the methods to do the Row conversion etc are all private so we can't reach in
      message.drop(5)
    } else {
```
🛠️ Refactor suggestion

Guard against short messages, avoid the copy. `message.drop(5)` allocates and will throw if `message.length < 5`.
```diff
- val messageBytes =
-   if (schemaRegistryWireFormat) {
-     message.drop(5)
-   } else message
+ val messageBytes =
+   if (schemaRegistryWireFormat) {
+     require(message.length >= 5, "Message too short for wire-format")
+     java.util.Arrays.copyOfRange(message, 5, message.length)
+   } else message
```
```scala
override def sourceEventEncoder: Encoder[Row] =
  Encoders.row(SparkConversions.fromChrononSchema(deserSchemaProvider.schema))
```
`fromChrononSchema` likely takes a Chronon schema, but a Spark `StructType` is being passed. This mismatched type will not compile unless an overloaded method exists. Prefer supplying the existing Spark schema directly:
```diff
- override def sourceEventEncoder: Encoder[Row] =
-   Encoders.row(SparkConversions.fromChrononSchema(deserSchemaProvider.schema))
+ override def sourceEventEncoder: Encoder[Row] =
+   Encoders.row(deserSchemaProvider.schema)
```
Committable suggestion skipped: line range outside the PR's diff.
Force-pushed from 9de5069 to eeb6ae2.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
flink/src/main/scala/ai/chronon/flink/FlinkKafkaItemEventDriver.scala (1)
126-133: 🛠️ Refactor suggestion

`Thread.sleep` inside a `MapFunction` blocks the operator thread. Sleeping per record throttles throughput but also stalls checkpointing & watermark progress. Prefer a `ProcessFunction` that uses timers, or throttle the upstream reading speed instead.
🧹 Nitpick comments (2)
flink/src/main/scala/ai/chronon/flink/FlinkKafkaItemEventDriver.scala (2)
68-74: Consider specifying a key serializer as well. With no key serializer, all records will have a null key, which can hamper partitioning & compaction in Kafka. If you do not actually need keyed partitioning, please add a short comment clarifying that intent; a sketch of wiring one up is below.
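A hedged sketch of supplying a key serializer to a Flink KafkaSink — the topic name, bootstrap servers, and String record type are placeholders, and the driver's actual serializers may differ:

```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}

object KeyedKafkaSinkSketch {
  val recordSerializer: KafkaRecordSerializationSchema[String] =
    KafkaRecordSerializationSchema.builder[String]()
      .setTopic("test-item-event-data")
      .setKeySerializationSchema(new SimpleStringSchema()) // without this, every record is written with a null key
      .setValueSerializationSchema(new SimpleStringSchema())
      .build()

  val sink: KafkaSink[String] =
    KafkaSink.builder[String]()
      .setBootstrapServers("localhost:9092")
      .setRecordSerializer(recordSerializer)
      .build()
}
```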
98-99: Job name now mismatches the new purpose. "Periodic Kafka Data Producer" still references the old beacon-style driver. Renaming keeps the Flink Web-UI and logs self-describing.

```diff
- env.execute("Periodic Kafka Data Producer")
+ env.execute("Periodic Kafka Item-Event Producer")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (5)
- api/python/test/canary/group_bys/gcp/item_event_canary.py (1 hunks)
- api/python/test/canary/teams.py (1 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkKafkaItemEventDriver.scala (4 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/CustomSchemaSerDe.scala (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- api/python/test/canary/teams.py
🚧 Files skipped from review as they are similar to previous changes (2)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
- flink/src/main/scala/ai/chronon/flink/deser/CustomSchemaSerDe.scala
🔇 Additional comments (1)
api/python/test/canary/group_bys/gcp/item_event_canary.py (1)
37-40: Slash-delimited property string can confuse the URI parser. Several Kafka clients interpret `/` literally, causing one long property key. Consider using `;` or `&` separators, or supply properties via `extra_configs` rather than the URI.
listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))", | ||
add_cart="IF(event_type = 'backend_add_to_cart', 1, 0)", | ||
view="IF(event_type = 'view_listing', 1, 0)", | ||
purchase="IF(event_type = 'backend_cart_payment', 1, 0)", | ||
favorite="IF(event_type = 'backend_favorite_item2', 1, 0)", |
BigQuery type alias `LONG` is invalid – use `INT64`. Standard SQL accepts `INT64`, `INTEGER`, or `BIGINT`; `LONG` will fail at parse time.
- listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
+ listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS INT64)))",
Force-pushed from eeb6ae2 to f21ba4b.
Summary
Builds on top of PR: #751.
This PR adds a streaming GroupBy that can be run as a canary to sanity check and test things out while making Flink changes. I used this to sanity check the creation & use of a Mock schema serde that some users have been asking for.
Can be submitted via:
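```
$ CHRONON_ROOT=`pwd`/api/python/test/canary
$ zipline compile --chronon-root=$CHRONON_ROOT
$ zipline run --repo=$CHRONON_ROOT --version $VERSION --mode streaming --conf compiled/group_bys/gcp/item_event_canary.actions_v1 --kafka-bootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092 --groupby-name gcp.item_event_canary.actions_v1 --validate
```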
(Needs the Flink event driver to be running - triggered via DataProcSubmitterTest)
Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [x] Integration tested
- [ ] Documentation update
Summary by CodeRabbit
New Features

- Introduced a new group-by aggregation for item event actions, supporting real-time analytics by listing ID with data sourced from GCP Kafka and BigQuery.
- Added a mock schema provider for testing item event ingestion.

Bug Fixes

- Updated test configurations to use new event schemas, topics, and data paths for improved accuracy in Flink Kafka ingest job tests.

Refactor

- Renamed and restructured the event driver to focus on item events, with a streamlined schema and updated job naming.

Chores

- Added new environment variable for Flink state storage configuration.
- Updated build configuration to reference the renamed event driver.