Skip to content

Add support cache for dynamic spec #6372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

popojk
Copy link
Contributor

@popojk popojk commented Mar 25, 2025

Tracking issue

Closes #5543

Why are the changes needed?

When executing a dynamic workflow in the Flyte backend, compiling the future.pb file before sub-node execution takes some time. If an out-of-memory (OOM) error occurs during sub-node execution, re-running the dynamic workflow requires recompiling the future.pb file, consuming additional time and computational resources. To address this, we propose caching the compiled future.pb file.

The future cache mechanism in this PR stores the compiled file as artifact data in Flyte storage via the datacatalog. The cache key, TagName, is determined by the hash of the input values. The diagrams below illustrate the data flow in the backend system:
截圖 2025-03-25 上午11 10 21

截圖 2025-03-25 上午11 11 27

What changes were proposed in this pull request?

Please refer to bellow diagram for understanding the procedure of how future read/write:
截圖 2025-03-25 上午11 13 07

key changes:
1.Added Mode to TaskTemplate Metadata in flyteidl to indicate whether a task is a DynamicTask.
2.Implemented logic in flytepropeller to send cache read/write requests to datacatalog.
3.Added cache read/write functionality to the storage layer in datacatalog.

How was this patch tested?

To test how much time it saved when future file cache hit, we modify this block of code to count the time spend to run taskNodeHandler.Handle for test purpose.

startTime := time.Now()
   if !isDynamic || cacheStatus == nil || cacheStatus.GetCacheStatus() != core.CatalogCacheStatus_CACHE_HIT {
       var err error
       taskTrns, err := d.TaskNodeHandler.Handle(ctx, nCtx)
       trns = &taskTrns
       if err != nil {
           logger.Debug(ctx, "Failed to compile dynamic workflow")
           return *trns, prevState, err
       }
   }
   if isDynamic {
       duration := time.Since(startTime)
       logger.Infof(ctx, "Dynamic node compilation took %v", duration)
   }

To mimic OOM, we will throw panic in dynamicNodeHandler. So the Dynamic workflow will be aborted after future file compiled

