-
Notifications
You must be signed in to change notification settings - Fork 0
Cache table permission check #377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced the table creation process to return clear, detailed statuses, improving feedback during table building. - Introduced a new method for generating table builders that integrates with BigQuery, including error handling for partitioning. - Streamlined data writing operations to cloud storage with automatic path configuration and Parquet integration. - Added explicit partitioning for DataFrame saves in Hive, Delta, and Iceberg formats. - **Refactor** - Overhauled logic to enforce partition restrictions and incorporate robust error handling for a smoother user experience. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: tchow-zlai <[email protected]> Co-authored-by: Thomas Chow <[email protected]>
WalkthroughThe changes modify the Changes
Sequence Diagram(s)sequenceDiagram
participant U as User
participant T as checkTablePermission
participant S as SparkSession
participant DF as DataFrame
U->>T: Invoke checkTablePermission
T->>S: load(tableName)
S-->>T: Return DataFrame
T->>DF: cache()
T->>DF: collect()
alt Exception Occurs
DF-->>T: Throw error
T->>T: Log error
end
T-->>U: Return Boolean
Poem
Warning Review ran into problems🔥 ProblemsGitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository. Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: spark_tests
- GitHub Check: enforce_triggered_workflows
val df = sparkSession.read | ||
.load(DataPointer.from(tableName, sparkSession)) | ||
.where(s"$partitionColumn='$partitionFilter'") | ||
.limit(1) | ||
.collect() | ||
df.cache | ||
df.collect() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clear the cache after use to prevent memory leaks.
The DataFrame is cached but never uncached, which could lead to memory pressure.
Apply this diff:
- val df = sparkSession.read
- .load(DataPointer.from(tableName, sparkSession))
- .where(s"$partitionColumn='$partitionFilter'")
- .limit(1)
- df.cache
- df.collect()
+ val df = sparkSession.read
+ .load(DataPointer.from(tableName, sparkSession))
+ .where(s"$partitionColumn='$partitionFilter'")
+ .limit(1)
+ try {
+ df.cache()
+ df.collect()
+ } finally {
+ df.unpersist()
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
val df = sparkSession.read | |
.load(DataPointer.from(tableName, sparkSession)) | |
.where(s"$partitionColumn='$partitionFilter'") | |
.limit(1) | |
.collect() | |
df.cache | |
df.collect() | |
val df = sparkSession.read | |
.load(DataPointer.from(tableName, sparkSession)) | |
.where(s"$partitionColumn='$partitionFilter'") | |
.limit(1) | |
try { | |
df.cache() | |
df.collect() | |
} finally { | |
df.unpersist() | |
} |
.load(DataPointer.from(tableName, sparkSession)) | ||
.where(s"$partitionColumn='$partitionFilter'") | ||
.limit(1) | ||
.collect() | ||
df.cache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's combine with #376
i think @tchow-zlai removed them |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the permissions checks altogether
Summary
Checklist
Summary by CodeRabbit