Skip to content

Commit a16ecd6

Browse files
authored
Bmoric/extract state api (#18980)
* Extract Operation API * Extract scheduler API * Format * extract source api * Extract source definition api * Add path * Extract State API * Add missing binder * fix type
1 parent b7b6507 commit a16ecd6

File tree

6 files changed

+112
-5
lines changed

6 files changed

+112
-5
lines changed

airbyte-server/src/main/java/io/airbyte/server/ServerApp.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.airbyte.config.persistence.ConfigRepository;
2222
import io.airbyte.config.persistence.SecretsRepositoryReader;
2323
import io.airbyte.config.persistence.SecretsRepositoryWriter;
24+
import io.airbyte.config.persistence.StatePersistence;
2425
import io.airbyte.config.persistence.StreamResetPersistence;
2526
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
2627
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
@@ -61,6 +62,7 @@
6162
import io.airbyte.server.handlers.SchedulerHandler;
6263
import io.airbyte.server.handlers.SourceDefinitionsHandler;
6364
import io.airbyte.server.handlers.SourceHandler;
65+
import io.airbyte.server.handlers.StateHandler;
6466
import io.airbyte.server.handlers.WorkspacesHandler;
6567
import io.airbyte.server.scheduler.DefaultSynchronousSchedulerClient;
6668
import io.airbyte.server.scheduler.EventRunner;
@@ -320,6 +322,10 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
320322

321323
final OpenApiConfigHandler openApiConfigHandler = new OpenApiConfigHandler();
322324

325+
final StatePersistence statePersistence = new StatePersistence(configsDatabase);
326+
327+
final StateHandler stateHandler = new StateHandler(statePersistence);
328+
323329
LOGGER.info("Starting server...");
324330

325331
return apiFactory.create(
@@ -353,6 +359,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
353359
schedulerHandler,
354360
sourceHandler,
355361
sourceDefinitionsHandler,
362+
stateHandler,
356363
workspacesHandler);
357364
}
358365

airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import io.airbyte.server.apis.SchedulerApiController;
3232
import io.airbyte.server.apis.SourceApiController;
3333
import io.airbyte.server.apis.SourceDefinitionApiController;
34+
import io.airbyte.server.apis.SourceOauthApiController;
35+
import io.airbyte.server.apis.StateApiController;
3436
import io.airbyte.server.apis.binders.AttemptApiBinder;
3537
import io.airbyte.server.apis.binders.ConnectionApiBinder;
3638
import io.airbyte.server.apis.binders.DbMigrationBinder;
@@ -48,6 +50,7 @@
4850
import io.airbyte.server.apis.binders.SourceApiBinder;
4951
import io.airbyte.server.apis.binders.SourceDefinitionApiBinder;
5052
import io.airbyte.server.apis.binders.SourceOauthApiBinder;
53+
import io.airbyte.server.apis.binders.StateApiBinder;
5154
import io.airbyte.server.apis.factories.AttemptApiFactory;
5255
import io.airbyte.server.apis.factories.ConnectionApiFactory;
5356
import io.airbyte.server.apis.factories.DbMigrationApiFactory;
@@ -65,6 +68,7 @@
6568
import io.airbyte.server.apis.factories.SourceApiFactory;
6669
import io.airbyte.server.apis.factories.SourceDefinitionApiFactory;
6770
import io.airbyte.server.apis.factories.SourceOauthApiFactory;
71+
import io.airbyte.server.apis.factories.StateApiFactory;
6872
import io.airbyte.server.handlers.AttemptHandler;
6973
import io.airbyte.server.handlers.ConnectionsHandler;
7074
import io.airbyte.server.handlers.DbMigrationHandler;
@@ -79,6 +83,7 @@
7983
import io.airbyte.server.handlers.SchedulerHandler;
8084
import io.airbyte.server.handlers.SourceDefinitionsHandler;
8185
import io.airbyte.server.handlers.SourceHandler;
86+
import io.airbyte.server.handlers.StateHandler;
8287
import io.airbyte.server.handlers.WorkspacesHandler;
8388
import io.airbyte.server.scheduler.EventRunner;
8489
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
@@ -121,6 +126,7 @@ ServerRunnable create(final SynchronousSchedulerClient synchronousSchedulerClien
121126
final SchedulerHandler schedulerHandler,
122127
final SourceHandler sourceHandler,
123128
final SourceDefinitionsHandler sourceDefinitionsHandler,
129+
final StateHandler stateHandler,
124130
final WorkspacesHandler workspacesHandler);
125131

126132
class Api implements ServerFactory {
@@ -156,6 +162,7 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
156162
final SchedulerHandler schedulerHandler,
157163
final SourceHandler sourceHandler,
158164
final SourceDefinitionsHandler sourceDefinitionsHandler,
165+
final StateHandler stateHandler,
159166
final WorkspacesHandler workspacesHandler) {
160167
final Map<String, String> mdc = MDC.getCopyOfContextMap();
161168

@@ -218,6 +225,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
218225

219226
SourceDefinitionApiFactory.setValues(sourceDefinitionsHandler);
220227

228+
StateApiFactory.setValues(stateHandler);
229+
221230
// server configurations
222231
final Set<Class<?>> componentClasses = Set.of(
223232
ConfigurationApi.class,
@@ -237,7 +246,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
237246
SchedulerApiController.class,
238247
SourceApiController.class,
239248
SourceDefinitionApiController.class,
240-
SourceOauthApiFactory.class);
249+
SourceOauthApiController.class,
250+
StateApiController.class);
241251

