31
31
import io .airbyte .workers .temporal .scheduling .state .WorkflowState ;
32
32
import io .airbyte .workers .temporal .spec .SpecWorkflow ;
33
33
import io .airbyte .workers .temporal .sync .SyncWorkflow ;
34
- import io .temporal .api .workflow .v1 .WorkflowExecutionInfo ;
35
34
import io .temporal .api .workflowservice .v1 .ListOpenWorkflowExecutionsRequest ;
36
35
import io .temporal .api .workflowservice .v1 .ListOpenWorkflowExecutionsResponse ;
37
36
import io .temporal .client .BatchRequest ;
40
39
import java .io .IOException ;
41
40
import java .nio .file .Path ;
42
41
import java .util .HashSet ;
43
- import java .util .List ;
44
42
import java .util .Optional ;
45
43
import java .util .Set ;
46
44
import java .util .UUID ;
@@ -65,6 +63,8 @@ public class TemporalClient {
65
63
*/
66
64
private static final int DELAY_BETWEEN_QUERY_MS = 10 ;
67
65
66
+ private static final int MAXIMUM_SEARCH_PAGE_SIZE = 50 ;
67
+
68
68
public static TemporalClient production (final String temporalHost , final Path workspaceRoot , final Configs configs ) {
69
69
final WorkflowServiceStubs temporalService = TemporalUtils .createTemporalService (temporalHost );
70
70
return new TemporalClient (WorkflowClient .newInstance (temporalService ), workspaceRoot , temporalService , configs );
@@ -454,14 +454,15 @@ public boolean isWorkflowRunning(final String workflowName) {
454
454
ListOpenWorkflowExecutionsRequest openWorkflowExecutionsRequest =
455
455
ListOpenWorkflowExecutionsRequest .newBuilder ()
456
456
.setNamespace (client .getOptions ().getNamespace ())
457
+ .setMaximumPageSize (MAXIMUM_SEARCH_PAGE_SIZE )
457
458
.build ();
458
459
do {
459
460
final ListOpenWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest =
460
461
service .blockingStub ().listOpenWorkflowExecutions (openWorkflowExecutionsRequest );
461
- final List < WorkflowExecutionInfo > workflowExecutionInfos = listOpenWorkflowExecutionsRequest .getExecutionsList ().stream ()
462
+ final long matchingWorkflowCount = listOpenWorkflowExecutionsRequest .getExecutionsList ().stream ()
462
463
.filter ((workflowExecutionInfo -> workflowExecutionInfo .getExecution ().getWorkflowId ().equals (workflowName )))
463
- .collect ( Collectors . toList () );
464
- if (! workflowExecutionInfos . isEmpty () ) {
464
+ .count ( );
465
+ if (matchingWorkflowCount != 0 ) {
465
466
return true ;
466
467
}
467
468
token = listOpenWorkflowExecutionsRequest .getNextPageToken ();
@@ -470,6 +471,7 @@ public boolean isWorkflowRunning(final String workflowName) {
470
471
ListOpenWorkflowExecutionsRequest .newBuilder ()
471
472
.setNamespace (client .getOptions ().getNamespace ())
472
473
.setNextPageToken (token )
474
+ .setMaximumPageSize (MAXIMUM_SEARCH_PAGE_SIZE )
473
475
.build ();
474
476
475
477
} while (token != null && token .size () > 0 );
0 commit comments