Skip to content

add target #7687

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/persistence/serialization/task_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,7 @@
Version: syncWorkflowStateTask.Version,
VisibilityTime: timestamppb.New(syncWorkflowStateTask.VisibilityTimestamp),
Priority: syncWorkflowStateTask.Priority,
// target Cluster id: syncWorkflowStateTask.TargetCluster,
}
}

Expand All @@ -1218,6 +1219,7 @@
Version: syncWorkflowStateTask.Version,
TaskID: syncWorkflowStateTask.TaskId,
Priority: syncWorkflowStateTask.Priority,
TargetCluster: syncWorkflowStateTask.targetClusterId

Check failure on line 1222 in common/persistence/serialization/task_serializer.go

View workflow job for this annotation

GitHub Actions / golangci

syntax error: unexpected newline in composite literal; possibly missing comma or }) (typecheck)

Check failure on line 1222 in common/persistence/serialization/task_serializer.go

View workflow job for this annotation

GitHub Actions / golangci

syntax error: unexpected newline in composite literal; possibly missing comma or }) (typecheck)

Check failure on line 1222 in common/persistence/serialization/task_serializer.go

View workflow job for this annotation

GitHub Actions / golangci

syntax error: unexpected newline in composite literal; possibly missing comma or }) (typecheck)

Check failure on line 1222 in common/persistence/serialization/task_serializer.go

View workflow job for this annotation

GitHub Actions / lint-workflows

missing ',' before newline in composite literal

Check failure on line 1222 in common/persistence/serialization/task_serializer.go

View workflow job for this annotation

GitHub Actions / lint-workflows

syncWorkflowStateTask.targetClusterId undefined (type *persistence.ReplicationTaskInfo has no field or method targetClusterId)

Check failure on line 1222 in common/persistence/serialization/task_serializer.go

View workflow job for this annotation

GitHub Actions / lint-workflows

missing ',' before newline in composite literal

Check failure on line 1222 in common/persistence/serialization/task_serializer.go

View workflow job for this annotation

GitHub Actions / lint-workflows

syncWorkflowStateTask.targetClusterId undefined (type *persistence.ReplicationTaskInfo has no field or method targetClusterId)

Check failure on line 1222 in common/persistence/serialization/task_serializer.go

View workflow job for this annotation

GitHub Actions / lint-workflows

missing ',' before newline in composite literal

Check failure on line 1222 in common/persistence/serialization/task_serializer.go

View workflow job for this annotation

GitHub Actions / lint-workflows

syncWorkflowStateTask.targetClusterId undefined (type *persistence.ReplicationTaskInfo has no field or method targetClusterId)

Check failure on line 1222 in common/persistence/serialization/task_serializer.go

View workflow job for this annotation

GitHub Actions / lint-workflows

missing ',' before newline in composite literal

Check failure on line 1222 in common/persistence/serialization/task_serializer.go

View workflow job for this annotation

GitHub Actions / lint-workflows

syncWorkflowStateTask.targetClusterId undefined (type *persistence.ReplicationTaskInfo has no field or method targetClusterId)

Check failure on line 1222 in common/persistence/serialization/task_serializer.go

View workflow job for this annotation

GitHub Actions / lint-workflows

missing ',' before newline in composite literal

Check failure on line 1222 in common/persistence/serialization/task_serializer.go

View workflow job for this annotation

GitHub Actions / lint-workflows

