Skip to content

Scheduled callbacks #823

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 32 commits into from
Aug 8, 2025
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
3ab3459
add execute and process templates
devbugging May 22, 2025
bee210b
add callback transactions
devbugging May 22, 2025
a89b8b8
execution of callbacks
devbugging May 22, 2025
2d8c206
add contracts and transactions for scheduler
devbugging May 23, 2025
f934204
add execution of scheduled callbacks if enabled
devbugging May 23, 2025
1c1a3f2
add callbacks logic for parsing events and preparing transactions
devbugging May 23, 2025
65a19c2
config wiring
devbugging May 23, 2025
5553d6a
licence
devbugging May 23, 2025
350b70b
change cancel logic to closure
devbugging Jun 3, 2025
324a73a
remove cancel function
devbugging Jun 3, 2025
bc21b1b
update scheduled callback
devbugging Jun 3, 2025
13138f7
rejected callback
devbugging Jun 3, 2025
0afbc9f
update API for scheduling
devbugging Jun 3, 2025
9e78772
add garbage collection of callbacks
devbugging Jun 4, 2025
fdefdab
updated contract with singleton pattern
devbugging Jun 6, 2025
3f7a42d
callbacks tests
devbugging Jun 6, 2025
c870e31
fix cancel method
devbugging Jun 6, 2025
a44bda6
convert uint64
devbugging Jun 23, 2025
d48ee51
type id formatting
devbugging Jun 23, 2025
ba2fa44
Merge branch 'master' into gregor/schedule-callbacks
nialexsan Jul 16, 2025
645c8be
Update emulator/callbacks.go
nialexsan Jul 16, 2025
66a319e
Update emulator/callbacks_test.go
nialexsan Jul 16, 2025
235470a
Update emulator/callbacks_test.go
nialexsan Jul 16, 2025
9e87889
PR review comments
devbugging Jul 17, 2025
4d5f2e4
update to latest version of FlowCallbackScheduler
joshuahannan Jul 22, 2025
d842353
return empty dictionary when there are no low prio callbacks
joshuahannan Jul 22, 2025
65ed6d1
update to latest contract and fix comment
joshuahannan Jul 28, 2025
41a13de
contract 07-31
joshuahannan Jul 31, 2025
e0eb1ef
update to latest contract and fix event parameters
joshuahannan Aug 4, 2025
4026a2a
use core contracts version of schedule callbacks cadence code
joshuahannan Aug 5, 2025
67c296d
go mod tidy
joshuahannan Aug 5, 2025
f931f3b
update core contracts version
joshuahannan Aug 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions cmd/emulator/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type Config struct {
CheckpointPath string `default:"" flag:"checkpoint-dir" info:"checkpoint directory to load the emulator state from"`
StateHash string `default:"" flag:"state-hash" info:"state hash of the checkpoint to load the emulator state from"`
ComputationReportingEnabled bool `default:"false" flag:"computation-reporting" info:"enable Cadence computation reporting"`
ScheduledCallbacksEnabled bool `default:"false" flag:"scheduled-callbacks" info:"enable Cadence scheduled callbacks"`
SetupEVMEnabled bool `default:"true" flag:"setup-evm" info:"enable EVM setup for the emulator, this will deploy the EVM contracts"`
SetupVMBridgeEnabled bool `default:"true" flag:"setup-vm-bridge" info:"enable VM Bridge setup for the emulator, this will deploy the VM Bridge contracts"`
}
Expand Down Expand Up @@ -190,39 +191,40 @@ func Cmd(config StartConfig) *cobra.Command {
RESTPort: conf.RestPort,
RESTDebug: conf.RESTDebug,
// TODO: allow headers to be parsed from environment
HTTPHeaders: nil,
BlockTime: conf.BlockTime,
ServicePublicKey: servicePublicKey,
ServicePrivateKey: servicePrivateKey,
ServiceKeySigAlgo: serviceKeySigAlgo,
ServiceKeyHashAlgo: serviceKeyHashAlgo,
Persist: conf.Persist,
Snapshot: conf.Snapshot,
DBPath: conf.DBPath,
GenesisTokenSupply: parseCadenceUFix64(conf.TokenSupply, "token-supply"),
TransactionMaxGasLimit: uint64(conf.TransactionMaxGasLimit),
ScriptGasLimit: uint64(conf.ScriptGasLimit),
TransactionExpiry: uint(conf.TransactionExpiry),
StorageLimitEnabled: conf.StorageLimitEnabled,
StorageMBPerFLOW: storageMBPerFLOW,
MinimumStorageReservation: minimumStorageReservation,
TransactionFeesEnabled: conf.TransactionFeesEnabled,
WithContracts: conf.Contracts,
SkipTransactionValidation: conf.SkipTxValidation,
SimpleAddressesEnabled: conf.SimpleAddresses,
Host: conf.Host,
ChainID: flowChainID,
RedisURL: conf.RedisURL,
ContractRemovalEnabled: conf.ContractRemovalEnabled,
SqliteURL: conf.SqliteURL,
CoverageReportingEnabled: conf.CoverageReportingEnabled,
StartBlockHeight: conf.StartBlockHeight,
RPCHost: conf.RPCHost,
CheckpointPath: conf.CheckpointPath,
StateHash: conf.StateHash,
ComputationReportingEnabled: conf.ComputationReportingEnabled,
SetupEVMEnabled: conf.SetupEVMEnabled,
SetupVMBridgeEnabled: conf.SetupVMBridgeEnabled,
HTTPHeaders: nil,
BlockTime: conf.BlockTime,
ServicePublicKey: servicePublicKey,
ServicePrivateKey: servicePrivateKey,
ServiceKeySigAlgo: serviceKeySigAlgo,
ServiceKeyHashAlgo: serviceKeyHashAlgo,
Persist: conf.Persist,
Snapshot: conf.Snapshot,
DBPath: conf.DBPath,
GenesisTokenSupply: parseCadenceUFix64(conf.TokenSupply, "token-supply"),
TransactionMaxGasLimit: uint64(conf.TransactionMaxGasLimit),
ScriptGasLimit: uint64(conf.ScriptGasLimit),
TransactionExpiry: uint(conf.TransactionExpiry),
StorageLimitEnabled: conf.StorageLimitEnabled,
StorageMBPerFLOW: storageMBPerFLOW,
MinimumStorageReservation: minimumStorageReservation,
TransactionFeesEnabled: conf.TransactionFeesEnabled,
WithContracts: conf.Contracts,
SkipTransactionValidation: conf.SkipTxValidation,
SimpleAddressesEnabled: conf.SimpleAddresses,
Host: conf.Host,
ChainID: flowChainID,
RedisURL: conf.RedisURL,
ContractRemovalEnabled: conf.ContractRemovalEnabled,
SqliteURL: conf.SqliteURL,
CoverageReportingEnabled: conf.CoverageReportingEnabled,
StartBlockHeight: conf.StartBlockHeight,
RPCHost: conf.RPCHost,
CheckpointPath: conf.CheckpointPath,
StateHash: conf.StateHash,
ComputationReportingEnabled: conf.ComputationReportingEnabled,
ScheduledCallbacksEnabled: conf.ScheduledCallbacksEnabled,
SetupEVMEnabled: conf.SetupEVMEnabled,
SetupVMBridgeEnabled: conf.SetupVMBridgeEnabled,
}

