chore: split out expensive spark tests to parallelize #382

Merged
merged 15 commits on Feb 14, 2025
Changes from 14 commits
130 changes: 130 additions & 0 deletions .github/workflows/test_scala_spark.yaml
@@ -44,3 +44,133 @@ jobs:
            --google_credentials=bazel-cache-key.json \
            --test_env=JAVA_OPTS="-Xmx8G -Xms2G" \
            //spark:tests

  fetcher_tests:
    runs-on: ubuntu-8_cores-32_gb
    container:
      image: ghcr.io/${{ github.repository }}-ci:latest
      credentials:
        username: ${{ github.actor }}
        password: ${{ secrets.GITHUB_TOKEN }}
    defaults:
      run:
        working-directory: ${{ github.workspace }}

    steps:
      - uses: actions/checkout@v4

      - name: Setup Bazel cache credentials
        run: |
          echo "${{ secrets.BAZEL_CACHE_CREDENTIALS }}" | base64 -d > bazel-cache-key.json

      - name: Run Fetcher tests
        run: |
          bazel test \
            --remote_cache=https://storage.googleapis.com/zipline-bazel-cache \
            --google_credentials=bazel-cache-key.json \
            --test_env=JAVA_OPTS="-Xmx16G -Xms8G" \
            //spark:fetcher_test

  join_tests:
    runs-on: ubuntu-8_cores-32_gb
    container:
      image: ghcr.io/${{ github.repository }}-ci:latest
      credentials:
        username: ${{ github.actor }}
        password: ${{ secrets.GITHUB_TOKEN }}
    defaults:
      run:
        working-directory: ${{ github.workspace }}

    steps:
      - uses: actions/checkout@v4

      - name: Setup Bazel cache credentials
        run: |
          echo "${{ secrets.BAZEL_CACHE_CREDENTIALS }}" | base64 -d > bazel-cache-key.json

      - name: Run Join tests
        run: |
          bazel test \
            --remote_cache=https://storage.googleapis.com/zipline-bazel-cache \
            --google_credentials=bazel-cache-key.json \
            --test_env=JAVA_OPTS="-Xmx16G -Xms8G" \
            //spark:join_test

  groupby_tests:
    runs-on: ubuntu-8_cores-32_gb
    container:
      image: ghcr.io/${{ github.repository }}-ci:latest
      credentials:
        username: ${{ github.actor }}
        password: ${{ secrets.GITHUB_TOKEN }}
    defaults:
      run:
        working-directory: ${{ github.workspace }}

    steps:
      - uses: actions/checkout@v4

      - name: Setup Bazel cache credentials
        run: |
          echo "${{ secrets.BAZEL_CACHE_CREDENTIALS }}" | base64 -d > bazel-cache-key.json

      - name: Run GroupBy tests
        run: |
          bazel test \
            --remote_cache=https://storage.googleapis.com/zipline-bazel-cache \
            --google_credentials=bazel-cache-key.json \
            --test_env=JAVA_OPTS="-Xmx16G -Xms8G" \
            //spark:groupby_test

  analyzer_tests:
    runs-on: ubuntu-8_cores-32_gb
    container:
      image: ghcr.io/${{ github.repository }}-ci:latest
      credentials:
        username: ${{ github.actor }}
        password: ${{ secrets.GITHUB_TOKEN }}
    defaults:
      run:
        working-directory: ${{ github.workspace }}

    steps:
      - uses: actions/checkout@v4

      - name: Setup Bazel cache credentials
        run: |
          echo "${{ secrets.BAZEL_CACHE_CREDENTIALS }}" | base64 -d > bazel-cache-key.json

      - name: Run Analyzer tests
        run: |
          bazel test \
            --remote_cache=https://storage.googleapis.com/zipline-bazel-cache \
            --google_credentials=bazel-cache-key.json \
            --test_env=JAVA_OPTS="-Xmx16G -Xms8G" \
            //spark:analyzer_test

  streamimg_tests:
⚠️ Potential issue

Fix typo in job name.

"streamimg_tests" should be "streaming_tests".

-  streamimg_tests:
+  streaming_tests:
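
Worth noting (an observation, not something the bot flagged): since these jobs set no name: key, the job ID is what appears in the checks UI and in any branch-protection required-checks list, so the misspelling would stay user-visible until renamed.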

    runs-on: ubuntu-8_cores-32_gb
    container:
      image: ghcr.io/${{ github.repository }}-ci:latest
      credentials:
        username: ${{ github.actor }}
        password: ${{ secrets.GITHUB_TOKEN }}
    defaults:
      run:
        working-directory: ${{ github.workspace }}

    steps:
      - uses: actions/checkout@v4

      - name: Setup Bazel cache credentials
        run: |
          echo "${{ secrets.BAZEL_CACHE_CREDENTIALS }}" | base64 -d > bazel-cache-key.json

      - name: Run Streaming tests
        run: |
          bazel test \
            --remote_cache=https://storage.googleapis.com/zipline-bazel-cache \
            --google_credentials=bazel-cache-key.json \
            --test_env=JAVA_OPTS="-Xmx16G -Xms8G" \
            //spark:streaming_test

🛠️ Refactor suggestion

Refactor duplicate job configurations using a reusable workflow.

The six jobs are identical except for the test target, heap settings, and step name; extracting the shared runner, container, credentials, and Bazel invocation into a workflow_call workflow removes the duplication.

Create .github/workflows/spark-test-job.yaml:

name: Spark Test Job
on:
  workflow_call:
    inputs:
      test_target:
        required: true
        type: string
      memory_opts:
        required: true
        type: string
      step_name:
        required: true
        type: string

jobs:
  test:
    runs-on: ubuntu-8_cores-32_gb
    container:
      image: ghcr.io/${{ github.repository }}-ci:latest
      credentials:
        username: ${{ github.actor }}
        password: ${{ secrets.GITHUB_TOKEN }}
    defaults:
      run:
        working-directory: ${{ github.workspace }}
    steps:
      - uses: actions/checkout@v4
      - name: Setup Bazel cache credentials
        run: |
          echo "${{ secrets.BAZEL_CACHE_CREDENTIALS }}" | base64 -d > bazel-cache-key.json
      - name: ${{ inputs.step_name }}
        run: |
          bazel test \
            --remote_cache=https://storage.googleapis.com/zipline-bazel-cache \
            --google_credentials=bazel-cache-key.json \
            --test_env=JAVA_OPTS="${{ inputs.memory_opts }}" \
            ${{ inputs.test_target }}

Then simplify the main workflow:

jobs:
  spark_tests:
    uses: ./.github/workflows/spark-test-job.yaml
    with:
      test_target: //spark:tests
      memory_opts: "-Xmx8G -Xms2G"
      step_name: "Run Spark tests"
  fetcher_tests:
    uses: ./.github/workflows/spark-test-job.yaml
    with:
      test_target: //spark:fetcher_test
      memory_opts: "-Xmx16G -Xms8G"
      step_name: "Run Fetcher tests"
  # ... repeat for other jobs
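
One wiring detail to keep in mind with this suggestion: a workflow invoked via workflow_call only receives GITHUB_TOKEN automatically; other repository secrets such as BAZEL_CACHE_CREDENTIALS must be passed through explicitly or inherited. A minimal sketch of one caller job under that assumption (the secrets: inherit line is the addition):

  fetcher_tests:
    uses: ./.github/workflows/spark-test-job.yaml
    # without this, secrets.BAZEL_CACHE_CREDENTIALS is empty inside the called workflow
    secrets: inherit
    with:
      test_target: //spark:fetcher_test
      memory_opts: "-Xmx16G -Xms8G"
      step_name: "Run Fetcher tests"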
🧰 Tools
🪛 actionlint (1.7.4)

49, 75, 101, 127, 153: label "ubuntu-8_cores-32_gb" is unknown. available labels are "windows-latest", "windows-latest-8-cores", "windows-2022", "windows-2019", "ubuntu-latest", "ubuntu-latest-4-cores", "ubuntu-latest-8-cores", "ubuntu-latest-16-cores", "ubuntu-24.04", "ubuntu-22.04", "ubuntu-20.04", "macos-latest", "macos-latest-xl", "macos-latest-xlarge", "macos-latest-large", "macos-15-xlarge", "macos-15-large", "macos-15", "macos-14-xl", "macos-14-xlarge", "macos-14-large", "macos-14", "macos-13-xl", "macos-13-xlarge", "macos-13-large", "macos-13", "macos-12-xl", "macos-12-xlarge", "macos-12-large", "macos-12", "self-hosted", "x64", "arm", "arm64", "linux", "macos", "windows". if it is a custom label for self-hosted runner, set list of labels in actionlint.yaml config file (reported once for each of the five new jobs)

(runner-label)
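
If ubuntu-8_cores-32_gb is an intentional custom label for a large or self-hosted runner, actionlint can be told about it via its config file, as the message suggests. A minimal sketch of .github/actionlint.yaml, assuming the label is deliberate:

# .github/actionlint.yaml — registers custom runner labels so actionlint stops flagging them
self-hosted-runner:
  labels:
    - ubuntu-8_cores-32_gb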

🪛 YAMLlint (1.35.1)

[error] 176-176: no new line character at the end of file

(new-line-at-end-of-file)

58 changes: 56 additions & 2 deletions spark/BUILD.bazel
@@ -90,13 +90,67 @@ scala_library(
name = "test_lib",
srcs = glob(["src/test/**/*.scala"]),
format = True,
visibility = ["//visibility:public"],
deps = test_deps,
)

scala_test_suite(
name = "tests",
srcs = glob(["src/test/**/*.scala"]),
tags = ["large"],
srcs = glob(["src/test/scala/ai/chronon/spark/test/*.scala",
"src/test/scala/ai/chronon/spark/test/udafs/*.scala",
"src/test/scala/ai/chronon/spark/test/stats/drift/*.scala",
"src/test/scala/ai/chronon/spark/test/bootstrap/*.scala"]),
data = glob(["spark/src/test/resources/**/*"]),
# defined in prelude_bazel file
jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
visibility = ["//visibility:public"],
deps = test_deps + [":test_lib"],
)

scala_test_suite(
name = "fetcher_test",
srcs = glob(["src/test/scala/ai/chronon/spark/test/fetcher/*.scala"]),
resources = ["//spark/src/test/resources:test-resources"],
# defined in prelude_bazel file
jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
visibility = ["//visibility:public"],
deps = test_deps + [":test_lib"],
)

scala_test_suite(
name = "groupby_test",
srcs = glob(["src/test/scala/ai/chronon/spark/test/groupby/*.scala"]),
data = glob(["spark/src/test/resources/**/*"]),
# defined in prelude_bazel file
jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
visibility = ["//visibility:public"],
deps = test_deps + [":test_lib"],
)

scala_test_suite(
name = "join_test",
srcs = glob(["src/test/scala/ai/chronon/spark/test/join/*.scala"]),
tags = ["large"],
data = glob(["spark/src/test/resources/**/*"]),
# defined in prelude_bazel file
jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
visibility = ["//visibility:public"],
deps = test_deps + [":test_lib"],
)

scala_test_suite(
name = "analyzer_test",
srcs = glob(["src/test/scala/ai/chronon/spark/test/analyzer/*.scala"]),
data = glob(["spark/src/test/resources/**/*"]),
# defined in prelude_bazel file
jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
visibility = ["//visibility:public"],
deps = test_deps + [":test_lib"],
)

scala_test_suite(
name = "streaming_test",
srcs = glob(["src/test/scala/ai/chronon/spark/test/streaming/*.scala"]),
data = glob(["spark/src/test/resources/**/*"]),
# defined in prelude_bazel file
jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
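With //spark:tests no longer covering everything, an aggregator target can preserve a single entry point for running all of the split suites. A minimal sketch, assuming it is added to spark/BUILD.bazel (the all_spark_tests name is hypothetical, not part of this PR):

test_suite(
    # hypothetical convenience target: `bazel test //spark:all_spark_tests` runs every suite
    name = "all_spark_tests",
    tests = [
        ":tests",
        ":fetcher_test",
        ":groupby_test",
        ":join_test",
        ":analyzer_test",
        ":streaming_test",
    ],
)
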
@@ -14,25 +14,21 @@
* limitations under the License.
*/

package ai.chronon.spark.test
package ai.chronon.spark.test.analyzer

import ai.chronon.aggregator.test.Column
import ai.chronon.api
import ai.chronon.api._
import ai.chronon.spark.Analyzer
import ai.chronon.spark.Extensions._
import ai.chronon.spark.Join
import ai.chronon.spark.SparkSessionBuilder
import ai.chronon.spark.TableUtils
import ai.chronon.spark.{Analyzer, Join, SparkSessionBuilder, TableUtils}
import ai.chronon.spark.test.DataFrameGen
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.{col, lit}
import org.junit.Assert.assertTrue
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.{Logger, LoggerFactory}

class AnalyzerTest extends AnyFlatSpec with BeforeAndAfter {
  @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
@@ -14,7 +14,7 @@
* limitations under the License.
*/

package ai.chronon.spark.test.bootstrap
package ai.chronon.spark.test.analyzer

import ai.chronon.api.Builders.Derivation
import ai.chronon.api.Extensions._
@@ -24,17 +24,14 @@ import ai.chronon.online.Fetcher.Request
import ai.chronon.online.MetadataStore
import ai.chronon.spark.Extensions.DataframeOps
import ai.chronon.spark._
import ai.chronon.spark.test.OnlineUtils
import ai.chronon.spark.test.SchemaEvolutionUtils
import ai.chronon.spark.test.{OnlineUtils, SchemaEvolutionUtils}
import ai.chronon.spark.test.bootstrap.BootstrapUtils
import ai.chronon.spark.utils.MockApi
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
import org.scalatest.flatspec.AnyFlatSpec
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.{Logger, LoggerFactory}

import scala.concurrent.Await
import scala.concurrent.duration.Duration
@@ -14,31 +14,26 @@
* limitations under the License.
*/

package ai.chronon.spark.test
package ai.chronon.spark.test.fetcher

import ai.chronon.aggregator.windowing.TsUtils
import ai.chronon.api
import ai.chronon.api.Constants.MetadataDataset
import ai.chronon.api.Extensions.JoinOps
import ai.chronon.api.Extensions.MetadataOps
import ai.chronon.api.Extensions.{JoinOps, MetadataOps}
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api._
import ai.chronon.online.Fetcher.Request
import ai.chronon.online.MetadataStore
import ai.chronon.online.SparkConversions
import ai.chronon.online.{MetadataStore, SparkConversions}
import ai.chronon.spark.Extensions._
import ai.chronon.spark.test.{OnlineUtils, TestUtils}
import ai.chronon.spark.utils.MockApi
import ai.chronon.spark.{Join => _, _}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.functions.lit
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.junit.Assert.{assertEquals, assertTrue}
import org.scalatest.flatspec.AnyFlatSpec
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.{Logger, LoggerFactory}

import java.lang
import java.util.TimeZone