syncWorkflowStateTask.targetClusterId undefined (type *persistence.ReplicationTaskInfo has no field or method targetClusterId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ message ReplicationTaskInfo {
repeated ReplicationTaskInfo task_equivalents = 20;
history.v1.VersionHistoryItem last_version_history_item = 21;
bool is_first_task = 22;
string target_cluster_id = 23;
}

// visibility_task_data column
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/replication/generate_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func GenerateTask(
defer func() { workflowLease.GetReleaseFn()(retError) }()

mutableState := workflowLease.GetMutableState()
replicationTasks, stateTransitionCount, err := mutableState.GenerateMigrationTasks()
replicationTasks, stateTransitionCount, err := mutableState.GenerateMigrationTasks("abc")
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ type (
// CloseTransactionAsSnapshot closes the mutable state transaction (different from DB transaction) and prepares the current snapshot of the state to be persisted and bumps the DBRecordVersion.
// You should ideally not make any changes to the mutable state after this call.
CloseTransactionAsSnapshot(transactionPolicy TransactionPolicy) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error)
GenerateMigrationTasks() ([]tasks.Task, int64, error)
GenerateMigrationTasks(targetClusterId string) ([]tasks.Task, int64, error)

// ContinueAsNewMinBackoff calculate minimal backoff for next ContinueAsNew run.
// Input backoffDuration is current backoff for next run.
Expand Down
14 changes: 14 additions & 0 deletions service/history/replication/stream_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,11 @@ func (s *StreamSenderImpl) shouldProcessTask(item tasks.Task) bool {
return false
}

targetCluster := s.getTaskTargetCluster(item)
if targetCluster != "" && targetCluster != s.clientClusterName {
return false
}

var shouldProcessTask bool
namespaceEntry, err := s.shardContext.GetNamespaceRegistry().GetNamespaceByID(
namespace.ID(item.GetNamespaceID()),
Expand Down Expand Up @@ -668,3 +673,12 @@ func (s *StreamSenderImpl) getTaskPriority(task tasks.Task) enumsspb.TaskPriorit
return enumsspb.TASK_PRIORITY_HIGH
}
}

func (s *StreamSenderImpl) getTaskTargetCluster(task tasks.Task) string {
switch t := task.(type) {
case *tasks.SyncWorkflowStateTask:
return t.TargetCluster
default:
return ""
}
}
5 changes: 3 additions & 2 deletions service/history/tasks/sync_workflow_state_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type (
VisibilityTimestamp time.Time
TaskID int64
// TODO: validate this version in source task converter
Version int64
Priority enumsspb.TaskPriority
Version int64
Priority enumsspb.TaskPriority
TargetCluster string
}
)

Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -5944,7 +5944,7 @@ func (ms *MutableStateImpl) UpdateDuplicatedResource(
ms.appliedEvents[id] = struct{}{}
}

func (ms *MutableStateImpl) GenerateMigrationTasks() ([]tasks.Task, int64, error) {
func (ms *MutableStateImpl) GenerateMigrationTasks(targetClusterId string) ([]tasks.Task, int64, error) {
return ms.taskGenerator.GenerateMigrationTasks()
}

Expand Down
11 changes: 6 additions & 5 deletions service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type (
GenerateHistoryReplicationTasks(
eventBatches [][]*historypb.HistoryEvent,
) ([]tasks.Task, error)
GenerateMigrationTasks() ([]tasks.Task, int64, error)
GenerateMigrationTasks(targetClusterId string) ([]tasks.Task, int64, error)

// Generate tasks for any updated state machines on mutable state.
// Looks up machine definition in the provided registry.
Expand Down Expand Up @@ -739,7 +739,7 @@ func (r *TaskGeneratorImpl) GenerateHistoryReplicationTasks(
}, nil
}

func (r *TaskGeneratorImpl) GenerateMigrationTasks() ([]tasks.Task, int64, error) {
func (r *TaskGeneratorImpl) GenerateMigrationTasks(targetClusterId string) ([]tasks.Task, int64, error) {
executionInfo := r.mutableState.GetExecutionInfo()
versionHistory, err := versionhistory.GetCurrentVersionHistory(executionInfo.GetVersionHistories())
if err != nil {
Expand All @@ -755,9 +755,10 @@ func (r *TaskGeneratorImpl) GenerateMigrationTasks() ([]tasks.Task, int64, error
if r.mutableState.GetExecutionState().State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
syncWorkflowStateTask := []tasks.Task{&tasks.SyncWorkflowStateTask{
// TaskID, VisibilityTimestamp is set by shard
WorkflowKey: workflowKey,
Version: lastItem.GetVersion(),
Priority: enumsspb.TASK_PRIORITY_LOW,
WorkflowKey: workflowKey,
Version: lastItem.GetVersion(),
Priority: enumsspb.TASK_PRIORITY_LOW,
TargetCluster: targetClusterId,
}}
if r.mutableState.IsTransitionHistoryEnabled() &&
// even though current cluster may enabled state transition, but transition history can be cleared
Expand Down
Loading