emu := server.NewEmulatorServer(logger, serverConf)
Expand Down
62 changes: 62 additions & 0 deletions emulator/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,12 @@ func WithComputationReporting(enabled bool) Option {
}
}

func WithScheduledCallbacks(enabled bool) Option {
return func(c *config) {
c.ScheduledCallbacksEnabled = enabled
}
}

// WithSetupEVMEnabled enables/disables the EVM setup.
func WithSetupEVMEnabled(enabled bool) Option {
return func(c *config) {
Expand Down Expand Up @@ -397,6 +403,7 @@ type config struct {
AutoMine bool
Contracts []ContractDescription
ComputationReportingEnabled bool
ScheduledCallbacksEnabled bool
SetupEVMEnabled bool
SetupVMBridgeEnabled bool
}
Expand Down Expand Up @@ -1276,6 +1283,23 @@ func (b *Blockchain) executeBlock() ([]*types.TransactionResult, error) {
results = append(results, result)
}

// lastly execute any scheduled callbacks if the feature is enabled
if b.conf.ScheduledCallbacksEnabled {
// todo refactor after bootstrap deploys CallbackScheduler
// this is a temporary workaround since deployment of CallbackScheduler is not
// yet part of bootstrap procedure, so it must be deployed in the first block
// during emulator startup, so we shouldn't support scheduling in these first blocks
if b.pendingBlock.height < 2 {
return results, nil
}

callbacks, err := b.executeScheduledCallbacks(blockContext)
if err != nil {
return results, err
}
results = append(results, callbacks...)
}

return results, nil
}

Expand Down Expand Up @@ -1877,6 +1901,44 @@ func (b *Blockchain) executeSystemChunkTransaction() error {
return nil
}

func (b *Blockchain) executeScheduledCallbacks(blockContext fvm.Context) ([]*types.TransactionResult, error) {
var results []*types.TransactionResult

serviceAddress := b.GetChain().ServiceAddress()
parentID := b.pendingBlock.parentID
// disable checks for signatures and keys since we are executing a system transaction
ctx := fvm.NewContextFromParent(
blockContext,
fvm.WithAuthorizationChecksEnabled(false),
fvm.WithSequenceNumberCheckAndIncrementEnabled(false),
)

// add transaction to schedule callbacks
b.pendingBlock.AddTransaction(processCallbackTransaction(serviceAddress, parentID))
result, err := b.executeNextTransaction(ctx)
if err != nil {
return results, err
}
results = append(results, result)

// execute callbacks we receive from events of the schedule transaction
executeTxs, err := executeCallbackTransactions(result.Events, serviceAddress, parentID)
if err != nil {
return results, err
}

for _, tx := range executeTxs {
b.pendingBlock.AddTransaction(tx)
result, err := b.executeNextTransaction(ctx)
if err != nil {
return results, err
}
results = append(results, result)
}

return results, nil
}

func (b *Blockchain) GetRegisterValues(registerIDs flowgo.RegisterIDs, height uint64) (values []flowgo.RegisterValue, err error) {
ledger, err := b.storage.LedgerByHeight(context.Background(), height)
if err != nil {
Expand Down
136 changes: 136 additions & 0 deletions emulator/callbacks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Flow Emulator
*
* Copyright Flow Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package emulator

import (
_ "embed"
"fmt"
"strings"

"github.com/onflow/cadence"
"github.com/onflow/cadence/common"
jsoncdc "github.com/onflow/cadence/encoding/json"
flowsdk "github.com/onflow/flow-go-sdk"
"github.com/onflow/flow-go/model/flow"
)

//go:embed templates/executeCallbackTransaction.cdc
var executeCallbackScript []byte

//go:embed templates/processCallbackTransaction.cdc
var processCallbackScript []byte

const (
contractName = "UnsafeCallbackScheduler"
callbackProcessedEvent = "CallbackProcessed"
)

// todo: replace all the functions bellow with flow-go implementation once it's done
// issue: https://github.com/onflow/flow-emulator/issues/829

func processCallbackTransaction(
serviceAddress flow.Address,
parentID flow.Identifier,
) flow.TransactionBody {
script := replaceSchedulerAddress(processCallbackScript, serviceAddress)

return *flow.NewTransactionBody().
SetScript(script).
SetComputeLimit(defaultTransactionMaxGasLimit).
SetPayer(serviceAddress).
SetReferenceBlockID(parentID)
}

func executeCallbackTransactions(
scheduleEvent []flowsdk.Event,
serviceAddress flow.Address,
parentID flow.Identifier,
) ([]flow.TransactionBody, error) {
var transactions []flow.TransactionBody
script := replaceSchedulerAddress(executeCallbackScript, serviceAddress)

for _, e := range scheduleEvent {
limit, id, err := parseSchedulerProcessedEvent(e, serviceAddress)
if err != nil {
return nil, err
}

tx := flow.NewTransactionBody().
SetScript(script).
AddArgument(id).
SetPayer(serviceAddress).
SetReferenceBlockID(parentID).
SetComputeLimit(limit)

transactions = append(transactions, *tx)
}

return transactions, nil
}

// parseSchedulerProcessedEvent parses flow event that is emitted during scheduler
// marking the callback as scheduled.
// Returns:
// - execution effort
// - ID of the event encoded as bytes
// - error in case the event type is not correct
func parseSchedulerProcessedEvent(event flowsdk.Event, serviceAddress flow.Address) (uint64, []byte, error) {
contractLocation := common.AddressLocation{
Address: common.Address(serviceAddress),
Name: contractName,
}
callbackScheduledEvent := contractLocation.TypeID(nil, fmt.Sprintf("%s.%s", contractName, callbackProcessedEvent))

const (
IDField = "ID"
executionField = "executionEffort"
)

if event.Type != string(callbackScheduledEvent) {
return 0, nil, fmt.Errorf("invalid event type, got: %s, expected: %s", event.Type, callbackScheduledEvent)
}

id, ok := event.Value.SearchFieldByName(IDField).(cadence.UInt64)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Instead of searching linearly for two fields, maybe use FieldsMappedByName instead and look up the fields in the result

if !ok {
return 0, nil, fmt.Errorf("invalid ID value type: %v", id)
}

encodedID, err := jsoncdc.Encode(id)
if err != nil {
return 0, nil, err
}

effort, ok := event.Value.SearchFieldByName(executionField).(cadence.UInt64)
if !ok {
return 0, nil, fmt.Errorf("invalid effort value type: %v", effort)
}
computeLimit := uint64(effort)

return computeLimit, encodedID, nil
}

func replaceSchedulerAddress(script []byte, serviceAddress flow.Address) []byte {
s := strings.ReplaceAll(
string(script),
`import "UnsafeCallbackScheduler"`,
fmt.Sprintf("import UnsafeCallbackScheduler from %s", serviceAddress.HexWithPrefix()),
)

return []byte(s)
}
Loading
Loading