[airflow] -- add dependencies for airflow to customJson #648

Merged (12 commits) on Apr 16, 2025

Conversation


@varant-zlai varant-zlai commented Apr 15, 2025

Summary

Setting airflow dependencies in customJson.

This is meant to be temporary, until we can ship the orchestrator.

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested
  • Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced a new utility for defining staging queries with explicit table dependencies and partition columns.
    • Added support for specifying engine type, scheduling, and advanced metadata when creating staging queries.
    • Added helper functions to automatically generate and set Airflow dependency metadata for Chronon objects.
  • Refactor

    • Updated sample staging query definitions to use the new dependency and metadata structure, improving clarity and consistency.
    • Replaced nested metadata objects with direct keyword arguments for easier configuration.
    • Integrated automatic setting of Airflow dependencies during configuration parsing.
  • Chores

    • Enhanced internal handling of Airflow dependencies for relevant objects, ensuring accurate dependency tracking.
    • Updated team configurations to include default partition column settings for improved environment consistency.


coderabbitai bot commented Apr 15, 2025

Walkthrough

This update introduces a new airflow_helpers.py module to automate the generation and attachment of Airflow dependency metadata for Chronon objects, specifically for Join and GroupBy types. The staging_query.py module is also added, providing a utility function and supporting classes for constructing StagingQuery objects with explicit dependency declarations. Sample staging query scripts are refactored to use the new API, replacing nested metadata with direct parameters and explicit dependency lists. The configuration parsing logic is updated to invoke the new Airflow dependency setter after metadata updates. Additionally, team configurations gain a new conf attribute for partition column settings.
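The dependency entries written into customJson follow a simple name/spec shape: a workflow name derived from the table, plus a partition spec templated on Airflow's ds macro. A minimal sketch of how such an entry could be built; the helper name and exact format here are assumptions inferred from the compiled configs in this PR, not the module's actual implementation:

```python
import json

def create_airflow_dependency(table: str, partition_column: str = "ds") -> dict:
    """Build one Airflow dependency entry for a table (illustrative sketch;
    mirrors the name/spec shape seen in the compiled configs in this PR)."""
    return {
        "name": f"wf_{table.replace('.', '_')}",
        "spec": f"{table}/{partition_column}={{{{ ds }}}}",
    }

# Serialize into the customJson payload shape used by the compiled configs.
deps = [create_airflow_dependency("sample_namespace.sample_table_group_by")]
custom_json = json.dumps({"airflowDependencies": deps})
```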

Changes

File(s) Change Summary
api/python/ai/chronon/airflow_helpers.py New module: Helpers to generate and set Airflow dependency metadata for Join and GroupBy Chronon objects.
api/python/ai/chronon/cli/compile/parse_configs.py Imports airflow_helpers and calls set_airflow_deps(obj) after metadata updates in from_folder.
api/python/ai/chronon/staging_query.py New module: Defines StagingQuery function, EngineType class, and TableDependency dataclass for staging queries with explicit dependencies and metadata.
api/python/test/sample/staging_queries/kaggle/outbrain.py Refactored: Uses new StagingQuery API with explicit name, output_namespace, and dependencies list; removes nested MetaData.
api/python/test/sample/staging_queries/quickstart/checkouts_external.py Refactored: Renames variable; updates StagingQuery instantiation to use new API with direct parameters and explicit dependencies.
api/python/test/sample/staging_queries/sample_team/sample_staging_query.py Refactored: Updates import; replaces nested metadata with direct parameters; adds dependencies argument using TableDependency.
api/python/test/sample/teams.py Adds conf: ConfigProperties attribute to default and test teams specifying partition column settings.

Poem

New helpers arrive, dependencies in tow,
Airflow and Chronon now clearly in the flow.
Staging queries refactored, metadata made neat,
Table links explicit—no more tangled sheet!
With each new import, the structure’s more clear,
CodeRabbit’s helpers bring order and cheer.
🚀

Warning

Review ran into problems

🔥 Problems

GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between b4582d9 and 06f80eb.

📒 Files selected for processing (2)
  • api/python/ai/chronon/airflow_helpers.py (1 hunks)
  • api/python/test/sample/teams.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • api/python/test/sample/teams.py
  • api/python/ai/chronon/airflow_helpers.py
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: non_spark_tests
  • GitHub Check: python_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows


@varant-zlai varant-zlai requested a review from nikhil-zlai April 15, 2025 23:35
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 14

🧹 Nitpick comments (12)
api/python/test/sample/staging_queries/kaggle/outbrain.py (1)

16-17: Remove unused import.

MetaData import isn't used with the new staging_query API.

-from ai.chronon.api.ttypes import MetaData
 from ai.chronon.staging_query import staging_query, TableDependency
🧰 Tools
🪛 Ruff (0.8.2)

16-16: ai.chronon.api.ttypes.MetaData imported but unused

Remove unused import: ai.chronon.api.ttypes.MetaData

(F401)

api/python/test/sample/compiled/models/quickstart/test.v1 (1)

1-88:

❓ Verification inconclusive

This model config doesn't contain Airflow dependencies.

This configuration file lacks Airflow dependencies in customJson, unlike the other files in this PR. Consider adding dependencies if this model has upstream dependencies.


🏁 Script executed:

#!/bin/bash
# Check if other model files have customJson with airflowDependencies
grep -r '"customJson": "{\\"airflowDependencies\\":' api/python/test/sample/compiled/models/ --include="*v1"

Length of output: 110


Missing Airflow dependencies in customJson
The test model at api/python/test/sample/compiled/models/quickstart/test.v1 lacks a customJson section with Airflow dependencies. Unlike similar models in this PR, if this model has upstream dependencies, please verify manually and add the Airflow dependencies accordingly.

api/python/test/sample/compiled/joins/sample_team/sample_label_join.v1 (1)

9-9: Repeated dependencies.
You reference the same workflow thrice. Confirm if this duplication is intended or needed.

api/python/test/sample/compiled/joins/sample_team/sample_join.no_log_flattener (1)

9-9: Check duplicated dependencies.
You repeat the same dependency. Confirm it’s intentional.

api/python/ai/chronon/airflow_helpers.py (1)

5-5: Remove unused import

StagingQuery is imported but never used in the code.

-from ai.chronon.api.ttypes import Join, GroupBy, StagingQuery
+from ai.chronon.api.ttypes import Join, GroupBy
🧰 Tools
🪛 Ruff (0.8.2)

5-5: ai.chronon.api.ttypes.StagingQuery imported but unused

Remove unused import: ai.chronon.api.ttypes.StagingQuery

(F401)

api/python/test/sample/compiled/joins/sample_team/sample_join_with_derivations_on_external_parts.v1 (1)

9-9: Avoid duplicate dependencies.
Remove the repeated entry to keep the list concise.

-"{\"airflowDependencies\": [{\"name\": \"wf_sample_namespace_sample_table_group_by\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\"}, {\"name\": \"wf_sample_namespace_sample_table_group_by\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\"}, {\"name\": \"wf_sample_table_sample_entity_snapshot\", \"spec\": \"sample_table.sample_entity_snapshot/ds={{ ds }}\"}, {\"name\": \"wf_sample_table_sample_entity_mutations_hr_00_00\", \"spec\": \"sample_table.sample_entity_mutations/hr=00:00/ds={{ ds }}\"}]}"
+"{\"airflowDependencies\": [{\"name\": \"wf_sample_namespace_sample_table_group_by\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\"}, {\"name\": \"wf_sample_table_sample_entity_snapshot\", \"spec\": \"sample_table.sample_entity_snapshot/ds={{ ds }}\"}, {\"name\": \"wf_sample_table_sample_entity_mutations_hr_00_00\", \"spec\": \"sample_table.sample_entity_mutations/hr=00:00/ds={{ ds }}\"}]}"
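Duplicates like the one flagged above could also be avoided at generation time by deduplicating on the dependency name while preserving order. A small illustrative sketch, not the helper's actual implementation:

```python
def dedupe_dependencies(deps: list[dict]) -> list[dict]:
    """Drop repeated dependency entries keyed on 'name', keeping first-occurrence order."""
    seen = set()
    out = []
    for dep in deps:
        if dep["name"] not in seen:
            seen.add(dep["name"])
            out.append(dep)
    return out

deps = [
    {"name": "wf_a", "spec": "a/ds={{ ds }}"},
    {"name": "wf_a", "spec": "a/ds={{ ds }}"},  # duplicate, should be dropped
    {"name": "wf_b", "spec": "b/ds={{ ds }}"},
]
deduped = dedupe_dependencies(deps)
```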
api/python/test/sample/compiled/joins/sample_team/sample_join_derivation.v1 (1)

9-9: Remove repeated dependency.
Duplicate reference clutters metadata.

-"{\"airflowDependencies\": [{\"name\": \"wf_sample_namespace_sample_table_group_by\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\"}, {\"name\": \"wf_sample_namespace_sample_table_group_by\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\"}, ..."
+"{\"airflowDependencies\": [{\"name\": \"wf_sample_namespace_sample_table_group_by\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\"}, ..."
api/python/test/sample/compiled/group_bys/sample_team/sample_group_by_from_join_part.v2 (2)

12-12: Store JSON dependencies in a nested object


13-78: Repeated env configs can be centralized

api/python/test/sample/compiled/joins/sample_team/sample_online_join.v1 (1)

9-9: Duplicate dependencies in customJson

api/python/test/sample/compiled/joins/sample_team/sample_label_join_with_agg.v1 (1)

9-9: Flatten customJson if possible

api/python/ai/chronon/staging_query.py (1)

90-90: Consider more robust team name extraction.

The current approach is fragile if the file structure changes.

-    team = inspect.stack()[1].filename.split("/")[-2]
+    # Extract team from caller's filename or path
+    caller_filename = inspect.stack()[1].filename
+    path_parts = caller_filename.split("/")
+    # Default to 'chronon' if we can't determine team
+    team = path_parts[-2] if len(path_parts) >= 2 else "chronon"
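Another way to make the extraction less brittle is pathlib, which avoids assuming "/" separators. A sketch under the same assumption as the suggestion above (that the team name is the caller's immediate parent directory); the function name and default are illustrative:

```python
from pathlib import Path

def team_from_path(filename: str, default: str = "chronon") -> str:
    """Infer a team name from a config file path, falling back to a default.
    Assumes team == immediate parent directory, per the suggestion above."""
    parts = Path(filename).parts
    return parts[-2] if len(parts) >= 2 else default

team = team_from_path("repo/staging_queries/sample_team/query.py")
```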
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 7ab3c3b and 8a4251f.

📒 Files selected for processing (57)
  • api/python/ai/chronon/airflow_helpers.py (1 hunks)
  • api/python/ai/chronon/cli/compile/parse_configs.py (2 hunks)
  • api/python/ai/chronon/query.py (2 hunks)
  • api/python/ai/chronon/staging_query.py (1 hunks)
  • api/python/test/sample/compiled/group_bys/kaggle/clicks.ad_streaming (1 hunks)
  • api/python/test/sample/compiled/group_bys/kaggle/outbrain.ad_doc (1 hunks)
  • api/python/test/sample/compiled/group_bys/kaggle/outbrain.ad_platform (1 hunks)
  • api/python/test/sample/compiled/group_bys/kaggle/outbrain.ad_uuid (1 hunks)
  • api/python/test/sample/compiled/group_bys/quickstart/purchases.v1 (1 hunks)
  • api/python/test/sample/compiled/group_bys/quickstart/returns.v1 (1 hunks)
  • api/python/test/sample/compiled/group_bys/quickstart/schema.v1 (1 hunks)
  • api/python/test/sample/compiled/group_bys/quickstart/users.v1 (1 hunks)
  • api/python/test/sample/compiled/group_bys/risk/merchant_data.merchant_group_by (1 hunks)
  • api/python/test/sample/compiled/group_bys/risk/user_data.user_group_by (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/entity_sample_group_by_from_module.v1 (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/event_sample_group_by.v1 (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/group_by_with_kwargs.v1 (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/label_part_group_by.label_part_group_by (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/label_part_group_by.label_part_group_by_2 (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/mutation_sample_group_by.v0 (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/sample_group_by.require_backfill (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/sample_group_by.v1 (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/sample_group_by_from_join_part.v2 (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/sample_group_by_from_module.v1 (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/sample_group_by_group_by.require_backfill (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/sample_group_by_group_by.v1 (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/sample_group_by_with_derivations.v1 (1 hunks)
  • api/python/test/sample/compiled/group_bys/sample_team/sample_non_prod_group_by.v1 (1 hunks)
  • api/python/test/sample/compiled/joins/kaggle/outbrain.training_set (1 hunks)
  • api/python/test/sample/compiled/joins/quickstart/training_set.v1 (1 hunks)
  • api/python/test/sample/compiled/joins/quickstart/training_set.v2 (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_backfill_mutation_join.v0 (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_chaining_join_parent.parent_join (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_join.consistency_check (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_join.group_by_of_group_by (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_join.never (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_join.no_log_flattener (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_join.v1 (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_join_bootstrap.v1 (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_join_bootstrap.v2 (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_join_derivation.v1 (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_join_external_parts.v1 (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_join_from_group_by_from_join.v1 (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_join_from_module.v1 (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_join_with_derivations_on_external_parts.v1 (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_label_join.v1 (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_label_join_with_agg.v1 (1 hunks)
  • api/python/test/sample/compiled/joins/sample_team/sample_online_join.v1 (1 hunks)
  • api/python/test/sample/compiled/models/quickstart/test.v1 (1 hunks)
  • api/python/test/sample/compiled/models/risk/transaction_model.v1 (1 hunks)
  • api/python/test/sample/compiled/staging_queries/kaggle/outbrain.base_table (1 hunks)
  • api/python/test/sample/compiled/staging_queries/quickstart/checkouts_external.checkouts_query (1 hunks)
  • api/python/test/sample/compiled/staging_queries/sample_team/sample_staging_query.v1 (1 hunks)
  • api/python/test/sample/staging_queries/kaggle/outbrain.py (2 hunks)
  • api/python/test/sample/staging_queries/quickstart/checkouts_external.py (2 hunks)
  • api/python/test/sample/staging_queries/sample_team/sample_staging_query.py (2 hunks)
  • pyrightconfig.json (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (6)
api/python/test/sample/staging_queries/quickstart/checkouts_external.py (1)
api/python/ai/chronon/staging_query.py (2)
  • staging_query (23-124)
  • TableDependency (19-21)
api/python/ai/chronon/cli/compile/parse_configs.py (1)
api/python/ai/chronon/airflow_helpers.py (1)
  • set_airflow_deps (130-141)
api/python/test/sample/staging_queries/kaggle/outbrain.py (1)
api/python/ai/chronon/staging_query.py (2)
  • staging_query (23-124)
  • TableDependency (19-21)
api/python/ai/chronon/airflow_helpers.py (2)
api/python/ai/chronon/group_by.py (1)
  • GroupBy (429-673)
api/python/ai/chronon/utils.py (1)
  • get_query (135-136)
api/python/ai/chronon/staging_query.py (3)
api/python/ai/chronon/repo/utils.py (1)
  • custom_json (74-78)
api/python/ai/chronon/airflow_helpers.py (1)
  • create_airflow_dependency (11-27)
api/src/main/scala/ai/chronon/api/Builders.scala (1)
  • MetaData (261-315)
api/python/test/sample/staging_queries/sample_team/sample_staging_query.py (1)
api/python/ai/chronon/staging_query.py (2)
  • staging_query (23-124)
  • TableDependency (19-21)
🪛 Ruff (0.8.2)
api/python/test/sample/staging_queries/kaggle/outbrain.py

16-16: ai.chronon.api.ttypes.MetaData imported but unused

Remove unused import: ai.chronon.api.ttypes.MetaData

(F401)

api/python/ai/chronon/airflow_helpers.py

5-5: ai.chronon.api.ttypes.StagingQuery imported but unused

Remove unused import: ai.chronon.api.ttypes.StagingQuery

(F401)

api/python/ai/chronon/staging_query.py

9-9: ai.chronon.repo.utils.custom_json imported but unused

Remove unused import: ai.chronon.repo.utils.custom_json

(F401)


101-101: Redefinition of unused custom_json from line 9

(F811)

⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
🔇 Additional comments (74)
pyrightconfig.json (1)

1-9: Well-configured Pyright setup.

Basic config file for Python type checking that targets the api/python directory.

api/python/ai/chronon/cli/compile/parse_configs.py (2)

11-11: Good import.

Clearly importing the required airflow_helpers module.


33-34: Integration point for Airflow dependencies.

Smart placing of dependency setup after metadata updates. The comment provides important context.

api/python/test/sample/staging_queries/quickstart/checkouts_external.py (2)

15-15: Updated imports for new API.

Switched to using the newer staging_query API.


33-42: Good use of new staging_query API.

Updated to use the new function-based API with explicit dependencies declaration. Dependencies are correctly specified with partition columns.

api/python/test/sample/staging_queries/kaggle/outbrain.py (1)

19-44: Well-structured query with explicit dependencies.

Good use of the new staging_query API with proper naming and dependency specification.

api/python/test/sample/compiled/joins/sample_team/sample_join.group_by_of_group_by (1)

9-9: Airflow dependencies properly added.

Dependencies correctly defined for staging query and group-by backfill.

api/python/test/sample/compiled/group_bys/sample_team/label_part_group_by.label_part_group_by_2 (1)

8-8: Airflow dependency added correctly.

Single dependency for entity snapshot properly configured.

api/python/test/sample/compiled/group_bys/sample_team/sample_group_by_group_by.v1 (1)

12-12: Dependency correctly configured.

Airflow dependency for the backfill workflow properly set.

api/python/test/sample/compiled/group_bys/risk/merchant_data.merchant_group_by (3)

7-7: Airflow dependency correctly configured.

The airflow dependency is properly set up for the merchant data workflow.


10-73: LGTM: Environment configurations look appropriate.

All required environment variables are defined for each execution mode.


75-95: Query structure is well-formed.

The source configuration and key columns are properly defined.

api/python/ai/chronon/query.py (1)

29-29: Good addition of partition_column parameter.

The addition of the partition_column parameter to the Query function enhances flexibility for Airflow dependency management.

Also applies to: 85-85

api/python/test/sample/compiled/joins/sample_team/sample_join_external_parts.v1 (1)

146-236: External parts configuration looks good.

Both onlineExternalParts are properly defined with appropriate schema details.

api/python/test/sample/compiled/group_bys/kaggle/outbrain.ad_platform (2)

7-7: Airflow dependency correctly configured.

The dependency for the Kaggle Outbrain base table is properly specified.


96-130: Comprehensive aggregation setup.

All required aggregations for the clicked column are defined with appropriate 3-day windows.

api/python/test/sample/compiled/joins/sample_team/sample_label_join.v1 (2)

129-147: Validate aggregator ops.
Are operations 7 and 12 correct? Make sure they match your aggregator definitions.


229-269: Looks consistent.
Label parts appear fine. No issues noted here.

api/python/test/sample/compiled/joins/sample_team/sample_join.no_log_flattener (1)

130-139: Verify aggregator codes.
Double-check operation=7 is valid. Ensure it matches your aggregator logic.

api/python/test/sample/staging_queries/sample_team/sample_staging_query.py (1)

39-39: Partition column check.
Confirm ds is correct for your schema.

api/python/test/sample/compiled/group_bys/sample_team/sample_group_by_from_module.v1 (1)

1-125: Config looks good.

All metadata, dependencies, and environment settings are clear.

api/python/test/sample/compiled/group_bys/sample_team/entity_sample_group_by_from_module.v1 (1)

1-114: Looks good.

Config is well-structured and dependencies are clear.

api/python/test/sample/compiled/staging_queries/quickstart/checkouts_external.checkouts_query (1)

1-76: All good.

Staging query config is clear and dependencies are explicit.

api/python/test/sample/compiled/joins/sample_team/sample_join_from_group_by_from_join.v1 (1)

1-141: Join config looks good.

All metadata and dependencies are present.

api/python/test/sample/compiled/staging_queries/kaggle/outbrain.base_table (1)

1-75: Config is clear.

All fields and dependencies are well-defined.

api/python/test/sample/compiled/group_bys/kaggle/outbrain.ad_uuid (1)

1-130: Validate Airflow reference.
Confirm that the pipeline name wf_kaggle_outbrain_base_table is valid in Airflow.

api/python/test/sample/compiled/group_bys/sample_team/sample_non_prod_group_by.v1 (1)

1-113: Check Airflow DAG reference.
Ensure that wf_sample_namespace_sample_table_group_by is properly defined in Airflow.

api/python/test/sample/compiled/group_bys/sample_team/sample_group_by.v1 (1)

1-113: Confirm staging query dependency.
Verify that wf_sample_namespace_sample_team_sample_staging_query_v1 aligns with your Airflow DAG naming.

api/python/test/sample/compiled/joins/sample_team/sample_join_bootstrap.v2 (1)

1-298: Check repeated dependencies.
wf_sample_namespace_sample_table_group_by is referenced multiple times in customJson. Confirm this is intentional.

api/python/test/sample/compiled/joins/sample_team/sample_backfill_mutation_join.v0 (3)

1-11: Metadata
Looks fine.


12-75: Environment config
No concerns.


76-134: Join setup
Implementation looks correct.

api/python/test/sample/compiled/group_bys/quickstart/returns.v1 (3)

1-9: Metadata
Valid fields and Airflow dependencies.


10-74: Env and scheduling
No issues.


75-160: Source and aggregations
Configuration is consistent.

api/python/test/sample/compiled/group_bys/kaggle/clicks.ad_streaming (3)

1-8: Metadata
Everything looks right.


9-73: Environment/Schedule
No problematic settings.


75-129: Source/Aggregations
Seems coherent.

api/python/test/sample/compiled/group_bys/sample_team/sample_group_by_with_derivations.v1 (3)

12-12: Airflow dependency added as expected

The Airflow dependency for the staging query has been correctly added to the customJson field.


102-113: Aggregation operations look good

Both aggregations use operation code 7 (sum) on the relevant columns.


114-119: Derivation properly implemented

The derivation renames impressed_unique_count_sum to impressed_unique_count_1d_new_name as expected.

api/python/test/sample/compiled/group_bys/kaggle/outbrain.ad_doc (2)

7-7: Airflow dependency correctly configured

The dependency on wf_kaggle_outbrain_base_table is properly added to the customJson.


94-127: Comprehensive aggregation setup

Three different aggregation operations (sum, count, avg) with consistent 3-day windows.

api/python/test/sample/compiled/joins/quickstart/training_set.v2 (2)

9-9: Multiple Airflow dependencies correctly configured

All three required dependencies (checkouts, purchases, returns) are properly specified.


178-194: Advanced quantile operation properly configured

The bucketed quantile configuration is correct with bucket_rand.

api/python/test/sample/compiled/joins/sample_team/sample_join.v1 (1)

132-143: Aggregation operations look correct

Both aggregations use sum operation on the appropriate columns.

api/python/test/sample/compiled/group_bys/sample_team/label_part_group_by.label_part_group_by (1)

8-8: Correct airflow dependencies

Dependencies match the entity sources defined in this group-by configuration.

api/python/test/sample/compiled/group_bys/quickstart/users.v1 (1)

8-8: Correct airflow dependency

The dependency matches the snapshot table source.

api/python/test/sample/compiled/group_bys/sample_team/sample_group_by_group_by.require_backfill (1)

73-73: Potential conflict
"historicalBackfill" is 0 but "backfillStartDate" is set.

Would you like to confirm if a backfill is needed?

Also applies to: 108-108

api/python/test/sample/compiled/group_bys/sample_team/group_by_with_kwargs.v1 (1)

82-83: Check usage
Using “old_version” fields might be unintentional.

api/python/test/sample/compiled/joins/sample_team/sample_join_bootstrap.v1 (1)

9-9: Check duplicates
Same table repeated twice in airflowDependencies.

api/python/test/sample/compiled/joins/sample_team/sample_join_with_derivations_on_external_parts.v1 (1)

1-325: Looks good.
No issues beyond the duplication.

api/python/test/sample/compiled/joins/sample_team/sample_join_derivation.v1 (1)

1-219: All else is fine.
No further concerns.

api/python/test/sample/compiled/group_bys/quickstart/schema.v1 (1)

1-104: Configuration looks solid.
No concerns found.

api/python/test/sample/compiled/group_bys/sample_team/mutation_sample_group_by.v0 (1)

1-103: Config structure looks good.

No issues found in this group-by config. Airflow dependencies and envs are clear.

api/python/test/sample/compiled/joins/sample_team/sample_join.consistency_check (1)

1-146: Join config is clear and well-structured.

Airflow dependencies, envs, and join parts are all set up as expected.

api/python/test/sample/compiled/group_bys/risk/user_data.user_group_by (1)

1-97: No issues in this group-by config.

Airflow dependencies and environment variables are well defined.

api/python/test/sample/compiled/group_bys/quickstart/purchases.v1 (1)

1-170: Config is solid and clear.

All aggregations, envs, and dependencies are well specified.

api/python/test/sample/compiled/joins/sample_team/sample_join.never (1)

1-144: Join config is correct.

Airflow dependencies, envs, and schedule are all as expected.

api/python/test/sample/compiled/group_bys/sample_team/sample_group_by_from_join_part.v2 (2)

1-11: Metadata struct looks fine


79-110: Key columns and aggregations look okay

api/python/test/sample/compiled/joins/sample_team/sample_online_join.v1 (3)

1-9: Metadata is coherent


10-74: Env configs appear correct


79-286: Join parts and mappings look valid

api/python/test/sample/compiled/joins/sample_team/sample_label_join_with_agg.v1 (5)

1-9: Props set properly


10-75: Environment partitioning is fine


79-90: Left source config is consistent


91-228: Joins and groupBy ops look correct


229-285: Label parts are well-defined

api/python/test/sample/compiled/joins/sample_team/sample_join_from_module.v1 (1)

9-9: Airflow dependencies correctly match source tables.

The customJson field properly sets Airflow dependencies for all data sources used in this join configuration.

api/python/test/sample/compiled/group_bys/sample_team/event_sample_group_by.v1 (1)

8-8: Airflow dependency correctly configured.

The customJson properly sets the dependency for sample_namespace.sample_table_group_by.

api/python/test/sample/compiled/joins/quickstart/training_set.v1 (1)

9-9: LGTM: Correctly configured Airflow dependencies.

Dependencies properly set for upstream workflows. Matches pipeline data sources.

api/python/ai/chronon/staging_query.py (1)

13-122: Well-structured function with comprehensive documentation.

Function design cleanly manages dependencies and metadata creation.

🧰 Tools
🪛 Ruff (0.8.2)

101-101: Redefinition of unused custom_json from line 9

(F811)

api/python/test/sample/compiled/group_bys/sample_team/sample_group_by.require_backfill (1)

8-8: LGTM: Correctly configured Airflow dependency.

Dependency properly references source staging query table.

"online": 0,
"production": 0,
"sourceFile": "joins/sample_team/sample_join_external_parts.py",
"customJson": "{\"airflowDependencies\": [{\"name\": \"wf_sample_namespace_sample_team_sample_staging_query_v1\", \"spec\": \"sample_namespace.sample_team_sample_staging_query_v1/ds={{ ds }}\"}, {\"name\": \"wf_sample_namespace_sample_team_sample_staging_query_v1\", \"spec\": \"sample_namespace.sample_team_sample_staging_query_v1/ds={{ ds }}\"}]}",
Contributor

⚠️ Potential issue

Remove duplicate Airflow dependency.

The same workflow dependency appears twice in the configuration.

-    "customJson": "{\"airflowDependencies\": [{\"name\": \"wf_sample_namespace_sample_team_sample_staging_query_v1\", \"spec\": \"sample_namespace.sample_team_sample_staging_query_v1/ds={{ ds }}\"}, {\"name\": \"wf_sample_namespace_sample_team_sample_staging_query_v1\", \"spec\": \"sample_namespace.sample_team_sample_staging_query_v1/ds={{ ds }}\"}]}",
+    "customJson": "{\"airflowDependencies\": [{\"name\": \"wf_sample_namespace_sample_team_sample_staging_query_v1\", \"spec\": \"sample_namespace.sample_team_sample_staging_query_v1/ds={{ ds }}\"}]}",
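The duplicate entries flagged above could also be dropped generically after compilation; a small sketch (hypothetical helper, not part of this PR) that deduplicates a compiled `customJson` string:

```python
import json


def dedupe_airflow_dependencies(custom_json: str) -> str:
    """Remove duplicate (name, spec) dependency entries, preserving order."""
    payload = json.loads(custom_json)
    seen = set()
    unique = []
    for dep in payload.get("airflowDependencies", []):
        key = (dep["name"], dep["spec"])
        if key not in seen:
            seen.add(key)
            unique.append(dep)
    payload["airflowDependencies"] = unique
    return json.dumps(payload)
```

This keeps the first occurrence of each dependency, which matches what the suggested diffs above do by hand.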

)
name="sample_staging_query",
output_namespace="sample_namespace",
table_properties={"sample_config_json": """{"sample_key": "sample value}"""},
Contributor

⚠️ Potential issue

Fix JSON syntax.
The sample_config_json value is missing a closing quote before the brace, so it is not valid JSON.

"online": 1,
"production": 0,
"sourceFile": "joins/sample_team/sample_join.py",
"customJson": "{\"airflowDependencies\": [{\"name\": \"wf_sample_namespace_sample_team_sample_staging_query_v1\", \"spec\": \"sample_namespace.sample_team_sample_staging_query_v1/ds={{ ds }}\"}, {\"name\": \"wf_sample_namespace_sample_team_sample_staging_query_v1\", \"spec\": \"sample_namespace.sample_team_sample_staging_query_v1/ds={{ ds }}\"}]}",
Contributor

⚠️ Potential issue

Duplicate dependency detected

The same Airflow dependency appears twice in the configuration.

-    "customJson": "{\"airflowDependencies\": [{\"name\": \"wf_sample_namespace_sample_team_sample_staging_query_v1\", \"spec\": \"sample_namespace.sample_team_sample_staging_query_v1/ds={{ ds }}\"}, {\"name\": \"wf_sample_namespace_sample_team_sample_staging_query_v1\", \"spec\": \"sample_namespace.sample_team_sample_staging_query_v1/ds={{ ds }}\"}]}",
+    "customJson": "{\"airflowDependencies\": [{\"name\": \"wf_sample_namespace_sample_team_sample_staging_query_v1\", \"spec\": \"sample_namespace.sample_team_sample_staging_query_v1/ds={{ ds }}\"}]}",

"online": 1,
"production": 0,
"sourceFile": "joins/sample_team/sample_chaining_join_parent.py",
"customJson": "{\"airflowDependencies\": [{\"name\": \"wf_sample_namespace_sample_table_group_by\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\"}, {\"name\": \"wf_sample_namespace_sample_table_group_by\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\"}, {\"name\": \"wf_sample_table_sample_entity_snapshot\", \"spec\": \"sample_table.sample_entity_snapshot/ds={{ ds }}\"}, {\"name\": \"wf_sample_table_sample_entity_mutations_hr_00_00\", \"spec\": \"sample_table.sample_entity_mutations/hr=00:00/ds={{ ds }}\"}]}",
Contributor

⚠️ Potential issue

Duplicate dependency detected

The airflowDependencies array contains duplicate entries for wf_sample_namespace_sample_table_group_by. This may cause redundant Airflow task dependencies.

-  "customJson": "{\"airflowDependencies\": [{\"name\": \"wf_sample_namespace_sample_table_group_by\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\"}, {\"name\": \"wf_sample_namespace_sample_table_group_by\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\"}, {\"name\": \"wf_sample_table_sample_entity_snapshot\", \"spec\": \"sample_table.sample_entity_snapshot/ds={{ ds }}\"}, {\"name\": \"wf_sample_table_sample_entity_mutations_hr_00_00\", \"spec\": \"sample_table.sample_entity_mutations/hr=00:00/ds={{ ds }}\"}]}",
+  "customJson": "{\"airflowDependencies\": [{\"name\": \"wf_sample_namespace_sample_table_group_by\", \"spec\": \"sample_namespace.sample_table_group_by/ds={{ ds }}\"}, {\"name\": \"wf_sample_table_sample_entity_snapshot\", \"spec\": \"sample_table.sample_entity_snapshot/ds={{ ds }}\"}, {\"name\": \"wf_sample_table_sample_entity_mutations_hr_00_00\", \"spec\": \"sample_table.sample_entity_mutations/hr=00:00/ds={{ ds }}\"}]}",

Comment on lines 35 to 67
def _get_airflow_deps_from_source(source, partition_column=None):
"""
Given a source, return a list of Airflow dependencies.

Args:
source: The source object (events, entities, or joinSource)
partition_column: The partition column to use

Returns:
A list of Airflow dependency objects
"""
tables = []
# Assumes source has already been normalized
if source.events:
tables = [source.events.table]
# Use partition column from query if available, otherwise use the provided one
source_partition_column = _get_partition_col_from_query(source.events.query) or partition_column
elif source.entities:
# Given the setup of Query, we currently mandate the same partition column for snapshot and mutations tables
tables = [source.entities.snapshotTable]
if source.entities.mutationTable:
tables.append(source.entities.mutationTable)
source_partition_column = _get_partition_col_from_query(source.entities.query) or partition_column
elif source.joinSource:
namespace = source.joinSource.join.metaData.outputNamespace
table = _sanitize_name(source.joinSource.join.metaData.name)
tables = [f"{namespace}.{table}"]
source_partition_column = _get_partition_col_from_query(source.joinSource.query) or partition_column
else:
# Unknown source type
return []

return [create_airflow_dependency(table, source_partition_column) for table in tables]
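For context on what these helpers emit, here is a minimal sketch of `create_airflow_dependency` inferred from the compiled outputs above (the real implementation may differ; in particular, the `'ds'` fallback shown here is flagged elsewhere in this review as something to move into teams.py):

```python
import re


def create_airflow_dependency(table, partition_column=None):
    """Build one Airflow dependency entry for a table.

    Inferred sketch: the name is 'wf_' plus the sanitized table name, and the
    spec is 'table/partition_col={{ ds }}'. Defaulting to 'ds' is an assumption.
    """
    partition_col = partition_column or "ds"
    sanitized = re.sub("[^a-zA-Z0-9_]", "_", table)
    return {
        "name": f"wf_{sanitized}",
        "spec": f"{table}/{partition_col}={{{{ ds }}}}",
    }


dep = create_airflow_dependency("sample_namespace.sample_table_group_by")
# → {"name": "wf_sample_namespace_sample_table_group_by",
#    "spec": "sample_namespace.sample_table_group_by/ds={{ ds }}"}
```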
Contributor

⚠️ Potential issue

Add deduplication for dependencies

Function returns duplicate dependencies when the same table appears multiple times.

Add deduplication:

def _get_airflow_deps_from_source(source, partition_column=None):
    """
    Given a source, return a list of Airflow dependencies.
    
    Args:
        source: The source object (events, entities, or joinSource)
        partition_column: The partition column to use
        
    Returns:
        A list of Airflow dependency objects
    """
    tables = []
    # Assumes source has already been normalized
    if source.events:
        tables = [source.events.table]
        # Use partition column from query if available, otherwise use the provided one
        source_partition_column = _get_partition_col_from_query(source.events.query) or partition_column
    elif source.entities:
        # Given the setup of Query, we currently mandate the same partition column for snapshot and mutations tables
        tables = [source.entities.snapshotTable]
        if source.entities.mutationTable:
            tables.append(source.entities.mutationTable)
        source_partition_column = _get_partition_col_from_query(source.entities.query) or partition_column
    elif source.joinSource:
        namespace = source.joinSource.join.metaData.outputNamespace
        table = _sanitize_name(source.joinSource.join.metaData.name)
        tables = [f"{namespace}.{table}"]
        source_partition_column = _get_partition_col_from_query(source.joinSource.query) or partition_column
    else:
        # Unknown source type
        return []

-    return [create_airflow_dependency(table, source_partition_column) for table in tables]
+    unique = {}
+    for table in tables:
+        dep = create_airflow_dependency(table, source_partition_column)
+        unique[(dep["name"], dep["spec"])] = dep
+    return list(unique.values())

Committable suggestion skipped: line range outside the PR's diff.

"team": "sample_team",
"outputNamespace": "sample_namespace",
"tableProperties": {
"sample_config_json": "{\"sample_key\": \"sample value}"
Contributor

⚠️ Potential issue

Fix broken JSON
Missing closing quote.

Apply this patch:

-      "sample_config_json": "{\"sample_key\": \"sample value}"
+      "sample_config_json": "{\"sample_key\": \"sample value\"}"

"online": 0,
"production": 0,
"sourceFile": "joins/kaggle/outbrain.py",
"customJson": "{\"airflowDependencies\": [{\"name\": \"wf_kaggle_outbrain_base_table\", \"spec\": \"kaggle_outbrain_base_table/ds={{ ds }}\"}, {\"name\": \"wf_kaggle_outbrain_base_table\", \"spec\": \"kaggle_outbrain_base_table/ds={{ ds }}\"}, {\"name\": \"wf_kaggle_outbrain_base_table\", \"spec\": \"kaggle_outbrain_base_table/ds={{ ds }}\"}, {\"name\": \"wf_kaggle_outbrain_base_table\", \"spec\": \"kaggle_outbrain_base_table/ds={{ ds }}\"}, {\"name\": \"wf_kaggle_outbrain_base_table\", \"spec\": \"kaggle_outbrain_base_table/ds={{ ds }}\"}]}",
Contributor

⚠️ Potential issue

Remove duplicate Airflow dependencies.

The customJson contains five identical dependencies for the same table.

-"customJson": "{\"airflowDependencies\": [{\"name\": \"wf_kaggle_outbrain_base_table\", \"spec\": \"kaggle_outbrain_base_table/ds={{ ds }}\"}, {\"name\": \"wf_kaggle_outbrain_base_table\", \"spec\": \"kaggle_outbrain_base_table/ds={{ ds }}\"}, {\"name\": \"wf_kaggle_outbrain_base_table\", \"spec\": \"kaggle_outbrain_base_table/ds={{ ds }}\"}, {\"name\": \"wf_kaggle_outbrain_base_table\", \"spec\": \"kaggle_outbrain_base_table/ds={{ ds }}\"}, {\"name\": \"wf_kaggle_outbrain_base_table\", \"spec\": \"kaggle_outbrain_base_table/ds={{ ds }}\"}]}",
+"customJson": "{\"airflowDependencies\": [{\"name\": \"wf_kaggle_outbrain_base_table\", \"spec\": \"kaggle_outbrain_base_table/ds={{ ds }}\"}]}",

"name": "risk.transaction_model.v1",
"team": "risk",
"outputNamespace": "risk",
"sourceFile": "models/risk/transaction_model.py",
Contributor

⚠️ Potential issue

Missing Airflow dependencies.

This model lacks customJson with airflowDependencies for its data sources (data.users, data.txn_events, data.merchants).

Add after line 6:

+"customJson": "{\"airflowDependencies\": [{\"name\": \"wf_data_users\", \"spec\": \"data.users/ds={{ ds }}\"}, {\"name\": \"wf_data_txn_events\", \"spec\": \"data.txn_events/ds={{ ds }}\"}, {\"name\": \"wf_data_merchants\", \"spec\": \"data.merchants/ds={{ ds }}\"}]}",

Comment on lines +100 to +102
airflow_dependencies = [airflow_helpers.create_airflow_dependency(t.table, t.partition_column) for t in dependencies] if dependencies else []
custom_json = json.dumps({"airflow_dependencies": airflow_dependencies})

Contributor

🛠️ Refactor suggestion

Rename variable to avoid shadowing an imported name.

The local custom_json shadows the custom_json imported on line 9 (Ruff F811).

-    airflow_dependencies = [airflow_helpers.create_airflow_dependency(t.table, t.partition_column) for t in dependencies] if dependencies else []
-    custom_json = json.dumps({"airflow_dependencies": airflow_dependencies})
+    airflow_dependencies = [airflow_helpers.create_airflow_dependency(t.table, t.partition_column) for t in dependencies] if dependencies else []
+    custom_json_str = json.dumps({"airflow_dependencies": airflow_dependencies})

Also update line 110:

-        customJson=custom_json,
+        customJson=custom_json_str,
🧰 Tools
🪛 Ruff (0.8.2)

101-101: Redefinition of unused custom_json from line 9

(F811)

@@ -0,0 +1,9 @@
{
Collaborator

is this needed?

Contributor

@nikhil-zlai nikhil-zlai left a comment

one comment, but lgtm otherwise

table: str
partition_column: Optional[str] = None

def staging_query(
Contributor

let's call this StagingQuery. From the user's POV this is not really a method, more of an object; it is also more consistent with our existing naming.
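Assuming the two fields quoted above belong to a small dependency container (the class name here is hypothetical), usage of the constructor could look roughly like:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TableDependency:  # hypothetical name for the container with the fields above
    table: str
    partition_column: Optional[str] = None


# Declaring explicit upstream tables for a staging query:
deps: List[TableDependency] = [
    TableDependency(table="sample_namespace.sample_table"),
    TableDependency(table="sample_namespace.other_table", partition_column="date"),
]
```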

from ai.chronon.api.ttypes import GroupBy, Join


def _sanitize_name(name):
Collaborator

import re


def sanitize(name):
    """
    From api.Extensions.scala
    Option(name).map(_.replaceAll("[^a-zA-Z0-9_]", "_")).orNull
    """
    if name is not None:
        return re.sub("[^a-zA-Z0-9_]", "_", name)
    return None



def extract_default_partition_column(obj):
return obj.metaData.executionInfo.env.common.get("partitionColumn")
Collaborator

need to grab this from spark.chronon.partition.column
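A sketch of that suggested change, assuming the Spark conf is exposed as a plain dict on executionInfo (the attribute path is an assumption for illustration, not verified against the actual thrift types):

```python
from types import SimpleNamespace


def extract_default_partition_column(obj):
    # Per the review note: read the Spark conf key rather than the env map.
    # The .conf.common attribute path is an assumption for illustration.
    conf = getattr(obj.metaData.executionInfo, "conf", None)
    if conf is not None and conf.common:
        return conf.common.get("spark.chronon.partition.column")
    return None


# Mocked object shaped like a compiled Chronon config, for illustration only:
mock = SimpleNamespace(
    metaData=SimpleNamespace(
        executionInfo=SimpleNamespace(
            conf=SimpleNamespace(common={"spark.chronon.partition.column": "_DATE"})
        )
    )
)
col = extract_default_partition_column(mock)  # "_DATE"
```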

A dictionary with name and spec for the Airflow dependency
"""
# Default partition column to 'ds' if not specified
partition_col = partition_column or 'ds'
Collaborator

don't default to 'ds' here; we'll need to default to it in teams.py instead

Co-authored-by: Thomas Chow <[email protected]>
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
api/python/test/sample/teams.py (1)

77-97: Other teams lack partition column config.

Consider adding conf attribute with partition column config to remaining teams for consistency.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 6e96ab2 and b4582d9.

📒 Files selected for processing (1)
  • api/python/test/sample/teams.py (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
🔇 Additional comments (4)
api/python/test/sample/teams.py (4)

3-3: Import addition looks correct.

Added ConfigProperties import needed for new team configurations.


9-13: Partition column config for default team.

Added Spark partition column configuration properly structured.


51-55: Partition column config for test team.

Added test team configuration with different partition column value.


19-20:

❓ Verification inconclusive

Verify partition column configuration consistency.

Both "PARTITION_COLUMN": "ds" in EnvironmentVariables and "spark.chronon.partition.column": "_DATE" exist. Confirm these serve different purposes.

Also applies to: 9-13


🏁 Script executed:

#!/bin/bash
# Find usages of both partition column configurations
echo "Searching for PARTITION_COLUMN usage:"
rg "PARTITION_COLUMN" -A 1 -B 1 --no-filename-only --no-heading

echo -e "\nSearching for spark.chronon.partition.column usage:"
rg "spark\.chronon\.partition\.column" -A 1 -B 1 --no-filename-only --no-heading

echo -e "\nSearching for Airflow dependency related files:"
rg -l "airflow.*dependencies" --no-heading

Length of output: 1492


Action: Verify partition configuration usage.

  • In file api/python/test/sample/teams.py (lines 19‑20), "PARTITION_COLUMN": "ds" is set, while elsewhere (e.g. in files like api/python/ai/chronon/staging_query.py) "spark.chronon.partition.column": "_DATE" is referenced.
  • Please confirm these parameters are intentionally distinct in purpose.
  • Also, check that the corresponding configurations at lines 9‑13 (and related sections) align with this separation.
  • Note: The original search script’s flag --no-filename-only should be updated to --no-filename to avoid errors during repository searches.



# Wrapper for EngineType
class EngineType:
Collaborator

actually we can kill this for now; it's not being used.

@varant-zlai varant-zlai merged commit 263a029 into main Apr 16, 2025
7 checks passed
@varant-zlai varant-zlai deleted the vz--add_dependencies_for_airflow branch April 16, 2025 02:07
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
## Summary

Setting airflow dependencies in customJson.

This is meant to be temporary, until we can ship orchestrator.

## Checklist
- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced a new utility for defining staging queries with explicit
table dependencies and partition columns.
- Added support for specifying engine type, scheduling, and advanced
metadata when creating staging queries.
- Added helper functions to automatically generate and set Airflow
dependency metadata for Chronon objects.

- **Refactor**
- Updated sample staging query definitions to use the new dependency and
metadata structure, improving clarity and consistency.
- Replaced nested metadata objects with direct keyword arguments for
easier configuration.
- Integrated automatic setting of Airflow dependencies during
configuration parsing.

- **Chores**
- Enhanced internal handling of Airflow dependencies for relevant
objects, ensuring accurate dependency tracking.
- Updated team configurations to include default partition column
settings for improved environment consistency.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: ezvz <[email protected]>
Co-authored-by: tchow-zlai <[email protected]>
Co-authored-by: Thomas Chow <[email protected]>
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025