func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, nCtx interfaces.NodeExecutionContext, prevState handler.DynamicNodeState) (handler.Transition, handler.DynamicNodeState, error) {
   panic("throw panic here to mimic OOM")
   # existing code…

then we run bellow test dynamic workflow for the first time

import typing
from flytekit import dynamic, task, workflow

@task
def return_index(character: str) -> int:
   """
   Computes the character index (which needs to fit into the 26 characters list)
   """
   if character.islower():
       return ord(character) - ord("a")
   else:
       return ord(character) - ord("A")
  
@task
def update_list(freq_list: typing.List[int], list_index: int) -> typing.List[int]:
   """
   Notes the frequency of characters
   """
   freq_list[list_index] += 1
   return freq_list


@task
def derive_count(freq1: typing.List[int], freq2: typing.List[int]) -> int:
   """
   Derives the number of common characters
   """
   count = 0
   for i in range(26):
       count += min(freq1[i], freq2[i])
   return count


@dynamic(cache_version="4", cache=True)
def count_characters(s1: str, s2: str) -> int:
   """
   Calls the required tasks and returns the final result
   """

   # s1 and s2 are accessible
   # initiliaze an empty list consisting of 26 empty slots corresponding to every alphabet (lower and upper case)
   freq1 = [0] * 26
   freq2 = [0] * 26
  
   # looping through the string s1
   for i in range(len(s1)):
       # index and freq1 are not accesible as they are promises
       index = return_index(character=s1[i])
       freq1 = update_list(freq_list=freq1, list_index=index)


   # looping through the string s2
   for i in range(len(s2)):
       # index and freq2 are not accesible as they are promises
       index = return_index(character=s2[i])
       freq2 = update_list(freq_list=freq2, list_index=index)

   # counting the common characters
   return derive_count(freq1=freq1, freq2=freq2)

@workflow
def wf(s1: str, s2: str) -> int:
   """
   Calls the dynamic workflow and returns the result
   """

   # sending two strings to the workflow
   return count_characters(s1=s1, s2=s2)

We use the command pyflyte run --remote dynamic.py wf --s1="dynamic" --s2="test"

The first run took around 17 ms to compile future file

Then we remove the panic throwing and re-execute the same workflow with the same input. The future cache hit and it only took 83ns to go through the taskNodeHandler.Handle code block, which means the taskNodeHandler.Handle is skipped.

Moreover, unit tests has been added to make sure cache behaviors.

Labels

Please add one or more of the following labels to categorize your PR:

  • added: For new features.

This is important to improve the readability of release notes.

Check all the applicable boxes

  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

flytekit PR

popojk added 3 commits March 25, 2025 09:38
Signed-off-by: Alex Wu <[email protected]>
Signed-off-by: Alex Wu <[email protected]>
@flyte-bot
Copy link
Collaborator

flyte-bot commented Mar 25, 2025

Code Review Agent Run Status

  • Limitations and other issues: ❌ Failure - Bito Code Review Agent didn't review this pull request automatically because it exceeded the size limit. No action is needed if you didn't intend for the agent to review it. Otherwise, you can initiate the review by typing /review in a comment below.

Copy link

codecov bot commented Mar 25, 2025

Codecov Report

Attention: Patch coverage is 69.55530% with 267 lines in your changes missing coverage. Please review.

Project coverage is 58.59%. Comparing base (8f02ac6) to head (5e8e14d).

Files with missing lines Patch % Lines
...atalog/pkg/manager/impl/future_artifact_manager.go 64.55% 75 Missing and 20 partials ⚠️
...ontroller/nodes/catalog/datacatalog/datacatalog.go 73.39% 42 Missing and 16 partials ⚠️
...ytepropeller/pkg/controller/nodes/dynamic/cache.go 72.82% 36 Missing and 14 partials ⚠️
...epropeller/pkg/controller/nodes/dynamic/handler.go 74.07% 21 Missing and 7 partials ⚠️
...atacatalog/pkg/manager/impl/artifact_data_store.go 59.09% 6 Missing and 3 partials ⚠️
datacatalog/pkg/rpc/datacatalogservice/service.go 0.00% 7 Missing ⚠️
...atacatalog/pkg/repositories/transformers/future.go 75.00% 4 Missing and 2 partials ⚠️
...eller/pkg/controller/nodes/catalog/noop_catalog.go 0.00% 6 Missing ⚠️
...ontroller/nodes/catalog/datacatalog/transformer.go 85.00% 2 Missing and 1 partial ⚠️
...peller/pkg/controller/nodes/dynamic/transformer.go 82.35% 2 Missing and 1 partial ⚠️
... and 2 more
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #6372      +/-   ##
==========================================
+ Coverage   58.48%   58.59%   +0.10%     
==========================================
  Files         940      944       +4     
  Lines       71584    72442     +858     
==========================================
+ Hits        41867    42444     +577     
- Misses      26534    26750     +216     
- Partials     3183     3248      +65     
Flag Coverage Δ
unittests-datacatalog 59.58% <63.35%> (+0.55%) ⬆️
unittests-flyteadmin 56.20% <ø> (-0.06%) ⬇️
unittests-flytecopilot 30.99% <ø> (ø)
unittests-flytectl 64.72% <ø> (ø)
unittests-flyteidl 76.12% <ø> (ø)
unittests-flyteplugins 60.95% <ø> (ø)
unittests-flytepropeller 55.24% <73.15%> (+0.46%) ⬆️
unittests-flytestdlib 64.02% <ø> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: Alex Wu <[email protected]>
@flyte-bot
Copy link
Collaborator

flyte-bot commented Mar 25, 2025

Code Review Agent Run Status

  • Limitations and other issues: ❌ Failure - Bito Code Review Agent didn't review this pull request automatically because it exceeded the size limit. No action is needed if you didn't intend for the agent to review it. Otherwise, you can initiate the review by typing /review in a comment below.

@flyte-bot
Copy link
Collaborator

flyte-bot commented Mar 25, 2025

Code Review Agent Run Status

  • Limitations and other issues: ❌ Failure - Bito Code Review Agent didn't review this pull request automatically because it exceeded the size limit. No action is needed if you didn't intend for the agent to review it. Otherwise, you can initiate the review by typing /review in a comment below.

@flyte-bot
Copy link
Collaborator

Code Review Agent Run Status

  • Limitations and other issues: ❌ Failure - Bito Code Review Agent didn't review this pull request automatically because it exceeded the size limit. No action is needed if you didn't intend for the agent to review it. Otherwise, you can initiate the review by typing /review in a comment below.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Core feature] Add support cache for dynamic spec
2 participants