diff --git a/.gitignore b/.gitignore index 188a84461..6b26640f8 100644 --- a/.gitignore +++ b/.gitignore @@ -103,4 +103,4 @@ releases # Bazel mod files (currently unused) MODULE.bazel* -/authoring/vscode/node_modules \ No newline at end of file +/plugins/vscode/node_modules \ No newline at end of file diff --git a/.plugin-versions b/.plugin-versions index 92ed18e86..cc6ed4df5 100644 --- a/.plugin-versions +++ b/.plugin-versions @@ -1,8 +1,6 @@ asdf-plugin-manager https://github.com/asdf-community/asdf-plugin-manager.git b5862c1 bazelisk https://github.com/josephtate/asdf-bazelisk.git 9b1cd87 gcloud https://github.com/jthegedus/asdf-gcloud.git 00cdf06 -java https://github.com/halcyon/asdf-java.git 0ec69b2 python https://github.com/danhper/asdf-python.git a3a0185 -sbt https://github.com/lerencao/asdf-sbt 53c9f4b scala https://github.com/asdf-community/asdf-scala.git 0533444 thrift https://github.com/alisaifee/asdf-thrift.git fecdd6c diff --git a/.scalafix.conf b/.scalafix.conf deleted file mode 100644 index 2e30fc27f..000000000 --- a/.scalafix.conf +++ /dev/null @@ -1,8 +0,0 @@ -rules = [ - DisableSyntax, - RemoveUnused, - ExplicitResultTypes, - OrganizeImports, - ProcedureSyntax, - RedundantSyntax -] \ No newline at end of file diff --git a/.tool-versions b/.tool-versions index 755fe0868..595d4f32a 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,11 +1,6 @@ -java corretto-11.0.25.9.1 - corretto-11.0.25.9.1 - corretto-17.0.9.8.1 scala 2.12.18 asdf-plugin-manager 1.4.0 -sbt 1.8.2 python - 3.7.17 3.11.0 gcloud 507.0.0 bazelisk 1.25.0 diff --git a/AUTHORS b/AUTHORS deleted file mode 100644 index 14e47fe44..000000000 --- a/AUTHORS +++ /dev/null @@ -1,18 +0,0 @@ -Nikhil Simha (Airbnb) -Varant Zanoyan (Airbnb) -Cristian Figureoa (Airbnb) -Pengyu Hou (Airbnb) -Haozhen Ding (Airbnb) -Sophie Wang (Airbnb) -Vamsee Yarlagadda (Airbnb) -Hao Cen (Airbnb) -Donghan Zhang (Airbnb) -Yuli Han (Airbnb) -Ben Mears (Stripe) -Andrew Lee (Stripe) -Cam Weston (Stripe) -Aaron Green (Stripe) -Daniel Kristjansson (Stripe) -Piyush Narang (Stripe) -Caio Camatta (Stripe) -Divya Manohar (Stripe) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md deleted file mode 100644 index 3f43a2060..000000000 --- a/CONTRIBUTING.md +++ /dev/null @@ -1,226 +0,0 @@ -# Contributor Guide - -Everyone is welcome to contribute to Chronon. We value all forms of contributions, including, but not limited to: - -- Documentation and usage examples -- Community participation in forums and issues. -- Code readability and developer guide -- Logging improvements -- Code comment improvements -- Documentation improvements -- Test cases to make the codebase more robust -- Tutorials, blog posts, talks that promote the project. -- Functionality extensions, new features, etc. -- Optimizations -- Support for new aggregations and data types -- Support for connectors to different storage systems and event buses - -In the interest of keeping Chronon a stable platform for users, some changes are discouraged and would be very unlikely to be allowed in. These include, but are not limited to: - -- Backwards incompatible API changes, for example adding a required argument without a default to the run.py module or the fetcher library, etc. -- Changes to the aggregation library or spark library that produce different data outputs (such changes would be caught by unit tests and fail to pass). 
-- Changes that could break online fetching flows, including changing the timestamp watermarking or processing in the lambda architecture, or Serde logic. -- Changes that would interfere with existing Airflow DAGs, for example changing the default schedule in a way that would cause breakage on recent versions of Airflow. - -There are exceptions to these general rules, however, please be sure to follow the “major change” guidelines if you wish to make such a change. - -## General Development Process - -Everyone in the community is welcome to send patches, documents, and propose new features to the project. - -Code changes require a stamp of approval from Chronon contributors to be merged, as outlined in the project bylaws. - -Larger changes, as well as proposed directions for the project should follow the Chronon Improvement Proposal guide, outlined below. - -The process for reporting bugs and requesting smaller features is also outlined below. - -## Pull Request Guidelines - -Pull Requests (PRs) should follow these guidelines as much as possible: - -### Code Guidelines - -- Follow our [code style guidelines](docs/source/Code_Guidelines.md) -- Well scoped (avoid multiple unrelated changes in the same PR) -- Code should be rebased on the latest version of the latest version of the master branch -- All lint checks and test cases should pass -- If the change is a bugfix to the aggregations, spark, streaming or fetching libraries, then a test case that catches the bug should be included -- Similarly, if the PR expands the functionality of these libraries, then test cases should be included to cover new functionality -- Documentation should be added for new code - -### Commit Message Guidelines - -Chronon uses the Github (GH) platform for patch submission and code review via Pull Requests (PRs). The final commit (title and body) that is merged into the master branch is composed of the PR’s title and body and must be kept updated and reflecting the new changes in the code as per the reviews and discussions. - -Although these guidelines apply essentially to the PRs’ title and body messages, because GH auto-generates the PR’s title and body from the commits on a given branch, it’s recommended to follow these guidelines right from the beginning, when preparing commits in general to be submitted to the Chronon project. This will ease the creation of a new PR, avoiding rework, and also will help the review. - -The rules below will help to achieve uniformity that has several benefits, both for review and for the code base maintenance as a whole, helping you to write commit messages with a good quality suitable for the Chronon project, allowing fast log searches, bisecting, and so on. - -#### PR title - -- Guarantee a title exists -- Don’t use Github usernames in the title, like @username (enforced) -- Include tags as a hint about what component(s) of the code the PRs / commits “touch”. For example [BugFix], [CI], [Streaming], [Spark], etc. If more than one tag exist, multiple brackets should be used, like [BugFix][CI] - -#### PR body - -- Guarantee a body exists -- Include a simple and clear explanation of the purpose of the change -- Include any relevant information about how it was tested - -## Release Guidelines - -Releases are managed by project committers, as outlined in the project Bylaws. -The committer(s) who approve and help merge a change should be the ones to drive the release process for that change, unless explicitly delegated to someone else. 
-Please see the release instructions in the code repository. - -## Bug Reports - -Issues need to contain all relevant information based on the type of the issue. We have four issue types - -### Incorrect Outputs - -- Summary of what the user was trying to achieve - - Sample data - Inputs, Expected Outputs (by the user) and Current Output - - Configuration - StagingQuery / GroupBy or Join -- Repro steps - - What commands were run and what was the full output of the command -- PR guidelines - - Includes a failing test case based on sample data - -### Crash Reports - -- Summary of what the user was trying to achieve - - Sample data - Inputs, Expected Outputs (by the user) - - Configuration - StagingQuery / GroupBy or Join -- Repro steps - - What commands were run and the output along with the error stack trace -- PR guidelines - - Includes a test case for the crash - -## Feature requests and Optimization Requests - -We expect the proposer to create a CHIP / Chronon Improvement Proposal document as detailed below - -# Chronon Improvement Proposal (CHIP) - -## Purpose - -The purpose of CHIPs is to have a central place to collect and document planned major enhancements to Chronon. While Github is still the tool to track tasks, bugs, and progress, the CHIPs give an accessible high-level overview of the result of design discussions and proposals. Think of CHIPs as collections of major design documents for relevant changes. -This way of maintaining CHIPs is heavily influenced by the Apache Flink project’s Improvement Proposal guidelines. But instead of doing this through JIRA, we use Github PRs and issues. -We want to make Chronon a core architectural component for users. We also support a large number of integrations with other tools, systems, and clients. Keeping this kind of usage healthy requires a high level of compatibility between releases — core architectural elements can't break compatibility or shift functionality from release to release. As a result each new major feature or public API has to be done in a future proof way. -This means when making this kind of change we need to think through what we are doing as best we can prior to release. And as we go forward we need to stick to our decisions as much as possible. All technical decisions have pros and cons so it is important we capture the thought process that leads to a decision or design to avoid flip-flopping needlessly. - -**CHIPs should be proportional in effort to their magnitude — small changes should just need a couple brief paragraphs, whereas large changes need detailed design discussions.** - -This process also isn't meant to discourage incompatible changes — proposing an incompatible change is totally legitimate. Sometimes we will have made a mistake and the best path forward is a clean break that cleans things up and gives us a good foundation going forward. Rather this is intended to avoid accidentally introducing half thought-out interfaces and protocols that cause needless heartburn when changed. Likewise the definition of "compatible" is itself squishy: small details like which errors are thrown when are clearly part of the contract but may need to change in some circumstances, likewise performance isn't part of the public contract but dramatic changes may break use cases. So we just need to use good judgment about how big the impact of an incompatibility will be and how big the payoff is. - -## What is considered a "major change" that needs a CHIP? 
- -Any of the following should be considered a major change: - -- Any major new feature, subsystem, or piece of functionality -- Any change that impacts the public interfaces of the project - -All of the following are public interfaces that people build around: - -- User facing Python APIs - - StagingQuery - freeform ETL primitive - - Join - enrichment primitive - - GroupBy - aggregation primitive - - Source - - Metadata (designed to be extensible, but we want to make sure our extensions are general and future proof) -- User facing Python tooling - - compile.py - - run.py - - explore.py -- Java APIs - - KVStore - kv store connectors are implemented against this (once per company) - - Fetcher - this is used by applications to read processed data (used many many times) - - Stats Store - used by Grafana dashboards - - Metadata Store - used to manage metadata - - Stream Decoder - used to implement connectors and decoders for streams - -Not all compatibility commitments are the same. We need to spend significantly more time on public APIs as these can break code for existing users. They cause people to rebuild code and lead to compatibility issues in large multi-dependency projects (which end up requiring multiple incompatible versions). Configuration, monitoring, and command line tools can be faster and looser — changes here will break monitoring dashboards and require a bit of care during upgrades but aren't a huge burden. - -For the most part monitoring, command line tool changes, and configs are added with new features so these can be done with a single CHIP. - -## What should be included in a CHIP? - -A CHIP should contain the following sections: - -- Motivation: describe the problem to be solved -- Proposed Change: describe the new thing you want to do. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences, depending on the scope of the change. -- New or Changed Public Interfaces: impact to any of the "compatibility commitments" described above. We want to call these out in particular so everyone thinks about them. -Migration Plan and Compatibility: if this feature requires additional support for a no-downtime upgrade describe how that will work -- Rejected Alternatives: What are the other alternatives you considered and why are they worse? The goal of this section is to help people understand why this is the best solution now, and also to prevent churn in the future when old alternatives are reconsidered. - -## Who should initiate the CHIP? - -Anyone can initiate a CHIP but you shouldn't do it unless you have an intention of doing the work to implement it. - -## Process - -Here is the process for making a CHIP: - -1. Create a PR in chronon/proposals with a single markdown file.Take the next available CHIP number and create a file “CHIP-42 Monoid caching for online & real-time feature fetches”. This is the document that you will iterate on. -2. Fill in the sections as described above and file a PR. These proposal document PRs are reviewed by the committer who is on-call. They usually get merged once there is enough detail and clarity. -3. Start a [DISCUSS] issue on github. Please ensure that the subject of the thread is of the format [DISCUSS] CHIP-{your CHIP number} {your CHIP heading}. In the process of the discussion you may update the proposal. You should let people know the changes you are making. -4. Once the proposal is finalized, tag the issue with the “voting-due” label. 
These proposals are more serious than code changes and more serious even than release votes. In the weekly committee meetings we will vote for/against the CHIP - where Yes, Veto-no, Neutral are the choices. The criteria for acceptance is 3+ “yes” vote count by the members of the committee without a veto-no. Veto-no votes require in-depth technical justifications to be provided on the github issue. -5. Please update the CHIP markdown doc to reflect the current stage of the CHIP after a vote. This acts as the permanent record indicating the result of the CHIP (e.g., Accepted or Rejected). Also report the result of the CHIP vote to the github issue thread. - -It's not unusual for a CHIP proposal to take long discussions to be finalized. Below are some general suggestions on driving CHIP towards consensus. Notice that these are hints rather than rules. Contributors should make pragmatic decisions in accordance with individual situations. - -- The progress of a CHIP should not be long blocked on an unresponsive reviewer. A reviewer who blocks a CHIP with dissenting opinions should try to respond to the subsequent replies timely, or at least provide a reasonable estimated time to respond. -- A typical reasonable time to wait for responses is 1 week, but be pragmatic about it. Also, it would be considerate to wait longer during holiday seasons (e.g., Christmas, Chinese New Year, etc.). -- We encourage CHIP proposers to actively reach out to the interested parties (e.g., previous contributors of the relevant part) early. It helps expose and address the potential dissenting opinions early, and also leaves more time for other parties to respond while the proposer works on the CHIP. -- Committers should use their veto rights with care. Vetos must be provided with a technical justification showing why the change is bad. They should not be used for simply blocking the process so the voter has more time to catch up. - -# Resources - -Below is a list of resources that can be useful for development and debugging. 
- -## Docs - -[Docsite](https://chronon.ai)\ -[doc directory](https://github.com/airbnb/chronon/tree/main/docs/source)\ -[Code of conduct](TODO) - -## Links - -[pip project](https://pypi.org/project/chronon-ai/)\ -[maven central](https://mvnrepository.com/artifact/ai.chronon/): [publishing](https://github.com/airbnb/chronon/blob/main/devnotes.md#publishing-all-the-artifacts-of-chronon)\ -[Docsite: publishing](https://github.com/airbnb/chronon/blob/main/devnotes.md#chronon-artifacts-publish-process) - -## Code Pointers - -### API - -[Thrift](https://github.com/airbnb/chronon/blob/main/api/thrift/api.thrift#L180), [Python](https://github.com/airbnb/chronon/blob/main/api/py/ai/chronon/group_by.py)\ -[CLI driver entry point for job launching.](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/Driver.scala) - -### Offline flows that produce hive tables or file output - -[GroupBy](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/GroupBy.scala)\ -[Staging Query](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala)\ -[Join backfills](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/Join.scala)\ -[Metadata Export](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/MetadataExporter.scala) - -### Online flows that update and read data & metadata from the kvStore - -[GroupBy window tail upload](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala)\ -[Streaming window head upload](https://github.com/airbnb/chronon/blob/main/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala)\ -[Fetching](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/Fetcher.scala) - -### Aggregations - -[time based aggregations](https://github.com/airbnb/chronon/blob/main/aggregator/src/main/scala/ai/chronon/aggregator/base/TimedAggregators.scala)\ -[time independent aggregations](https://github.com/airbnb/chronon/blob/main/aggregator/src/main/scala/ai/chronon/aggregator/base/SimpleAggregators.scala)\ -[integration point with rest of chronon](https://github.com/airbnb/chronon/blob/main/aggregator/src/main/scala/ai/chronon/aggregator/row/ColumnAggregator.scala#L223)\ -[Windowing](https://github.com/airbnb/chronon/tree/main/aggregator/src/main/scala/ai/chronon/aggregator/windowing) - -### Testing - -[Testing - sbt commands](https://github.com/airbnb/chronon/blob/main/devnotes.md#testing)\ -[Automated testing - circle-ci pipelines](https://app.circleci.com/pipelines/github/airbnb/chronon)\ -[Dev Setup](https://github.com/airbnb/chronon/blob/main/devnotes.md#prerequisites) diff --git a/GOVERNANCE.md b/GOVERNANCE.md deleted file mode 100644 index 6d8d649d2..000000000 --- a/GOVERNANCE.md +++ /dev/null @@ -1,179 +0,0 @@ -This document defines the bylaws under which the Chronon Open Source project operates, including the roles and responsibilities of various stakeholders as well as the operation of voting, resolving conflicts, and deciding on the direction of the project. -# Roles - -![roles](roles.png) - -Stakeholders of the project fall into one of the following roles: - -## Users - -Anybody is free to become a user of the Chronon project. - -Users contribute to the projects by providing feedback to contributors in the form of bug reports and feature suggestions. 
Users also participate in the community by helping other users on mailing lists and user support forums like Stack Overflow. - -## Contributors - -Anyone writing code, documentation, or other resources like tutorials, demos or videos is a contributor to the project. Again, anyone is welcome to become a contributor, and contributors may become committers to the project by invitation (covered below). - -## Committers - -The project's committers are responsible for the project's technical management. Committers have access to a specified set of subproject's subversion repositories. - -Committers on subprojects may cast binding votes on any technical discussion regarding that subproject. - -Committer access is by invitation only and must be approved by the lazy consensus of the active PMC members. A Committer is considered emeritus by his or her own declaration or by not reviewing patches or committing patches to the project for over six months. An emeritus committer may request reinstatement of commit access from the PMC which must be approved by a lazy consensus of the active PMC members. - -Commit access can be revoked by a unanimous vote of all the active PMC members (except the committer in question if they are also a PMC member). - -## Project Management Committee (PMC) - -The PMC is responsible for the management and oversight of the Chronon codebase. These responsibilities include: -* Deciding what is distributed as part of the Chronon project. In particular all releases must be approved by the PMC. -* Maintaining the project's shared resources, including the codebase repository, mailing lists, websites. -* Speaking on behalf of the project. -* Resolving license disputes regarding products of the project. -* Nominating new PMC members and committers. -* Maintaining these bylaws and other guidelines of the project. - -### PMC Seat Allocation - -PMC seats are currently only allocated to Airbnb and Stripe organizations, meaning that PMC members must be parts of those organizations. Specifically, 8 seats are reserved for Airbnb, and 5 for Stripe. - -PMC members can be declared emeritus and removed from the active list in three different ways: by their own declaration, by leaving the organization to which seats are allocated (Stripe and Airbnb), or by a consensus vote of all the active PMC members other than the member in question. In these cases, they also lose their committer status, however, they are free to be reinstated as committers immediately, following the normal protocol. - -When a PMC member is removed from the active list, their organization can and should nominate a replacement. A nomination indicates unanimous approval from the PMC members of that organization, and in the absence of a veto from other PMC members, the nomination is immediately approved. Should a veto be cast by another member, it must come with a reason, and either the issue can be resolved through debate and the veto is removed, or a new member must be nominated. - -Major decisions requiring PMC voting should ideally be held off until the PMC is at full membership (13 active members). However, if an urgent matter needs deciding while the PMC is below full membership, then each organization can cast proxy votes for their empty seats. It is up to each organization to decide how they wish to cast these votes. - -# Decision Making - -## Voting - -Decisions regarding the project are made by votes on the primary project development mailing list dev@chronon.ai. 
- -Votes are clearly indicated by the subject line starting with [VOTE]. Votes may contain multiple items for approval and these should be clearly separated. Voting is carried out by replying to the vote mail. Voting may take three flavors - - -| Vote | Meaning | -| ---- | ----- | -| +1 | 'Yes,' 'Agree,' or 'the action should be performed.' | -| +0 | Neutral about the proposed action (or mildly negative but not enough so to want to block it). | -| -1 | This is a negative vote. On issues where consensus is required, this vote counts as a veto. All vetoes must contain an explanation of why the veto is appropriate. Vetoes with no explanation are void. It may also be appropriate for a -1 vote to include an alternative course of action. - - -All eligible participants are encouraged to show their agreement with or against a particular action by voting (eligibility depends on the action being voted upon, outlined in the “Actions” section below). - -For technical decisions, only the votes of active committers are binding. Non-binding votes are still useful for those with binding votes to understand the perception of an action in the wider community. - -For PMC decisions, only the votes of active PMC members are binding. - -Voting can also be applied to changes already made to the Chronon codebase. These typically take the form of a veto (-1) in reply to the commit message sent when the commit is made. Note that this should be a rare occurrence. All efforts should be made to discuss issues when they are still patches before the code is committed. - -Only active (i.e. non-emeritus) committers and PMC members have binding votes. - -## Approvals - - -| Approval Type |Definition | -| ------------- | ---------- | -| Consensus | Consensus requires 3 binding +1 votes and no -1 binding vetoes. | -| Lazy Majority | A lazy majority vote requires 3 binding +1 votes and more binding +1 votes than -1 votes. | -| Lazy Approval | An action with lazy approval is implicitly allowed unless a -1 vote is received, at which time, depending on the type of action, either lazy majority or consensus approval must be obtained. | -| 2/3 Majority | Some actions require a 2/3 majority of active PMC members to pass. Such actions typically affect the foundation of the project (e.g. adopting a new codebase). The higher threshold is designed to ensure such changes are strongly supported. To pass this vote requires at least 2/3 of binding vote holders to vote +1. | - - -## Vetoes - -A valid, binding veto cannot be overruled. If a veto is cast, it must be accompanied by a valid reason explaining the reasons for the veto. The validity of a veto, if challenged, can be confirmed by anyone who has a binding vote. This does not necessarily signify agreement with the veto - merely that the veto is valid. - -If you disagree with a valid veto, you must lobby the person casting the veto to withdraw their veto. If a veto is not withdrawn, the action that has been vetoed must be reversed in a timely manner. - -Only active members of the PMC have the ability to veto, and all active PMC members may veto any vote. -## Actions - - -| Actions | Description | Approval | Binding Votes | Minimum Length (days) | Mailing List | -| ------- | ----------- | -------- | ------------- | --------------------- | ------------ | -| Code Change | A change made to a codebase of the project and committed by a committer. This includes source code, documentation, website content, etc. | A +1 from a committer (Github approval counts as a +1). 
Moving to a lazy majority if a -1 is received (github rejection counts as a -1). A -1 from a committer counts as a veto. It must come with an explanation, and ideally it should be resolved through code change and petition. If it fails to be resolved through dialogue after 3 days, the on-call, or another PMC member, will intervene to try to reach consensus. If that also fails, then the veto can be overturned by a lazy majority vote amongst PMC voters. | Active committers | 0 | Github Pull Request (automated notification sent to dev@chronon.ai) | -| Major Change | A major change to the codebase. Exact definition of “major” TBD. | Consensus (3 github approvals), with the same veto rules as a minor code change. | Active PMC Members | 3 |Github Pull Request (automated notification sent to dev@chronon.ai) | -| Chronon Improvement Process Proposal (CHIP) | A required proposal prior to any major change. | Consensus (3 github approvals), with the same veto rules as a minor code change. | Active PMC members | 3 | Github Pull Request (automated notification sent to dev@chronon.ai) | -| Release Plan | Defines the timetable and actions for a release. The plan also nominates a Release Manager. | Lazy majority | Active PMC Members | 3 | dev@chronon.ai | -| Product Release | When a release of one of the project's products is ready, a vote is required to accept the release as an official release of the project. | Lazy Majority | Active PMC members | 3 | dev@chronon.ai | -| Adoption of New Codebase | Adoption of large existing external codebase. This refers to contributions big enough that potentially change the shape and direction of the project with massive restructuring and future maintenance commitment. | 2/3 majority | Active PMC members | 6 | dev@chronon.ai | -| New Committer | When a new committer is proposed for the project. | Consensus | Active PMC members | 3 | private@chronon.ai | -| New PMC Member | When a committer is proposed for the PMC. | Consensus | Active PMC membersx | 3 | private@chronon.ai | Committer Removal | When removal of commit privileges is sought. | Consensus | Active PMC members (excluding the committer in question if a member of the PMC). | 6 | private@chronon.ai | -| PMC Member Removal | When removal of a PMC member is sought. | Consensus | Active PMC members (excluding the member in question). | 6 | private@chronon.ai | -| Modifying Bylaws | Modifying this document. | 2/3 majority | Active PMC members | 6 | dev@chronon.ai | - - -## Reverting - -The on-call is free to use their discretion to revert any PR, even if it fulfills the conditions outlined above for merging. The cases where the on-call may elect to do this include, but are not limited to: -* The PR breaks the pipeline/CI -* The conditions under which it was merged did not allow for proper review (for example very late PR time with quick stamps and merging) -* The PR was merged as a minor change, but the on-call determines that it more closely resembles a major change - -# Examples - -## Minor code change - -Including bug fixes, small features and extensions. - -1. Contributor opens up a PR -2. If the PR gets at least one +1 from an active committer without any -1, then it’s merged. -3. If the PR gets a -1 from an active committer or a PMC member (the -1 must come with an explanation) - 1. Ideally the rejection is resolved through code change/discussion amongst the parties involved (initial committer as well as vetoer) - 2. 
If after 3 days the discussion hasn’t yielded any progress, the on-call or another PMC member will get involved to try and guide the conversation to a productive consensus - 3. Should that fail, then the on-call or PMC member will inform the rest of the PMC that debate has failed and then we will move ahead with a lazy majority vote amongst PMC members to resolve the issue. - -## Major code change - -These should have an associated CHIP (see contributor guide) that is already approved by a Consensus vote of PMC members. - -1. Contributor opens up a PR -2. If the PR gets at least three +1s from active committers without any -1s, then it’s merged. -3. If the PR gets a -1 from an active committer or a PMC member (the -1 must come with an explanation) - 1. Then we follow the same process as for a minor change, however without any guidelines as to how long the debate will continue for. The on-call or PMC member(s) resolving the debate are free to let it go on for longer before calling for a vote to resolve the issue, especially if the change is consequential for the future direction of the project. - -## Major vs Minor Changes - -We do not have a formal definition of major, but as a rough guideline any change that fulfills any of the below criteria could be considered major: -* Touches 10+ files in a significant way -* Sensitive changes such as: - * Edit, add, or remove major libraries or components, for example introducing a new dependency - * Any change to APIs, even backwards compatible changes like adding optional arguments - * Any change to core aggregation libraries - * Changing the default configurations of any parameters -* More than ~500 lines of code - -The final decision on what constitutes a major change will be left up to the on-call at the time that the PR is merged. - -If a PR is merged under the “minor change” process, but the on-call determines that it is in fact a “major change”, then they are free to immediately revert the PR and call for the major change process. - -## Proposed Re-architectures and Directional Changes - -Large re-architectures begin with a CHIP, which is the most important part of the process, where the overall vision and design is layed out. The CHIP itself follows an approval process identical to a major code change. - -Once the CHIP is approved, the PRs that comprise the change follow the normal approval processes in line with their status as either major or minor. - -# Licensing, Copyright / Ownership - -The code will be licensed under the standard Apache v2 license without any modifications. This is the most popular, permissive, liability-and-warranty-voiding open-source license. - -The ownership/copyright of the project will belong to “The Chronon Authors” which is a collective group of people that have contributed code and documentation to the project - any one with a github PR that’s been committed. This is standard industry practice and is followed by companies such as Google and Organizations such as Linux foundation - more details [here](https://opensource.google/documentation/reference/releasing/authors) and [here](https://www.linuxfoundation.org/blog/blog/copyright-notices-in-open-source-software-projects). 
- -# Appendix - -## Email Voting Example - New Committer - -Initial email to set the motion forward (sent to dev@chronon.ai): - -![email voting example](email_voting_example.png) - -A simple +1 or -1 is all that is required in reply: - -![email reply example](email_reply_example.png) - -After at least three days, and 3 positive votes with no vetoes: - -![vote tally](vote_tally.png) diff --git a/LICENSE b/LICENSE deleted file mode 100644 index 4a70f3ce5..000000000 --- a/LICENSE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. 
For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. 
The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. 
- - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright (C) 2023 The Chronon Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index f5432acb3..000000000 --- a/docker-compose.yml +++ /dev/null @@ -1,66 +0,0 @@ -# Quickstart Docker containers to run chronon commands with MongoDB as the KV Store. -services: - - mongodb: - image: mongo:latest - ports: - - "27017:27017" - environment: - MONGO_INITDB_ROOT_USERNAME: admin - MONGO_INITDB_ROOT_PASSWORD: admin - volumes: - - mongodb_data:/opt/mongo/data/db - - zookeeper: - image: confluentinc/cp-zookeeper:latest - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - ports: - - 22181:2181 - - kafka: - image: confluentinc/cp-kafka:latest - depends_on: - - zookeeper - ports: - - 9092:9092 - environment: - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE - KAFKA_CREATE_TOPICS: "events.returns:1:3" - KAFKA_MESSAGE_MAX_BYTES: 2147483647 - - main: - image: ezvz/chronon - command: bash -c "spark-shell -i scripts/data-loader.scala && tail -f /dev/null" - ports: - - "4040:4040" - environment: - - USER=root - - SPARK_SUBMIT_PATH=spark-submit - - PYTHONPATH=/srv/chronon - - SPARK_VERSION=3.5.1 - - JOB_MODE=local[*] - - PARALLELISM=2 - - EXECUTOR_MEMORY=2G - - EXECUTOR_CORES=4 - - DRIVER_MEMORY=1G - - CHRONON_LOG_TABLE=default.chronon_log_table - - CHRONON_ONLINE_CLASS=ai.chronon.quickstart.online.ChrononMongoOnlineImpl - - CHRONON_ONLINE_ARGS=-Zuser=admin -Zpassword=admin -Zhost=mongodb -Zport=27017 -Zdatabase=admin - - -volumes: - mongodb_data: - spark_events: - - -# volumes: -# - ./api/py/test/sample:/srv/chronon # Main working dir and repo for samples -# - ./quickstart/mongo-online-impl:/srv/onlineImpl # KV Store implementation -# - ./quickstart/jars:/srv/jars # Driver connectors and other spark required jars -# - /Users/varant_zanoyan/repos/chronon:/srv/chronon_jar -# - spark_events:/opt/spark/spark-events \ No newline at end of file diff --git a/docker-init/README.md b/docker-init/README.md index a3f903306..f5187855a 100644 --- a/docker-init/README.md +++ b/docker-init/README.md @@ -4,7 +4,7 @@ This directory holds code to setup docker containers for dynamoDB, a spark maste > **Note** > Make sure you have `docker >= 20.10` installed. 
-> Install appropriate java, scala, and python versions following the instructions in [devnotes.md](../devnotes.md#install-appropriate-java-scala-and-python-versions). +> Install appropriate java, scala, and python versions following the instructions in [devnotes.md](../docs/source/dev/devnotes.md#install-appropriate-java-scala-and-python-versions). ```bash $ cd chronon diff --git a/cli_readme.md b/docs/source/dev/cli_readme.md similarity index 100% rename from cli_readme.md rename to docs/source/dev/cli_readme.md diff --git a/devnotes.md b/docs/source/dev/devnotes.md similarity index 100% rename from devnotes.md rename to docs/source/dev/devnotes.md diff --git a/email_reply_example.png b/email_reply_example.png deleted file mode 100644 index 9d20cd603..000000000 Binary files a/email_reply_example.png and /dev/null differ diff --git a/email_voting_example.png b/email_voting_example.png deleted file mode 100644 index 9e97febc1..000000000 Binary files a/email_voting_example.png and /dev/null differ diff --git a/intellij_unit_test_1.png b/intellij_unit_test_1.png deleted file mode 100644 index 066efac68..000000000 Binary files a/intellij_unit_test_1.png and /dev/null differ diff --git a/intellij_unit_test_2.png b/intellij_unit_test_2.png deleted file mode 100644 index 8c803906f..000000000 Binary files a/intellij_unit_test_2.png and /dev/null differ diff --git a/intellij_unit_test_3.png b/intellij_unit_test_3.png deleted file mode 100644 index 337b00f02..000000000 Binary files a/intellij_unit_test_3.png and /dev/null differ diff --git a/authoring/vscode/.vscodeignore b/plugins/vscode/.vscodeignore similarity index 100% rename from authoring/vscode/.vscodeignore rename to plugins/vscode/.vscodeignore diff --git a/authoring/vscode/LICENSE b/plugins/vscode/LICENSE similarity index 100% rename from authoring/vscode/LICENSE rename to plugins/vscode/LICENSE diff --git a/authoring/vscode/images/icon.png b/plugins/vscode/images/icon.png similarity index 100% rename from authoring/vscode/images/icon.png rename to plugins/vscode/images/icon.png diff --git a/authoring/vscode/package.json b/plugins/vscode/package.json similarity index 100% rename from authoring/vscode/package.json rename to plugins/vscode/package.json diff --git a/authoring/vscode/src/extension.ts b/plugins/vscode/src/extension.ts similarity index 100% rename from authoring/vscode/src/extension.ts rename to plugins/vscode/src/extension.ts diff --git a/authoring/vscode/tsconfig.json b/plugins/vscode/tsconfig.json similarity index 100% rename from authoring/vscode/tsconfig.json rename to plugins/vscode/tsconfig.json diff --git a/authoring/vscode/webpack.config.js b/plugins/vscode/webpack.config.js similarity index 100% rename from authoring/vscode/webpack.config.js rename to plugins/vscode/webpack.config.js diff --git a/project/FolderCleaner.scala b/project/FolderCleaner.scala deleted file mode 100644 index 49dcb6354..000000000 --- a/project/FolderCleaner.scala +++ /dev/null @@ -1,16 +0,0 @@ -import org.slf4j.LoggerFactory -import java.io.File -import scala.reflect.io.Directory - -object Folder { - @transient lazy val logger = LoggerFactory.getLogger(getClass) - def clean(files: File*): Unit = { - logger.info(s"Removing folders ${files.map(_.getAbsolutePath)}") - files.foreach { file => - if (file.exists() && file.isDirectory) { - val directory = new Directory(file) - directory.deleteRecursively() - } - } - } -} diff --git a/project/ThriftGen.scala b/project/ThriftGen.scala deleted file mode 100644 index 894fe35a8..000000000 --- 
a/project/ThriftGen.scala +++ /dev/null @@ -1,59 +0,0 @@ -import org.slf4j.LoggerFactory -import sbt.* - -import scala.language.postfixOps -import sys.process.* - -object ThriftGen { - - def print_and_execute(command: String): Int = { - println(s"+ $command") - try { - val result = Process(command).!(ProcessLogger( - out => println(s"[out] $out"), - err => println(s"[err] $err") - )) - if (result != 0) { - println(s"Command failed with exit code $result") - } - result - } catch { - case e: Exception => - println(s"Command failed with exception: ${e.getMessage}") - throw e - } - } - - def replaceInFile(file: File): Unit = { - val source = scala.io.Source.fromFile(file, "UTF-8") - val content = source.mkString - source.close() - val newContent = content.replace("org.apache.thrift", "ai.chronon.api.thrift") - val writer = new java.io.PrintWriter(file) - try { - writer.write(newContent) - } finally { - writer.close() - } - } - - def gen(inputFolder: File, outputPath: String, language: String, extension: String = null): Seq[File] = { - s"""echo "Generating files from thrift files at: $inputFolder/ \ninto folder $outputPath" """ !; - print_and_execute(s"rm -rf $outputPath") - print_and_execute(s"mkdir -p $outputPath"); - print_and_execute(s"thrift -version") - val thriftFiles = (PathFinder(new File(s"$inputFolder/")) ** "*.thrift").get() - thriftFiles.foreach { file => - println(s"Processing file: $file") - print_and_execute(s"thrift --gen $language:generated_annotations=suppress -out $outputPath $file") - } - val javaFiles = (PathFinder(new File(s"$outputPath/ai/chronon/")) ** "*.java").get() - javaFiles.foreach { file => - println(s"Processing file: ${file.getPath}") - replaceInFile(file) - } - val files = (PathFinder(new File(outputPath)) ** s"*.${Option(extension).getOrElse(language)}").get() - println("\n") - files - } -} diff --git a/project/VersionDependency.scala b/project/VersionDependency.scala deleted file mode 100644 index 37bc4a51f..000000000 --- a/project/VersionDependency.scala +++ /dev/null @@ -1,18 +0,0 @@ -import sbt.librarymanagement.{CrossVersion, ModuleID} -import sbt.librarymanagement.DependencyBuilders.OrganizationArtifactName - -case class VersionDependency(modules: Seq[OrganizationArtifactName], - v11: Option[String], - v12: Option[String], - v13: Option[String]) { - def of(scalaVersion: String): Seq[ModuleID] = { - def applyVersion(v: Option[String]): Seq[ModuleID] = v.map(ver => modules.map(_.%(ver))).getOrElse(Seq.empty) - CrossVersion.partialVersion(scalaVersion) match { - case Some((2, 11)) => applyVersion(v11) - case Some((2, 12)) => applyVersion(v12) - case Some((2, 13)) => applyVersion(v13) - case _ => - throw new RuntimeException(s"Unhandled scala version $scalaVersion for modules ${modules.map(_.toString)}") - } - } -} diff --git a/project/build.properties b/project/build.properties deleted file mode 100644 index 46e43a97e..000000000 --- a/project/build.properties +++ /dev/null @@ -1 +0,0 @@ -sbt.version=1.8.2 diff --git a/project/plugins.sbt b/project/plugins.sbt deleted file mode 100644 index 33f85418d..000000000 --- a/project/plugins.sbt +++ /dev/null @@ -1,13 +0,0 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1") -addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") -addDependencyTreePlugin -addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.1.2") -addSbtPlugin("com.github.sbt" % "sbt-release" % "1.0.15") -addSbtPlugin("com.github.sbt" % "sbt-git" % "2.0.0") -addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.10.0") -addSbtPlugin("io.get-coursier" 
% "sbt-shading" % "2.1.1") -addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.12.1") -// related to: https://github.com/sbt/sbt/issues/6997 -ThisBuild / libraryDependencySchemes ++= Seq( - "org.scala-lang.modules" %% "scala-xml" % VersionScheme.Always -) diff --git a/proposals/CHIP-1.md b/proposals/CHIP-1.md deleted file mode 100644 index d2f0de409..000000000 --- a/proposals/CHIP-1.md +++ /dev/null @@ -1,250 +0,0 @@ -# CHIP-1 – Online IR and GetRequest Caching -_By Caio Camatta (Stripe) | Last modified Jan 3, 2024_ - -This CHIP introduces IR caching in Chronon's online/Fetcher side. We're currently trying out caching at Stripe and will update this doc with benchmarks and findings. - -## Motivation - -The primary goal of this CHIP is to decrease Chronon feature serving latency and reduce RPS to KV stores. - -During our latest load tests, we observed that our feature serving app spends 20% of the time performing GET requests and the remaining 80% processing that data and constructing the GroupBy responses. A significant amount of the work necessary to process the data comes from `AvroCodec.decode` (the function that decodes bytes stored in the KV store). This function takes up 28% of total CPU time: ~21.5% is spent decoding batch IRs, and ~6.5% decoding tile IRs. - -We hope to decrease serving latency by - -- Caching the work to decode batch bytes into batch IRs (up to ~21.5% of CPU). -- Caching the work to decode streaming bytes into tile IRs (up to ~6.5% of CPU). -- Caching KV store requests (up to ~20% of request latency). - -This CHIP does not discuss optimizing to the un-tiled version of Chronon, which is the default. Only the batch caching portion of this CHIP applies to that version. (At Stripe, we use a tiled implementation of Chronon, which we are open-sourcing [#523](https://github.com/airbnb/chronon/pull/523), [#531](https://github.com/airbnb/chronon/pull/523).) - -## Proposed Change - -Here's a diagram of how the Chronon Fetcher works currently. - -![Chronon Fetcher before changes](./images/CHIP-1-current-fetcher-sequence.png) -_Simplified Chronon Fetcher before proposed changes. The numbers represent what we will change in each Step of the Implementation ("2" = "Step 2")_. - -We will be caching four different operations: - - Performing streaming GET requests - - Performing batch GET requests - - Avro decoding of streaming bytes - - Avro decoding of batch bytes - -To do that, I’m proposing we use two types of Caffeine caches to store: - -- Key: batch get requests → Value: batch IRs -- Key: streaming get requests → Value: streaming IRs - -After this CHIP is implemented, the Fetcher would work the following way: - -![Chronon Fetcher after changes](./images/CHIP-1-new-fetcher-sequence.png) -_Simplified Chronon Fetcher after this CHIP._ - -For reference, here's how data is currently stored in the KV store on the tiled version of Chronon: - -![Chronon Fetcher after changes](./images/CHIP-1-data-in-kv-store.png) -_Data stored in the KV store._ - - -The caches will be configured on a per-GroupBy basis, i.e. two caches per GroupBy. This allows us to enable caching only for features with very skewed access patterns (when the top few keys correspond to a significant percentage of traffic). See _Rejected Alternative #4_ and _UX Considerations_ for more details. - -Caching will be an opt-in feature that can be enabled by Chronon developers. 
- -Most of the code changes are in [FetcherBase.scala](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/FetcherBase.scala). - -### Batch Caching Details - -This cache will consist of: - -- Key: a combination of (`batchDataset`, `keyBytes`, `batchEndTsMillis`, "latest batch data landing time"). -- Value: either `FinalBatchIr` (for the [temporal accuracy code path](https://github.com/airbnb/chronon/blob/f786ab9ce9314bc09495499765cfaddd0f4ef5a7/online/src/main/scala/ai/chronon/online/FetcherBase.scala#L83C4-L83C4)) or `Map[String, AnyRef]` (for the [no-agg and snapshot accuracy code path](https://github.com/airbnb/chronon/blob/f786ab9ce9314bc09495499765cfaddd0f4ef5a7/online/src/main/scala/ai/chronon/online/FetcherBase.scala#L79)). - -#### Populating and using the cache - -Every time `FetcherBase.toBatchIr` is called, we check if the result is already cached. If it’s not, we run the function and store it in cache. Caffeine takes care of storing only the most used values. - -Then, before making a batch GET request, we check if it is already in cache. If so, we don’t make the KV store request. - -#### Keeping GroupByServingInfo up-to-date - -`GroupByServingInfo` contains information about a GroupBy, such as how much batch data is available (i.e. `batchEndTsMillis`). When the Chronon Fetcher receives a request, it needs to know in advance how much batch data is available so it can split the window into batch and streaming -- `GroupByServingInfo` is used for that. - -Currently, every time we perform a batch KV store request, we also get back the latest `batchEndTsMillis`. Then, in `FetcherBase`, we update the serving info if the new `batchEndTsMillis` is ahead of the current one stored in memory (i.e. new batch data has landed!) – see `updateServingInfo`. - -Once we start caching batch `GetRequest`s, it could happen that no batch KV store requests are made, and we never receive the updated `batchEndTsMillis` after batch data lands (or delay it). To address that, the following change is necessary: we will call `MetadataStore.refresh` once for every `FetcherBase.constructGroupByResponse` call. - -In practice, this means that `groupByServingInfo` will be updated at most 8 seconds after new batch data has landed. When new batch data lands, the `groupByServingInfo.batchEndTsMillis` will be updated and we will start using the new batch data right away. - - -#### Cache invalidation (edge case) - -The "latest batch data landing time" used in keys is essential for cache invalidation. - -The `batchEndTsMillis` portion of the key indicates the timestamp until which we have batch data. However, we also need to know when that batch data landed. If a user re-runs their `GroupByUpload` airflow task, the `batchEndTsMillis` wouldn’t change but the underlying data could. So, we keep track of the time at which your batch data job landed. - -We'll also add `batchDataLandingTime` to `GroupByServingInfo` and have it be populated by batch jobs. - -### Streaming Caching Details - -(This only applies to `GroupBys` that are scheduled online / use `TEMPORAL` accuracy.) - -This cache will consist of: - -- Key: a combination of (`streamingDataset`, `keyBytes`, `batchEndTsMillis`). -- Value: (`TiledIr`, "`streamingCacheEndTsMillis`") - -#### Populating and using the cache - -When a streaming GetRequest is performed, it may return a number of tiles. For example, a request for the range [0:00, 3:00) could return three tiles, [0:00, 1:00) + [1:00, 2:00) + [2:00, 3:00). 
Here, `batchEndTsMillis` = 0:00. Each tile contains a field called `isComplete`, which indicates whether that tile is complete. This field is set to True when the event-processing side of Chronon (Flink, in our case) decides that a tile is completed and will no longer be changed. - -If consecutive tiles starting from `batchEndTsMillis` are all completed, then we combine them into a single TiledIr and cache them with `streamingCacheEndTsMillis` set to the end time of the last cached tile. Caffeine takes care of storing only the most used values. - -- Example: if [0:00, 1:00) and [1:00, 2:00) are both complete, we will cache a single TiledIr for the range [0:00, 2:00) with `streamingCacheEndTsMillis` = 2:00. - -- GroupBys with windows shorter than one day should be handled slightly differently so caching works. - -Then, when creating a streaming GetRequest, we check if that (`streamingDataset`, `keyBytes`, ``batchEndTsMillis``) is in cache. If so, we modify the `batchEndTsMillis` of the outgoing GetRequest to be `streamingCacheEndTsMillis`. So, for example, if it’s 17:00 UTC, and your cache contains streaming data for [0:00, 13:00), we modify the `GetRequest` to fetch only [13:00, ...). This reduces the number of tiles that need to be fetched. - -Cache invalidation is not necessary on the online side. If a tile is marked as complete, it is not expected to change. Cached values would only need to be invalidated if you changed your GroupBy definition and restarted Flink without state. If that’s the case, you would also need to restart your feature serving app which would then restart the cache. - - -#### Gaps aren’t cached - -If, for whatever reason, there are any gaps in completed tiles during the day, caching won’t occur. For example, if a `GetRequest` returns three tiles, [0:00, 1:00), [1:00, 2:00), [2:00, 3:00), and only [1:00, 2:00) is completed, we won’t cache nor set `streamingCacheEndTsMillis` to 2:00. - -### UX Considerations - -(We are curious to hear others’ opinions on UX.) - -There are some UX aspects to consider if we are configuring caches on a per-GroupBy basis, as we certainly don’t want every user to have to think about caching when creating features. - -First, there should be a default cache size. This size might be 0, i.e. disabled by default, or something small (say, 50 MB). New users, or users without stringent latency requirements will never have to think about caching. - -For more advanced users, with stringent latency requirements, we (the developers) can help them configure/enable caching. -- For starters, this could be done via a parameter in a GroupBy’s `customJson`, e.g. `fetcher_cache_size` = 100 MB or 10,000 elements. It could also be simplified to a boolean, e.g. `enable_fetcher_caching`. This can potentially become part of the GroupBy API later on. -- To correctly size a cache and estimate the hit rate, we need knowledge of the access patterns. So we will work with users to figure out what makes sense. - - - -## Implementation - -Within Stripe, we are implementing and testing these changes incrementally so we can measure the effect of each of them. Once we have developed and tested all the different Steps listed here, and assuming the CHIP is accepted, we will open source the changes as two PRs (one for batch IR caching, and one for tile IR caching). Chronon users can then opt-in to one or both the caching strategies. - -### Step 0: Add caching library (Caffeine) - -For the caching library, we'll use Caffeine. 
It's a rewrite of Google’s Guava cache and is widely used. - -In this step, we add Caffeine as a dependency and set up: - -- A `Cache` class that can be used for all the following steps -- Cache metrics -- Unit tests - -The size of the cache should ideally be set in terms of maximum memory usage (e.g., 2GB) instead of in terms of maximum number of elements. Estimating memory usage is a [tricky problem](https://stackoverflow.com/questions/258120/what-is-the-memory-consumption-of-an-object-in-java?noredirect=1&lq=1) and [not something Caffeine provides out-of-the-box](https://stackoverflow.com/questions/73139235/how-to-set-maximum-memory-usage-in-caffeine#comment129179258_73139235). To achieve that, we can use the [Java Instrumentation library](https://docs.oracle.com/javase/8/docs/api/java/lang/instrument/package-summary.html) or [JAMM](https://github.com/jbellis/jamm), a library which is [commonly used alongside Caffeine](https://openjdk.org/jeps/8249196#:~:text=JAMM%20is%20routinely%20used%20with%20Caffeine%20to%20weigh%20the%20cache%20entries). If that proves difficult and we must stick with a maximum number of elements, the creator of Caffeine suggests sizing by [guessing, measuring, and repeating](https://stackoverflow.com/questions/39503105/caffeine-how-to-come-up-with-an-appropriate-cache-size#:~:text=best%20answer%20for%20sizing%20is%20to%20guess%2C%20measure%2C%20and%20repeat). - -### Step 1: BatchIr Caching - -We start by caching the conversion from `batchBytes` to `FinalBatchIr` (the [toBatchIr function in FetcherBase](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/FetcherBase.scala#L102)) and `Map[String, AnyRef]`. - -To make testing easier, we'll disable this feature by default and enable it via Java args. - -Results: I tested this in production and saw a 22-35% decrease in serving latency depending on configuration. I used a realistic load test that served 10-15 GroupBys using 4 different entity key types (some had a skewed access pattern, some didn't), with a 20K-element cache shared across all GroupBys. - -### Step 2: Batch GetRequest Caching - -In this step, we will: - -- Add logic to stop sending batch GET requests when that request’s data is available in cache. - - Detail: in code, we will 1) check if the `GetRequest` is cached, 2) if so, store it in a variable in memory so it’s not lost due to race conditions, 3) make any necessary, uncached get requests, and 4) use the cached values. -- Start using `MetadataStore.refresh` so `GroupByServingInfoParsed` is kept up-to-date regardless of whether we are querying the batch KV store. - -We won't worry about edge-case invalidation just yet (the aforementioned "latest batch data landing time" logic). - -Results: will add - -### Step 3: `TiledIr` Caching - -This step caches the conversion of [tile bytes to TiledIr](https://github.com/airbnb/chronon/blob/main/online/src/main/scala/ai/chronon/online/TileCodec.scala#L77C67-L77C67). This is only possible if the tile bytes contain information about whether a tile is complete (i.e. it won’t be updated anymore). The Flink side marks tiles as complete. - -This cache can be "monoid-aware". Instead of storing multiple consecutive tiles for a given time range, we combine the tiles and store a single, larger tile in memory.
For example, we combine two tiles, [0, 1) and [1, 2), into one, [0, 2). - -Results: will add - -### Step 4: Streaming GetRequest Caching - -Add the rest of the logic described in "Streaming Caching Details" so that the `batchEndTsMillis` in the outgoing GetRequest is modified and the KV store ends up fetching fewer tiles. - -Results: will add - -### Step 5: Final Polishing -The final step is to: -- Add memory-based cache sizing -- Handle edge cases; add the `batchDataLandingTime` logic described above -- Add per-GroupBy caching (we might actually do this earlier on) - - -## Rejected Alternatives - -### Rejected Alternative #1: Defer `GetRequest` caching to specific KV Stores - -In the Proposed Changes, I am suggesting that we add the logic for caching `GetRequest`s within Chronon. This alternative would be to: -- Cache Avro conversions from batch bytes to `FinalBatchIr`/`Map[String, AnyRef]` and from streaming bytes to `TiledIr`s in Chronon. -- Not cache anything related to `GetRequest`s in Chronon. - -The advantage of this alternative is that it keeps the changes to FetcherBase very simple. - -The issue is that caching Avro conversions may save us a good amount of CPU (say, up to 28% in our example at the top of the document), but it wouldn’t save any time fetching from the KV stores. Additionally, Chronon contains the knowledge of whether a tile/batch IR can be cached, so it makes sense for it to make the decision of whether to cache or not. - -Ultimately, I’m rejecting this alternative because the Proposed Changes will cache GetRequests in a way that is applicable to everyone regardless of KV store. If Chronon developers want, they could still add their own layer of caching in their KV store. - - -### Rejected Alternative #2: Use a single cache for both batch and streaming - - -In the Proposed Changes, the idea is to have two separate caches, one for streaming and one for batch. This allows developers to tune them separately based on their specific benchmarks. - -The alternative would be to use a single cache with `GetRequest`s as keys (or something similar) and `Avro.GenericRecord`s as values (or something similar). - -This alternative is conceptually simpler – one cache instead of two – but - -- It is not as tunable. For example, for some users, the batch fetching and decoding work may take up the majority of time, so they would likely want their batch cache to be larger than their streaming cache. - -- We would still need to parse `Avro.GenericRecord` into `FinalBatchIr` and `TiledIr`. This doesn’t take up a huge amount of time but is worth noting. - -Personally, I am still on the fence about this one, but leaning towards two caches. Opinions are very welcome. - - -### Rejected Alternative #3: Use one `GetRequest` per tile instead of one per `GroupBy` for streaming `GetRequest` caching - -Currently, we send one GetRequest to our KV store and receive back any number of tiles. For example, a request for the range [0:00, 3:00) could return three tiles, [0:00, 1:00) + [1:00, 2:00) + [2:00, 3:00). - -This is a problem because caching the GetRequest would mean caching all the streaming data, and streaming data changes frequently. In the Proposed Changes, we get around this by adjusting the `batchEndTsMillis` of the GetRequest to the end of the latest cached tile. That way, if we need [0:00, 3:00) and [0:00, 1:00) + [1:00, 2:00) are cached, we change `batchEndTsMillis` to 2:00 so that only the latest tiles are fetched.
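-
-As a rough illustration of that adjustment, here is a minimal sketch with simplified, hypothetical types -- the real `GetRequest` and tile-cache plumbing in `FetcherBase` differ:
-
-```scala
-// Simplified, hypothetical shapes -- not the actual Chronon online API.
-case class StreamingGetRequest(dataset: String, keyBytes: Seq[Byte], batchEndTsMillis: Long)
-case class CachedTilePrefix(combinedTileIr: AnyRef, cacheEndTsMillis: Long)
-
-// If a prefix of complete tiles [batchEndTsMillis, cacheEndTsMillis) is cached,
-// fetch only the tiles after cacheEndTsMillis; otherwise send the request unchanged.
-def adjustStreamingRequest(request: StreamingGetRequest,
-                           cached: Option[CachedTilePrefix]): StreamingGetRequest =
-  cached match {
-    case Some(prefix) if prefix.cacheEndTsMillis > request.batchEndTsMillis =>
-      request.copy(batchEndTsMillis = prefix.cacheEndTsMillis)
-    case _ => request
-  }
-```
-
-In this sketch, a cached [0:00, 2:00) prefix turns a request starting at 0:00 into one starting at 2:00, matching the example above.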
- -The alternative rejected here is to refactor FetcherBase to instead create one streaming GetRequest per tile. That way, if a GetRequest is cached, we just don’t send it to the KVstore. - -I am rejecting this alternative because it requires unnecessary refactoring of the FetcherBase and potentially also Chronon developer’s KV store implementation. - - -### Rejected Alternative #4: Share caches across GroupBys - -Instead of two caches (streaming and batch) per GroupBy, we could have two caches for the whole application. - -This approach is simpler, but not optimal. Certain GroupBys require caching more than others. For example, imagine you have a cache of 10,000 elements and two GroupBys with different keys: - -- GroupBy A. On a regular day, the top 10,000 keys correspond to 90% of traffic -- GroupBy B. On a regular day, the top 10,000 keys correspond to 1% of traffic - -It likely makes sense to cache A but not B. A shared cache would be less effective. - - -## New or Changed Public Interfaces - -None - -## Migration Plan and Compatibility - -Online caching is an optional feature that is disabled by default. Chronon users who need to decrease serving latency can enable caching. No migration is required. - -Before enabling caching in production, user/developers should decide on a size for the cache based on their deployment. Setting the size too high may result in significantly increased GC time. diff --git a/proposals/images/CHIP-1-current-fetcher-sequence.png b/proposals/images/CHIP-1-current-fetcher-sequence.png deleted file mode 100644 index 6a9031506..000000000 Binary files a/proposals/images/CHIP-1-current-fetcher-sequence.png and /dev/null differ diff --git a/proposals/images/CHIP-1-data-in-kv-store.png b/proposals/images/CHIP-1-data-in-kv-store.png deleted file mode 100644 index 83f924797..000000000 Binary files a/proposals/images/CHIP-1-data-in-kv-store.png and /dev/null differ diff --git a/proposals/images/CHIP-1-new-fetcher-sequence.png b/proposals/images/CHIP-1-new-fetcher-sequence.png deleted file mode 100644 index 5ba3249af..000000000 Binary files a/proposals/images/CHIP-1-new-fetcher-sequence.png and /dev/null differ diff --git a/quickstart/.env.spark b/quickstart/.env.spark deleted file mode 100644 index f45b19421..000000000 --- a/quickstart/.env.spark +++ /dev/null @@ -1 +0,0 @@ -SPARK_NO_DAEMONIZE=true diff --git a/quickstart/Dockerfile b/quickstart/Dockerfile deleted file mode 100644 index 54a08bd0b..000000000 --- a/quickstart/Dockerfile +++ /dev/null @@ -1,72 +0,0 @@ -# Start from a Debian base image -FROM openjdk:17-jdk - -# Update package lists and install necessary tools -RUN apt-get update && apt-get install -y \ - curl \ - python3 \ - python3-dev \ - python3-setuptools \ - vim \ - wget \ - procps \ - python3-pip - -ENV THRIFT_VERSION 0.13.0 -ENV SCALA_VERSION 2.12.12 - -# Install thrift -RUN curl -sSL "http://archive.apache.org/dist/thrift/$THRIFT_VERSION/thrift-$THRIFT_VERSION.tar.gz" -o thrift.tar.gz \ - && mkdir -p /usr/src/thrift \ - && tar zxf thrift.tar.gz -C /usr/src/thrift --strip-components=1 \ - && rm thrift.tar.gz \ - && cd /usr/src/thrift \ - && ./configure --without-python --without-cpp \ - && make \ - && make install \ - && cd / \ - && rm -rf /usr/src/thrift - -RUN curl https://downloads.lightbend.com/scala/${SCALA_VERSION}/scala-${SCALA_VERSION}.deb -k -o scala.deb && \ - apt install -y ./scala.deb && \ - rm -rf scala.deb /var/lib/apt/lists/* - -ENV SCALA_HOME="/usr/bin/scala" -ENV PATH=${PATH}:${SCALA_HOME}/bin - -## Download spark and hadoop 
dependencies and install - -# Optional env variables -ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"} -ENV HADOOP_HOME=${HADOOP_HOME:-"/opt/hadoop"} -ENV SPARK_VERSION=${SPARK_VERSION:-"3.1.1"} -ENV HADOOP_VERSION=${HADOOP_VERSION:-"3.2"} -RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} -WORKDIR ${SPARK_HOME} - - -RUN curl https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz -o spark.tgz \ - && tar xvzf spark.tgz --directory /opt/spark --strip-components 1 \ - && rm -rf spark.tgz - - -# Install python deps -COPY requirements.txt . -RUN pip3 install -r requirements.txt - - -ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}" -ENV SPARK_HOME="/opt/spark" - -COPY conf/spark-defaults.conf "$SPARK_HOME/conf" - -RUN chmod u+x /opt/spark/sbin/* && \ - chmod u+x /opt/spark/bin/* - -ENV PYTHONPATH=$SPARK_HOME/python/:/srv/chronon/:$PYTHONPATH - -# If trying a standalone docker cluster -WORKDIR ${SPARK_HOME} -# If doing a regular local spark box. -WORKDIR /srv/chronon - diff --git a/quickstart/README.md b/quickstart/README.md deleted file mode 100644 index 437409f2f..000000000 --- a/quickstart/README.md +++ /dev/null @@ -1,31 +0,0 @@ -# Chronon Quickstart Docker - -This directory holds Dockerfiles and compose yamls to bring up the quickstart containers for our various cloud environments. - -> **Note** -> Make sure you have `docker >= 20.10` installed. -> Install appropriate java, scala, and python versions following the instructions in [devnotes.md](../devnotes.md#install-appropriate-java-scala-and-python-versions). - -To start, run: -```bash -$ cd chronon -$ sbt clean assembly -$ docker compose -f quickstart/cloud_gcp/gcp-docker-compose.yml up --build -... -load-serving-data-1 | Metadata load completed successfully! -load-serving-data-1 | Done computing and uploading all serving data to BigTable! 🥳 -load-serving-data-1 exited with code 0 -``` - -This compose triggers the spinning up of the fetcher service as well as loading up of groupby upload data + join metadata to the relevant KV store for the chosen cloud env (Bigtable for GCP, DynamoDB for AWS) - -At this point, you can curl the fetcher service from your machine (outside Docker) to get the features for a user: -```bash -$ curl -X POST 'http://localhost:9000/v1/fetch/join/quickstart%2Ftraining_set.v2' -H 'Content-Type: application/json' -d '[{"user_id": "5"}]' -... 
-``` - -You can also pull up the container shell: -```bash -$ docker compose -f quickstart/cloud_gcp/gcp-docker-compose.yml exec app bash -``` diff --git a/quickstart/conf/spark-defaults.conf b/quickstart/conf/spark-defaults.conf deleted file mode 100644 index c50f281df..000000000 --- a/quickstart/conf/spark-defaults.conf +++ /dev/null @@ -1,10 +0,0 @@ -spark.master local -spark.eventLog.enabled true -spark.eventLog.dir /opt/spark/spark-events -spark.history.fs.logDirectory /opt/spark/spark-events -spark.shuffle.service.enabled true -spark.sql.warehouse.dir /opt/spark/data -spark.hadoop.javax.jdo.option.ConnectionURL jdbc:derby:;databaseName=/opt/spark/data/metastore_db;create=true -spark.sql.catalogImplementation hive -spark.submit.deployMode client -spark.home /opt/spark diff --git a/quickstart/mongo-online-impl/.gitignore b/quickstart/mongo-online-impl/.gitignore deleted file mode 100644 index bddd1888a..000000000 --- a/quickstart/mongo-online-impl/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/.bsp/ -target/ diff --git a/quickstart/mongo-online-impl/build.sbt b/quickstart/mongo-online-impl/build.sbt deleted file mode 100644 index 0324cf643..000000000 --- a/quickstart/mongo-online-impl/build.sbt +++ /dev/null @@ -1,20 +0,0 @@ -import Dependencies._ -import sbtassembly.AssemblyPlugin.autoImport._ - -ThisBuild / scalaVersion := "2.12.12" -ThisBuild / version := "0.1.0-SNAPSHOT" -ThisBuild / organization := "ai.chronon" -ThisBuild / organizationName := "Chronon" - -lazy val root = (project in file(".")) - .settings( - name := "mongo-online-impl", - libraryDependencies ++= Seq( - "ai.chronon" %% "api" % "0.0.57", - "ai.chronon" %% "online" % "0.0.57" % Provided, - "org.mongodb.spark" %% "mongo-spark-connector" % "10.2.1", // Batch upload + structured streaming - "org.mongodb.scala" %% "mongo-scala-driver" % "4.8.1", // Fetching - "ch.qos.logback" % "logback-classic" % "1.2.3", - "org.slf4j" % "slf4j-api" % "1.7.32" - ), - ) diff --git a/quickstart/mongo-online-impl/project/Dependencies.scala b/quickstart/mongo-online-impl/project/Dependencies.scala deleted file mode 100644 index 1edb07a72..000000000 --- a/quickstart/mongo-online-impl/project/Dependencies.scala +++ /dev/null @@ -1,5 +0,0 @@ -import sbt._ - -object Dependencies { - lazy val munit = "org.scalameta" %% "munit" % "0.7.29" -} diff --git a/quickstart/mongo-online-impl/project/build.properties b/quickstart/mongo-online-impl/project/build.properties deleted file mode 100644 index e8a1e246e..000000000 --- a/quickstart/mongo-online-impl/project/build.properties +++ /dev/null @@ -1 +0,0 @@ -sbt.version=1.9.7 diff --git a/quickstart/mongo-online-impl/project/plugins.sbt b/quickstart/mongo-online-impl/project/plugins.sbt deleted file mode 100644 index 7bc4622d2..000000000 --- a/quickstart/mongo-online-impl/project/plugins.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.1.0") diff --git a/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/ChrononMongoOnlineImpl.scala b/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/ChrononMongoOnlineImpl.scala deleted file mode 100644 index 4a91d6b36..000000000 --- a/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/ChrononMongoOnlineImpl.scala +++ /dev/null @@ -1,41 +0,0 @@ -package ai.chronon.quickstart.online - -import ai.chronon.online.{ - Api, - ExternalSourceRegistry, - GroupByServingInfoParsed, - KVStore, - LoggableResponse, - StreamDecoder -} - -import org.mongodb.scala._ -import 
org.slf4j.{Logger, LoggerFactory} - -class ChrononMongoOnlineImpl(userConf: Map[String, String]) extends Api(userConf) { - - @transient lazy val registry: ExternalSourceRegistry = new ExternalSourceRegistry() - - @transient val logger: Logger = LoggerFactory.getLogger("ChrononMongoOnlineImpl") - - @transient lazy val mongoClient = MongoClient(s"mongodb://${userConf("user")}:${userConf("password")}@${userConf("host")}:${userConf("port")}") - override def streamDecoder(groupByServingInfoParsed: GroupByServingInfoParsed): StreamDecoder = - new QuickstartMutationDecoder(groupByServingInfoParsed) - - - override def genKvStore: KVStore = new MongoKvStore(mongoClient, Constants.mongoDatabase) - - - @transient lazy val loggingClient = mongoClient.getDatabase(Constants.mongoDatabase).getCollection(Constants.mongoLoggingCollection) - override def logResponse(resp: LoggableResponse): Unit = - loggingClient.insertOne(Document( - "joinName" -> resp.joinName, - "keyBytes" -> resp.keyBytes, - "schemaHash" -> Option(resp.schemaHash).getOrElse("SCHEMA_PUBLISHED"), - "valueBytes" -> resp.valueBytes, - "atMillis" -> resp.tsMillis, - "ts" -> System.currentTimeMillis(), - )).toFuture() - - override def externalRegistry: ExternalSourceRegistry = registry -} diff --git a/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/Constants.scala b/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/Constants.scala deleted file mode 100644 index a43e837c7..000000000 --- a/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/Constants.scala +++ /dev/null @@ -1,11 +0,0 @@ -package ai.chronon.quickstart.online - -object Constants { - val tableKey = "key_bytes" - val tableValue = "value_bytes" - val mongoKey = "keyBytes" - val mongoValue = "valueBytes" - val mongoTs = "ts" - val mongoDatabase = "chronon" - val mongoLoggingCollection = "logging" -} diff --git a/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/MongoKvStore.scala b/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/MongoKvStore.scala deleted file mode 100644 index c559a0d2a..000000000 --- a/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/MongoKvStore.scala +++ /dev/null @@ -1,55 +0,0 @@ -package ai.chronon.quickstart.online - -import ai.chronon.online.KVStore -import ai.chronon.online.KVStore._ -import org.mongodb.scala._ -import org.mongodb.scala.model.Filters._ -import scala.concurrent.Future -import scala.util.{Failure, Success, Try} -import java.util.Base64 - - -/** - * A KVStore implementation backed by MongoDB. - * Databases : [dataset]_realtime, [dataset]_batch. 
- * - */ -class MongoKvStore(mongoClient: MongoClient, databaseName: String) extends KVStore { - - override def create(dataset: String): Unit = mongoClient.getDatabase(databaseName).createCollection(dataset) - - override def multiGet(requests: Seq[GetRequest]): Future[Seq[GetResponse]] = { - val futures = requests.map { request => - val collection = mongoClient.getDatabase(databaseName).getCollection(request.dataset) - val filter = equal(Constants.mongoKey, request.keyBytes) - collection.find(filter).limit(1).toFuture().map { documents => - if (documents.isEmpty) { - GetResponse(request, Failure(new NoSuchElementException("Key not found"))) - } else { - GetResponse(request, Try( - documents.map(document => - TimedValue( - document.get(Constants.mongoValue).get.asBinary().getData, - System.currentTimeMillis()) - ))) - } - } - } - Future.sequence(futures) - } - - // Move to insertMany grouped by dataset. - override def multiPut(putRequests: Seq[PutRequest]): Future[Seq[Boolean]] = { - val futures = putRequests.map { putRequest => - val collection = mongoClient.getDatabase(databaseName).getCollection(putRequest.dataset) - val document = Document( - Constants.mongoKey -> putRequest.keyBytes, - Constants.mongoValue -> putRequest.valueBytes, - Constants.mongoTs-> putRequest.tsMillis) - collection.insertOne(document).toFuture().map(_ => true).recover { case _ => false } - } - Future.sequence(futures) - } - - override def bulkPut(sourceOfflineTable: String, destinationOnlineDataSet: String, partition: String): Unit = ??? -} diff --git a/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/MongoLoggingDumper.scala b/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/MongoLoggingDumper.scala deleted file mode 100644 index e1232a5b3..000000000 --- a/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/MongoLoggingDumper.scala +++ /dev/null @@ -1,55 +0,0 @@ -package ai.chronon.quickstart.online - -import org.apache.spark.sql.SparkSession - - -/** - * Dump mongo collection to hive. 
- * Part of log flattening / OOC pattern - */ -object MongoLoggingDumper { - def main(args: Array[String]): Unit = { - if (args.length != 2) { - println("Usage: MongoLoggingDumper ") - sys.exit(1) - } - val tableName = args(0) - val uri = args(1) - - val spark = SparkSession.builder() - .appName(s"MongoLoggingDumper") - .config("spark.mongodb.read.connection.uri", uri) - .getOrCreate() - - val df = spark.read - .format("mongodb") - .option("database", Constants.mongoDatabase) // Replace with your MongoDB database name - .option("collection", Constants.mongoLoggingCollection) - .load() - - df.createOrReplaceTempView("temp_view") - df.printSchema() - - val transformedDF = spark.sql( - s""" - | SELECT - | schemaHash AS schema_hash, - | BASE64(keyBytes) AS key_base64, - | BASE64(valueBytes) AS value_base64, - | atMillis AS ts_millis, - | ts AS ts, - | joinName AS name, - | FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd') AS ds - | FROM temp_view - | """.stripMargin) - transformedDF.printSchema() - - transformedDF.write - .partitionBy("ds", "name") - .format("parquet") - .mode("overwrite") - .saveAsTable(tableName) - - spark.stop() - } -} diff --git a/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/QuickstartMutationDecoder.scala b/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/QuickstartMutationDecoder.scala deleted file mode 100644 index b7767dec5..000000000 --- a/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/QuickstartMutationDecoder.scala +++ /dev/null @@ -1,88 +0,0 @@ -package ai.chronon.quickstart.online - -import ai.chronon.api -import ai.chronon.api.Extensions.{GroupByOps, SourceOps} -import ai.chronon.api.{StructField, StructType} -import ai.chronon.online.{GroupByServingInfoParsed, Mutation, StreamDecoder} -import java.util.HashSet - -/** - * We build a convention that for events (immutable) topic starts with 'event.' For mutable topics, it starts with 'mutation.' - * Similarly we accept that for events and mutations we follow the schema as data loader and the topic data is csv. - * - * For mutations we require three additional columns, to be implemented later. - */ -class QuickstartMutationDecoder(groupByServingInfoParsed: GroupByServingInfoParsed) extends StreamDecoder { - private val eventPrefix = "events." - val groupByConf: api.GroupBy = groupByServingInfoParsed.groupBy - private val source = { - val opt = groupByConf.streamingSource - assert(opt.isDefined, "A valid streaming source (with topic) can't be found") - opt.get - } - - val eventDecoder: EventDecoder = { - val fields = source.topicTokens("fields").split(",") - if (source.topic.startsWith(eventPrefix)) { - new StreamingEventDecoder(fields) - } else { - new CDCDecoder(fields) - } - } - - override def decode(bytes: Array[Byte]): Mutation = eventDecoder.decode(bytes).orNull - - override def schema: StructType = eventDecoder.schema - -} - -trait EventDecoder extends Serializable { - def schema: StructType - def decode(bytes: Array[Byte]): Option[Mutation] -} - -class StreamingEventDecoder(fields: Array[String]) extends EventDecoder { - override def schema: StructType = StructType("event", - fields.map { columnName => - val dataType = columnName match { - case name if name.endsWith("ts") => api.LongType - case name if name.endsWith("_price") || name.endsWith("_amt") => api.LongType - case _ => api.StringType - } - StructField(columnName, dataType) - } - ) - - /** - * Receive a csv string and convert it to a mutation. 
- */ - override def decode(bytes: Array[Byte]): Option[Mutation] = { - val csvRow = new String(bytes) - val values: Array[Any] = csvRow.split(",").zip(schema).map { - case (value, field) => - // Convert the string value to the appropriate data type based on the schema - if (value == null || value.isEmpty || value == "") null - else field.fieldType match { - case api.LongType => value.toLong - case _ => value - } - } - Some(Mutation(schema, null, values)) - } -} - -class CDCDecoder(fields: Array[String]) extends EventDecoder { - - val mutationColumns = Array("__mutationTs", "__mutationType") - override def schema: StructType = StructType("mutation", - (fields ++ mutationColumns).map { columnName => - val dataType = columnName match { - case name if name.endsWith("ts") => api.LongType - case name if name.endsWith("_price") || name.endsWith("_amt") => api.LongType - case _ => api.StringType - } - StructField(columnName, dataType) - } - ) - override def decode(bytes: Array[Byte]): Option[Mutation] = ??? -} diff --git a/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/Spark2MongoLoader.scala b/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/Spark2MongoLoader.scala deleted file mode 100644 index 2b6ceb015..000000000 --- a/quickstart/mongo-online-impl/src/main/scala/ai/chronon/quickstart/online/Spark2MongoLoader.scala +++ /dev/null @@ -1,44 +0,0 @@ -package ai.chronon.quickstart.online -import org.apache.spark.sql.SparkSession -import ai.chronon.api.{Constants => ApiConstants} - -object Spark2MongoLoader { - def main(args: Array[String]): Unit = { - if (args.length != 2) { - println("Usage: TableDataLoader ") - sys.exit(1) - } - - val tableName = args(0) - val dataset = tableName match { - case tableName if tableName.endsWith("_logged_daily_stats_upload") => ApiConstants.LogStatsBatchDataset - case tableName if tableName.endsWith("_daily_stats_upload") => ApiConstants.StatsBatchDataset - case tableName if tableName.endsWith("_consistency_upload") => ApiConstants.ConsistencyMetricsDataset - case tableName if tableName.endsWith("_upload") => tableName.stripSuffix("_upload").split("\\.").lastOption.getOrElse(tableName).toUpperCase + "_BATCH" - case _ => tableName.toUpperCase + "_BATCH" - } - val uri = args(1) - - val spark = SparkSession.builder() - .appName(s"Spark2MongoLoader-${tableName}") - .config("spark.mongodb.write.connection.uri", uri) - .getOrCreate() - - val baseDf = spark.read.table(tableName) - val timeColumn = if (baseDf.columns.contains("ts")) "ts" else "UNIX_TIMESTAMP(DATE_ADD(ds, 0)) * 1000" - - val df = spark.sql(s""" - | SELECT - | ${Constants.tableKey} AS ${Constants.mongoKey}, - | ${Constants.tableValue} AS ${Constants.mongoValue}, - | $timeColumn AS ${Constants.mongoTs} - | FROM $tableName""".stripMargin) - df.show() - df.write - .format("mongodb") - .mode("overwrite") - .option("database", Constants.mongoDatabase) - .option("collection", dataset) - .save() - } -} \ No newline at end of file diff --git a/quickstart/mongo-online-impl/src/main/scala/resources/logback.xml b/quickstart/mongo-online-impl/src/main/scala/resources/logback.xml deleted file mode 100644 index 588ab469c..000000000 --- a/quickstart/mongo-online-impl/src/main/scala/resources/logback.xml +++ /dev/null @@ -1,12 +0,0 @@ - - - - [%date] {%logger{0}} %level - %message%n - - - - - - - - diff --git a/quickstart/requirements.txt b/quickstart/requirements.txt deleted file mode 100644 index 51b797526..000000000 --- a/quickstart/requirements.txt +++ /dev/null @@ -1,3 
+0,0 @@ -jupyter -chronon-ai -tornado>=6.4.2 # not directly required, pinned by Snyk to avoid a vulnerability diff --git a/roles.png b/roles.png deleted file mode 100644 index 2fe4bca1c..000000000 Binary files a/roles.png and /dev/null differ diff --git a/quickstart/cloud_gcp/scripts/fetcher_launch.sh b/scripts/cloud_gcp_quickstart/fetcher_launch.sh similarity index 100% rename from quickstart/cloud_gcp/scripts/fetcher_launch.sh rename to scripts/cloud_gcp_quickstart/fetcher_launch.sh diff --git a/quickstart/cloud_gcp/gcp-docker-compose.yml b/scripts/cloud_gcp_quickstart/gcp-docker-compose.yml similarity index 97% rename from quickstart/cloud_gcp/gcp-docker-compose.yml rename to scripts/cloud_gcp_quickstart/gcp-docker-compose.yml index 92132138f..805662b85 100644 --- a/quickstart/cloud_gcp/gcp-docker-compose.yml +++ b/scripts/cloud_gcp_quickstart/gcp-docker-compose.yml @@ -26,7 +26,7 @@ services: load-serving-data: build: context: ../.. - dockerfile: Dockerfile + dockerfile: ../../Dockerfile depends_on: - bigtable-init environment: @@ -51,7 +51,7 @@ services: app: build: context: ../.. - dockerfile: Dockerfile + dockerfile: ../../Dockerfile depends_on: - load-serving-data - statsd diff --git a/quickstart/cloud_gcp/scripts/load_data.sh b/scripts/cloud_gcp_quickstart/load_data.sh similarity index 100% rename from quickstart/cloud_gcp/scripts/load_data.sh rename to scripts/cloud_gcp_quickstart/load_data.sh diff --git a/distribution/build_and_upload_artifacts.sh b/scripts/distribution/build_and_upload_artifacts.sh similarity index 100% rename from distribution/build_and_upload_artifacts.sh rename to scripts/distribution/build_and_upload_artifacts.sh diff --git a/distribution/publish_docker_images.sh b/scripts/distribution/publish_docker_images.sh similarity index 100% rename from distribution/publish_docker_images.sh rename to scripts/distribution/publish_docker_images.sh diff --git a/distribution/run_zipline_quickstart.py b/scripts/distribution/run_zipline_quickstart.py similarity index 100% rename from distribution/run_zipline_quickstart.py rename to scripts/distribution/run_zipline_quickstart.py diff --git a/distribution/run_zipline_quickstart.sh b/scripts/distribution/run_zipline_quickstart.sh similarity index 100% rename from distribution/run_zipline_quickstart.sh rename to scripts/distribution/run_zipline_quickstart.sh diff --git a/vote_tally.png b/vote_tally.png deleted file mode 100644 index 64ed9234d..000000000 Binary files a/vote_tally.png and /dev/null differ