242252
final Set<Object> components = Set.of(
243253
new CorsFilter(),
@@ -258,7 +268,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul
258268
new SchedulerApiBinder(),
259269
new SourceApiBinder(),
260270
new SourceDefinitionApiBinder(),
261-
new SourceOauthApiBinder());
271+
new SourceOauthApiBinder(),
272+
new StateApiBinder());
262273

263274
// construct server
264275
return new ServerApp(airbyteVersion, componentClasses, components);

airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ public void revokeSourceDefinitionFromWorkspace(final SourceDefinitionIdWithWork
416416
}
417417

418418
@Override
419-
public InternalOperationResult saveStats(SaveStatsRequestBody saveStatsRequestBody) {
419+
public InternalOperationResult saveStats(final SaveStatsRequestBody saveStatsRequestBody) {
420420
throw new UnsupportedOperationException();
421421
}
422422

@@ -948,9 +948,13 @@ public OperationRead createOperation(final OperationCreate operationCreate) {
948948
throw new NotImplementedException();
949949
}
950950

951+
/**
952+
* This implementation has been moved to {@link StateApiController}. Since the path of
953+
* {@link StateApiController} is more granular, it will override this implementation
954+
*/
951955
@Override
952956
public ConnectionState createOrUpdateState(final ConnectionStateCreateOrUpdate connectionStateCreateOrUpdate) {
953-
return ConfigurationApi.execute(() -> stateHandler.createOrUpdateState(connectionStateCreateOrUpdate));
957+
throw new NotImplementedException();
954958
}
955959

956960
/**
@@ -989,9 +993,13 @@ public OperationRead updateOperation(final OperationUpdate operationUpdate) {
989993
throw new NotImplementedException();
990994
}
991995

996+
/**
997+
* This implementation has been moved to {@link StateApiController}. Since the path of
998+
* {@link StateApiController} is more granular, it will override this implementation
999+
*/
9921000
@Override
9931001
public ConnectionState getState(final ConnectionIdRequestBody connectionIdRequestBody) {
994-
return ConfigurationApi.execute(() -> stateHandler.getState(connectionIdRequestBody));
1002+
throw new NotImplementedException();
9951003
}
9961004

9971005
// SCHEDULER
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.server.apis;
6+
7+
import io.airbyte.api.generated.StateApi;
8+
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
9+
import io.airbyte.api.model.generated.ConnectionState;
10+
import io.airbyte.api.model.generated.ConnectionStateCreateOrUpdate;
11+
import io.airbyte.server.handlers.StateHandler;
12+
import javax.ws.rs.Path;
13+
import lombok.AllArgsConstructor;
14+
15+
@Path("/v1/state")
16+
@AllArgsConstructor
17+
public class StateApiController implements StateApi {
18+
19+
private final StateHandler stateHandler;
20+
21+
@Override
22+
public ConnectionState createOrUpdateState(final ConnectionStateCreateOrUpdate connectionStateCreateOrUpdate) {
23+
return ConfigurationApi.execute(() -> stateHandler.createOrUpdateState(connectionStateCreateOrUpdate));
24+
}
25+
26+
@Override
27+
public ConnectionState getState(final ConnectionIdRequestBody connectionIdRequestBody) {
28+
return ConfigurationApi.execute(() -> stateHandler.getState(connectionIdRequestBody));
29+
}
30+
31+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.server.apis.binders;
6+
7+
import io.airbyte.server.apis.StateApiController;
8+
import io.airbyte.server.apis.factories.StateApiFactory;
9+
import org.glassfish.hk2.utilities.binding.AbstractBinder;
10+
import org.glassfish.jersey.process.internal.RequestScoped;
11+
12+
public class StateApiBinder extends AbstractBinder {
13+
14+
@Override
15+
protected void configure() {
16+
bindFactory(StateApiFactory.class)
17+
.to(StateApiController.class)
18+
.in(RequestScoped.class);
19+
}
20+
21+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.server.apis.factories;
6+
7+
import io.airbyte.server.apis.StateApiController;
8+
import io.airbyte.server.handlers.StateHandler;
9+
import org.glassfish.hk2.api.Factory;
10+
11+
public class StateApiFactory implements Factory<StateApiController> {
12+
13+
private static StateHandler stateHandler;
14+
15+
public static void setValues(final StateHandler stateHandler) {
16+
StateApiFactory.stateHandler = stateHandler;
17+
}
18+
19+
@Override
20+
public StateApiController provide() {
21+
return new StateApiController(StateApiFactory.stateHandler);
22+
}
23+
24+
@Override
25+
public void dispose(final StateApiController instance) {
26+
/* no op */
27+
}
28+
29+
}

0 commit comments

Comments
 (0)