wip: run join tests in parallel #79
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (20)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (20)
59-66: Consider cleaning up test databases after execution
The withFixture method creates a new database for each test but does not clean it up afterwards. This could lead to an accumulation of test databases over time.
You might want to modify the withFixture method to drop the database after the test runs:
  override def withFixture(test: OneArgTest) = {
    val namespace: FixtureParam = f"test_namespace_jointest__${test.name.split(" ").mkString("_")}"
    tableUtils.createDatabase(namespace)
+   try {
      test(namespace)
+   } finally {
+     tableUtils.dropDatabase(namespace)
+   }
  }
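The create/run/drop shape suggested above is the loan pattern with a `finally` clause. The sketch below shows it without Spark or ScalaTest. `FakeCatalog` and `withNamespace` are hypothetical stand-ins for `tableUtils` and `withFixture`, and whether `TableUtils` actually exposes a `dropDatabase` method is an assumption carried over from the suggestion.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the catalog behind `tableUtils`.
object FakeCatalog {
  private val databases = mutable.Set.empty[String]
  def createDatabase(name: String): Unit = synchronized { databases += name; () }
  def dropDatabase(name: String): Unit = synchronized { databases -= name; () }
  def exists(name: String): Boolean = synchronized { databases.contains(name) }
}

object FixtureCleanupSketch {
  // Same shape as the suggested `withFixture`: create, run the body, always drop.
  def withNamespace[A](testName: String)(body: String => A): A = {
    val suffix = testName.split(" ").mkString("_")
    val namespace = s"test_namespace_jointest__$suffix"
    FakeCatalog.createDatabase(namespace)
    try body(namespace)
    finally FakeCatalog.dropDatabase(namespace) // runs even if the body throws
  }
}
```

Because the drop sits in `finally`, a failing test body still releases its database, which is the property the nitpick is asking for.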
129-129: Unnecessary toString call on namespace
The namespace parameter is already a String. Calling namespace.toString is redundant.
Apply this diff to simplify the code:
- Builders.MetaData(name = "unit_test.user_transactions", namespace = namespace.toString, team = "chronon")
+ Builders.MetaData(name = "unit_test.user_transactions", namespace = namespace, team = "chronon")
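As a quick check of why the call is redundant, assuming (as the review states) that the fixture parameter is a plain `String`: `String.toString` returns the receiver itself, so dropping it changes nothing at runtime. `ToStringSketch` is a hypothetical name used only for this illustration.

```scala
object ToStringSketch {
  // `eq` is reference equality: true means `toString` handed back the same object.
  def sameInstance(namespace: String): Boolean = namespace.toString eq namespace
}
```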
157-157: Unnecessary toString call on namespace
The namespace is a String; calling toString is unnecessary.
Apply this diff:
- Builders.MetaData(name = "test.user_transaction_features", namespace = namespace.toString, team = "chronon")
+ Builders.MetaData(name = "test.user_transaction_features", namespace = namespace, team = "chronon")
296-296: Unnecessary toString call on namespace
Avoid redundant toString on a String variable.
Apply this diff:
- metaData = Builders.MetaData(name = "unit_test.country_weights", namespace = namespace.toString)
+ metaData = Builders.MetaData(name = "unit_test.country_weights", namespace = namespace)
315-315: Unnecessary toString call on namespace
The namespace variable is already a String.
Apply this diff:
- metaData = Builders.MetaData(name = "unit_test.country_heights", namespace = namespace.toString)
+ metaData = Builders.MetaData(name = "unit_test.country_heights", namespace = namespace)
328-328: Unnecessary toString call on namespace
Simplify the namespace assignment.
Apply this diff:
- metaData = Builders.MetaData(name = "test.country_features", namespace = namespace.toString, team = "chronon")
+ metaData = Builders.MetaData(name = "test.country_features", namespace = namespace, team = "chronon")
413-413: Unnecessary toString call on namespace
Remove the redundant toString method.
Apply this diff:
- metaData = Builders.MetaData(name = "test.country_weights_no_backfill", namespace = namespace.toString)
+ metaData = Builders.MetaData(name = "test.country_weights_no_backfill", namespace = namespace)
427-427: Unnecessary toString on namespace
Apply this diff:
- namespace = namespace.toString,
+ namespace = namespace,
468-468: Unnecessary toString call on namespace
Simplify the metaData construction.
Apply this diff:
- metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace.toString),
+ metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace),
485-485: Unnecessary toString call on namespace
Apply this diff:
- Builders.MetaData(name = "test.item_snapshot_features_2", namespace = namespace.toString, team = "chronon")
+ Builders.MetaData(name = "test.item_snapshot_features_2", namespace = namespace, team = "chronon")
542-542: Unnecessary toString call on namespace
Apply this diff:
- metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace.toString)
+ metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace)
732-732: Unnecessary toString call on namespace
Apply this diff:
- metaData = Builders.MetaData(name = "test.user_features", namespace = namespace.toString, team = "chronon")
+ metaData = Builders.MetaData(name = "test.user_features", namespace = namespace, team = "chronon")
985-986: Unnecessary toString call on namespace
Apply this diff:
- metaData = Builders.MetaData(name = "test.item_snapshot_bloom_test", namespace = namespace.toString, team = "chronon")
+ metaData =
+   Builders.MetaData(name = "test.item_snapshot_bloom_test", namespace = namespace, team = "chronon")
1035-1037: Unnecessary toString call on namespace
Apply this diff:
- metaData = Builders.MetaData(name = s"unit_test.item_views_$nameSuffix", namespace = namespace.toString, team = "item_team"),
+ metaData = Builders.MetaData(name = s"unit_test.item_views_$nameSuffix",
+                              namespace = namespace,
+                              team = "item_team"),
1044-1046: Unnecessary toString call on namespace
Apply this diff:
- metaData = Builders.MetaData(name = s"test.item_temporal_features$nameSuffix", namespace = namespace.toString, team = "item_team")
+ metaData = Builders.MetaData(name = s"test.item_temporal_features$nameSuffix",
+                              namespace = namespace,
+                              team = "item_team")
1144-1146: Unnecessary toString call on namespace
Apply this diff:
- Builders.MetaData(name = "unit_test.key_overlap.user_features", namespace = namespace.toString, team = "chronon")
+ Builders.MetaData(name = "unit_test.key_overlap.user_features",
+                   namespace = namespace,
+                   team = "chronon")
1198-1198: Unnecessary toString call on namespace
Apply this diff:
- metaData = Builders.MetaData(name = "unit_test.item_views_selected_join_parts_1", namespace = namespace.toString, team = "item_team"),
+ metaData = Builders.MetaData(name = "unit_test.item_views_selected_join_parts_1",
+                              namespace = namespace,
+                              team = "item_team"),
1214-1214: Unnecessary toString call on namespace
Apply this diff:
- metaData = Builders.MetaData(name = "unit_test.item_views_selected_join_parts_2", namespace = namespace.toString, team = "item_team"),
+ metaData = Builders.MetaData(name = "unit_test.item_views_selected_join_parts_2",
+                              namespace = namespace,
+                              team = "item_team"),
1230-1230: Unnecessary toString call on namespace
Apply this diff:
- metaData = Builders.MetaData(name = "unit_test.item_views_selected_join_parts_3", namespace = namespace.toString, team = "item_team"),
+ metaData = Builders.MetaData(name = "unit_test.item_views_selected_join_parts_3",
+                              namespace = namespace,
+                              team = "item_team"),
1244-1246: Unnecessary toString call on namespace
Apply this diff:
- metaData = Builders.MetaData(name = "unit_test.item_temporal_features.selected_join_parts", namespace = namespace.toString, team = "item_team", online = true)
+ metaData = Builders.MetaData(name = "unit_test.item_temporal_features.selected_join_parts",
+                              namespace = namespace,
+                              team = "item_team",
+                              online = true)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (36 hunks)
🔇 Additional comments (7)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (7)
42-43: Imports added for fixture support and parallel execution
The added imports for FixtureAnyFunSuite and ParallelTestExecution are necessary for the updated test structure.
49-49: Ensure thread safety with ParallelTestExecution
By extending ParallelTestExecution, tests will run in parallel. Please verify that shared resources, such as databases or file systems, are properly synchronized to avoid race conditions or data inconsistencies.
70-70: Update test methods to accept namespace parameter
The test method now accepts a namespace parameter, enhancing test isolation and allowing dynamic namespace creation for each test case.
907-907: Update method to accept namespace parameter
The getEventsEventsTemporal method now accepts a namespace parameter, allowing for dynamic namespace usage within the method.
868-868: Update method to accept namespace parameter
The getViewsGroupBy method now includes a namespace parameter, enhancing flexibility in specifying namespaces.
651-651: Update method to accept namespace parameter
Method getGroupByForIncrementalSourceTest now accepts namespace as a parameter, allowing for better control over test namespaces.
927-927: Update test method to accept namespace parameter
The test method now uses the namespace parameter, ensuring consistency across test cases.
test("test migration") { _ =>
  // Left

  val namespace = "test_namespace_jointest"
Inconsistent use of namespace parameter
In the test("test migration") method, the namespace parameter is ignored ({ _ =>) and a local variable namespace is hardcoded. For consistency and to leverage the dynamic namespace generation, consider using the fixture parameter.
Apply this diff to use the fixture parameter:
- test("test migration") { _ =>
-
- val namespace = "test_namespace_jointest"
+ test("test migration") { namespace =>
+
+ // `namespace` is provided by the fixture
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- test("test migration") { _ =>
-   // Left
-   val namespace = "test_namespace_jointest"
+ test("test migration") { namespace =>
+   // `namespace` is provided by the fixture
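To see why the fixture parameter matters once tests run in parallel, the sketch below compares namespaces derived from test names with a hardcoded one. `NamespaceCollisionSketch` and the second test name in the usage note are hypothetical. The derivation copies the `split(" ").mkString("_")` expression from the fixture in this PR.

```scala
object NamespaceCollisionSketch {
  // Namespace derived from the test name, following the fixture in this PR.
  def fixtureNamespace(testName: String): String = {
    val suffix = testName.split(" ").mkString("_")
    s"test_namespace_jointest__$suffix"
  }

  // Hardcoded value, as in the current body of test("test migration").
  val hardcoded: String = "test_namespace_jointest"

  // True when no two tests would write into the same namespace.
  def allDistinct(namespaces: Seq[String]): Boolean =
    namespaces.distinct.size == namespaces.size
}
```

For example, mapping `Seq("test migration", "test events events temporal")` through `fixtureNamespace` yields two distinct namespaces, whereas any test that falls back to `hardcoded` writes outside the database its fixture created for it.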
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (10)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (10)
126-127: Redundant .toString on namespace
The namespace variable is already of type String; calling .toString is unnecessary.
Apply this diff:
- Builders.MetaData(name = "unit_test.user_transactions", namespace = namespace.toString, team = "chronon")
+ Builders.MetaData(name = "unit_test.user_transactions", namespace = namespace, team = "chronon")
154-155: Redundant .toString on namespace
Apply this diff:
- Builders.MetaData(name = "test.user_transaction_features", namespace = namespace.toString, team = "chronon")
+ Builders.MetaData(name = "test.user_transaction_features", namespace = namespace, team = "chronon")
293-294: Redundant .toString on namespace
Apply this diff:
- metaData = Builders.MetaData(name = "unit_test.country_weights", namespace = namespace.toString)
+ metaData = Builders.MetaData(name = "unit_test.country_weights", namespace = namespace)
312-313: Redundant .toString on namespace
Apply this diff:
- metaData = Builders.MetaData(name = "unit_test.country_heights", namespace = namespace.toString)
+ metaData = Builders.MetaData(name = "unit_test.country_heights", namespace = namespace)
425-425: Redundant .toString on namespace
Apply this diff:
- metaData = Builders.MetaData(name = "test.country_no_historical_backfill",
-                              namespace = namespace.toString,
-                              team = "chronon",
-                              historicalBackfill = false)
+ metaData = Builders.MetaData(name = "test.country_no_historical_backfill",
+                              namespace = namespace,
+                              team = "chronon",
+                              historicalBackfill = false)
483-483: Redundant .toString on namespace
Apply this diff:
- Builders.MetaData(name = "test.item_snapshot_features_2", namespace = namespace.toString, team = "chronon")
+ Builders.MetaData(name = "test.item_snapshot_features_2", namespace = namespace, team = "chronon")
540-540: Redundant .toString on namespace
Apply this diff:
- metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace.toString)
+ metaData = Builders.MetaData(name = "unit_test.item_views", namespace = namespace)
594-594: Redundant .toString on namespace in Method Call
In the method call getViewsGroupBy(namespace.toString, suffix = "cumulative", makeCumulative = true), namespace is already a String, so calling .toString is unnecessary.
Apply this diff:
- val viewsGroupBy = getViewsGroupBy(namespace.toString, suffix = "cumulative", makeCumulative = true)
+ val viewsGroupBy = getViewsGroupBy(namespace, suffix = "cumulative", makeCumulative = true)
1033-1035: Redundant .toString on namespace
Apply this diff:
- metaData = Builders.MetaData(name = s"unit_test.item_views_$nameSuffix",
-                              namespace = namespace.toString,
-                              team = "item_team")
+ metaData = Builders.MetaData(name = s"unit_test.item_views_$nameSuffix",
+                              namespace = namespace,
+                              team = "item_team")
1142-1144: Redundant .toString on namespace
Apply this diff:
- metaData = Builders.MetaData(name = "unit_test.key_overlap.user_features",
-                              namespace = namespace.toString,
-                              team = "chronon")
+ metaData = Builders.MetaData(name = "unit_test.key_overlap.user_features",
+                              namespace = namespace,
+                              team = "chronon")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
.github/workflows/test_scala_and_python.yaml (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (36 hunks)
🔇 Additional comments (3)
.github/workflows/test_scala_and_python.yaml (1)
87-87: Verify parallel test execution configuration
The change introduces parallel test execution with 8 threads (-P8). While this can improve test execution speed, please ensure:
- The tests are truly independent and don't share mutable state
- The memory settings (-Xmx8G) are sufficient for parallel execution
- The CI environment can handle the increased load
Run this script to check for potential test dependencies and shared resources:
Consider the following improvements:
- Scale the number of parallel threads (-P8) based on the CI environment's CPU cores
- Add memory monitoring to detect potential OOM issues
- Consider moving this job to ubuntu-8_cores-32_gb runner like other_spark_tests if memory becomes a constraint
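One way to act on the first suggestion is to cap the requested thread count at the number of cores the JVM reports, rather than fixing it at 8. The sketch below is only an illustration: `ParallelismSketch`, `threads`, and `runnerArg` are hypothetical names, and how the resulting argument would be wired into the sbt invocation is left open.

```scala
object ParallelismSketch {
  // Cap the requested thread count at the visible cores, and never go below 1.
  def threads(requested: Int, cores: Int = Runtime.getRuntime.availableProcessors()): Int =
    math.max(1, math.min(requested, cores))

  // Builds a runner argument in the same form as the "-P8" used in the workflow.
  def runnerArg(requested: Int, cores: Int = Runtime.getRuntime.availableProcessors()): String =
    s"-P${threads(requested, cores)}"
}
```

With an explicit core count, `ParallelismSketch.runnerArg(8, 2)` gives `-P2`, so a small runner is not oversubscribed, while `runnerArg(8, 16)` stays at `-P8`.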
✅ Verification successful
Based on the results, I can now analyze the test isolation patterns. Let me check the specific implementation of JoinTest to verify thread safety:
Based on the code analysis, I can now provide a definitive response about the parallel test execution safety:
Parallel test execution is safe with current implementation
The JoinTest implementation is thread-safe for parallel execution because:
- It extends ParallelTestExecution trait and uses FixtureAnyFunSuite with proper test isolation:
  - Each test gets its own namespace via test_namespace_jointest__${test.name}
  - Database and tables are created fresh for each test via withFixture
- No shared mutable state between tests:
  - SparkSession is immutable and thread-safe
  - TableUtils is effectively immutable
  - Each test operates in its own database namespace
  - Test data is generated uniquely per test
- Resource isolation:
  - Each test creates and manages its own tables in isolated namespaces
  - The -Xmx8G heap setting is sufficient for parallel execution with 8 threads given the test data volumes
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for potential shared resources in JoinTest
# Look for shared resource patterns in JoinTest
ast-grep --pattern 'object $OBJECT {
$$$
val $FIELD = $$$
$$$
}' | grep -B2 -A2 'JoinTest'
# Check for static/shared database connections
rg -A 5 'Connection|DataSource|Database'
Length of output: 34305
Script:
#!/bin/bash
# Check JoinTest implementation for thread safety patterns
ast-grep --pattern 'class JoinTest extends $_ {
$$$
}'
# Look for shared state patterns in test setup
rg -A 10 "def beforeEach|def beforeAll|@Before|@BeforeClass" spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala
Length of output: 100378
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (2)
1050-1053: Inconsistent use of namespace parameter
63-63: Syntax Error: Incorrect String Interpolation
In Scala, string interpolation uses the s prefix instead of f. Replace f with s to fix the syntax error.
Apply this diff to correct the syntax:
- val namespace: FixtureParam = f"test_namespace_jointest__${test.name.split(" ").mkString("_")}"
+ val namespace: FixtureParam = s"test_namespace_jointest__${test.name.split(" ").mkString("_")}"
Likely invalid or redundant comment.
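The "likely invalid" flag is consistent with the language: Scala's standard library provides both the `s` and `f` interpolators, and `f` treats a substitution with no format specifier as `%s`. The sketch below compares the two on the namespace expression. `InterpolatorSketch` is a hypothetical name, and the suffix is computed in a separate `val` purely to keep the interpolated strings simple.

```scala
object InterpolatorSketch {
  // Namespace built with the `f` interpolator, as in the PR.
  def withF(testName: String): String = {
    val suffix = testName.split(" ").mkString("_")
    f"test_namespace_jointest__$suffix"
  }

  // Namespace built with the `s` interpolator, as in the suggested change.
  def withS(testName: String): String = {
    val suffix = testName.split(" ").mkString("_")
    s"test_namespace_jointest__$suffix"
  }
}
```

Both functions return the same string for any test name that contains no `%` handling concerns in the literal part, so switching from `f` to `s` here would be a style choice rather than a syntax fix.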
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (1)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (1)
126-127: Redundant .toString Call on String Variable namespace
The namespace variable is already of type String, so calling .toString on it is unnecessary. This occurs in multiple places. Consider removing .toString to improve code clarity.
Apply this change across the affected lines:
- namespace = namespace.toString
+ namespace = namespace
Also applies to: 154-155, 273-274, 294-294, 313-313, 326-326, 411-411, 425-425, 466-466, 482-483, 540-540, 594-594, 596-596, 730-730, 771-771, 796-797, 918-918, 926-926, 968-968, 983-984, 1033-1035, 1042-1044, 1142-1144, 1196-1196, 1212-1212, 1228-1228, 1242-1245
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (36 hunks)
🔇 Additional comments (2)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (2)
49-49: Ensure Thread Safety with Parallel Test Execution
By extending ParallelTestExecution, tests will run concurrently. Please verify that shared resources like spark and tableUtils are thread-safe or appropriately synchronized to prevent race conditions.
1050-1053: Inconsistent Use of namespace Parameter
This issue has been previously raised: In the test("test migration") method, the namespace parameter is ignored ({ _ =>), and a local variable namespace is hardcoded. For consistency and to leverage the dynamic namespace generation, consider using the fixture parameter.
val spark: SparkSession = SparkSessionBuilder.build("JoinTest", local = true)
private implicit val tableUtils = TableUtils(spark)
Potential Thread Safety Issue with Shared spark and tableUtils Instances
The spark and tableUtils instances are shared across all tests. Since tests now run in parallel, this could lead to concurrency issues if these instances are not thread-safe. Consider instantiating spark and tableUtils within each test or ensuring they are safe for concurrent use.
Apply this diff to instantiate spark and tableUtils within each test:
- val spark: SparkSession = SparkSessionBuilder.build("JoinTest", local = true)
- private implicit val tableUtils = TableUtils(spark)
+ // Remove shared instances; create per-test instances in 'withFixture' or individual tests.
Committable suggestion skipped: line range outside the PR's diff.
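The alternative to per-test instances is a single shared handle that is safe for concurrent use, with each test confined to its own namespace. The sketch below illustrates only that pattern and says nothing about how Spark itself behaves. `SharedResourceSketch` is a hypothetical stand-in for the shared `spark`/`tableUtils` pair, backed by a concurrent map from the Scala standard library.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SharedResourceSketch {
  // Shared, thread-safe registry of "namespace.table" names (stand-in for a catalog).
  private val tables = TrieMap.empty[String, Boolean]

  def createTable(namespace: String, table: String): Unit = {
    tables.put(s"$namespace.$table", true)
    ()
  }

  def tableCount(namespace: String): Int =
    tables.keys.count(_.startsWith(s"$namespace."))

  // Each "test" runs on its own Future and writes only into its own namespace.
  def runInParallel(namespaces: Seq[String], tablesPerTest: Int): Unit = {
    val work = namespaces.map { ns =>
      Future((1 to tablesPerTest).foreach(i => createTable(ns, s"table_$i")))
    }
    Await.result(Future.sequence(work), 30.seconds)
    ()
  }
}
```

If the shared handle were not thread-safe, the same structure would need either synchronization around it or one instance per test, which is the trade-off the comment above raises.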
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (3)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (3)
60-67: Consider adding cleanup for test namespaces.
The fixture implementation correctly creates unique namespaces for test isolation. However, there's no cleanup of these namespaces after test execution, which could lead to accumulation of test databases over time.
Consider adding cleanup in the fixture:
  override def withFixture(test: OneArgTest): Outcome = {
    val namespace: FixtureParam = f"test_namespace_jointest__${test.name.split(" ").mkString("_")}"
    tableUtils.createDatabase(namespace)
-   test(namespace)
+   try {
+     test(namespace)
+   } finally {
+     tableUtils.dropDatabase(namespace, cascade = true)
+   }
  }
Line range hint 69-1051: Consider extracting common test patterns into helper methods.
The test methods follow similar patterns for setup and verification. Consider extracting these common patterns into helper methods to improve maintainability and reduce code duplication:
- Table setup with DataFrameGen
- Join configuration creation
- Result verification with diff comparison
Example helper method:
private def verifyJoinResults(computed: DataFrame, expected: DataFrame, keyColumns: List[String]): Unit = {
  val diff = Comparison.sideBySide(computed, expected, keyColumns)
  if (diff.count() > 0) {
    println(s"Actual count: ${computed.count()}")
    println(s"Expected count: ${expected.count()}")
    println(s"Diff count: ${diff.count()}")
    diff.show()
  }
  assertEquals(0, diff.count())
}
Also applies to: 1051-1100
Line range hint 42-1275: Good implementation of parallel test execution with proper test isolation.
The changes successfully achieve the PR objective of running join tests in parallel while maintaining test isolation through:
- Proper use of ScalaTest's parallel execution support
- Test isolation using unique namespaces per test
- Consistent fixture parameter usage
Some architectural considerations:
- The parallel execution might increase resource usage during test runs
- Test isolation through namespaces provides good separation but requires proper cleanup
- Consider monitoring test execution times to ensure parallel execution provides expected benefits
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
.github/workflows/test_scala_and_python.yaml (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (36 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- .github/workflows/test_scala_and_python.yaml
🔇 Additional comments (2)
spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala (2)
42-50: LGTM! Good implementation of parallel test execution.
The changes correctly implement parallel test execution by:
- Adding necessary imports for parallel testing and fixture support
- Extending FixtureAnyFunSuite and mixing in ParallelTestExecution
Line range hint 867-922: LGTM! Well-structured helper methods.
The helper methods are well-implemented with:
- Clear separation of concerns
- Proper parameter handling
- Consistent namespace usage
Also applies to: 906-922
import scala.collection.JavaConverters._
import scala.util.ScalaJavaConversions.ListOps

// Run as follows: sbt "spark/testOnly -- -n jointest"
class JoinTest extends AnyFunSuite with TaggedFilterSuite {
the challenge with yanking the tagged filter suite iirc is that this test will get triggered as part of the other spark tests step ('sbt "spark/testOnly"'). For this one we don't want the dual run right?
@@ -84,7 +84,7 @@ jobs:
       - name: Run other spark tests
         run: |
           export SBT_OPTS="-Xmx8G -Xms2G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
-          sbt "spark/testOnly -- -n jointest"
+          sbt "spark/testOnly ai.chronon.spark.test.JoinTest -- -P8"
isn't the runs-on: ubuntu-latest single core?