75
75
import java .util .function .Function ;
76
76
import java .util .stream .Collectors ;
77
77
78
+ /**
79
+ * The web backend is an abstraction that allows the frontend to structure data in such a way that
80
+ * it is easier for a react frontend to consume. It should NOT have direct access to the database.
81
+ * It should operate exclusively by calling other endpoints that are exposed in the API.
82
+ **/
78
83
@ Singleton
79
84
public class WebBackendConnectionsHandler {
80
85
@@ -87,7 +92,8 @@ public class WebBackendConnectionsHandler {
87
92
private final OperationsHandler operationsHandler ;
88
93
private final EventRunner eventRunner ;
89
94
// todo (cgardens) - this handler should NOT have access to the db. only access via handler.
90
- private final ConfigRepository configRepository ;
95
+ @ Deprecated
96
+ private final ConfigRepository configRepositoryDoNotUse ;
91
97
92
98
public WebBackendConnectionsHandler (final ConnectionsHandler connectionsHandler ,
93
99
final StateHandler stateHandler ,
@@ -97,7 +103,7 @@ public WebBackendConnectionsHandler(final ConnectionsHandler connectionsHandler,
97
103
final SchedulerHandler schedulerHandler ,
98
104
final OperationsHandler operationsHandler ,
99
105
final EventRunner eventRunner ,
100
- final ConfigRepository configRepository ) {
106
+ final ConfigRepository configRepositoryDoNotUse ) {
101
107
this .connectionsHandler = connectionsHandler ;
102
108
this .stateHandler = stateHandler ;
103
109
this .sourceHandler = sourceHandler ;
@@ -106,14 +112,14 @@ public WebBackendConnectionsHandler(final ConnectionsHandler connectionsHandler,
106
112
this .schedulerHandler = schedulerHandler ;
107
113
this .operationsHandler = operationsHandler ;
108
114
this .eventRunner = eventRunner ;
109
- this .configRepository = configRepository ;
115
+ this .configRepositoryDoNotUse = configRepositoryDoNotUse ;
110
116
}
111
117
112
118
public WebBackendWorkspaceStateResult getWorkspaceState (final WebBackendWorkspaceState webBackendWorkspaceState ) throws IOException {
113
119
final var workspaceId = webBackendWorkspaceState .getWorkspaceId ();
114
- final var connectionCount = configRepository .countConnectionsForWorkspace (workspaceId );
115
- final var destinationCount = configRepository .countDestinationsForWorkspace (workspaceId );
116
- final var sourceCount = configRepository .countSourcesForWorkspace (workspaceId );
120
+ final var connectionCount = configRepositoryDoNotUse .countConnectionsForWorkspace (workspaceId );
121
+ final var destinationCount = configRepositoryDoNotUse .countDestinationsForWorkspace (workspaceId );
122
+ final var sourceCount = configRepositoryDoNotUse .countSourcesForWorkspace (workspaceId );
117
123
118
124
return new WebBackendWorkspaceStateResult ()
119
125
.hasConnections (connectionCount > 0 )
@@ -135,7 +141,7 @@ public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final
135
141
// passing 'false' so that deleted connections are not included
136
142
false );
137
143
138
- final List <StandardSync > standardSyncs = configRepository .listWorkspaceStandardSyncs (query );
144
+ final List <StandardSync > standardSyncs = configRepositoryDoNotUse .listWorkspaceStandardSyncs (query );
139
145
final List <UUID > sourceIds = standardSyncs .stream ().map (StandardSync ::getSourceId ).toList ();
140
146
final List <UUID > destinationIds = standardSyncs .stream ().map (StandardSync ::getDestinationId ).toList ();
141
147
final List <UUID > connectionIds = standardSyncs .stream ().map (StandardSync ::getConnectionId ).toList ();
@@ -148,7 +154,7 @@ public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final
148
154
// right status filtering for this.
149
155
final Map <UUID , JobRead > runningJobByConnectionId = getRunningJobByConnectionId (connectionIds );
150
156
final Map <UUID , ActorCatalogFetchEvent > newestFetchEventsByActorId =
151
- configRepository .getMostRecentActorCatalogFetchEventForSources (sourceIds );
157
+ configRepositoryDoNotUse .getMostRecentActorCatalogFetchEventForSources (sourceIds );
152
158
153
159
final List <WebBackendConnectionListItem > connectionItems = Lists .newArrayList ();
154
160
@@ -177,14 +183,14 @@ private Map<UUID, JobRead> getRunningJobByConnectionId(final List<UUID> connecti
177
183
}
178
184
179
185
private Map <UUID , SourceSnippetRead > getSourceSnippetReadById (final List <UUID > sourceIds ) throws IOException {
180
- return configRepository .getSourceAndDefinitionsFromSourceIds (sourceIds )
186
+ return configRepositoryDoNotUse .getSourceAndDefinitionsFromSourceIds (sourceIds )
181
187
.stream ()
182
188
.map (sourceAndDefinition -> SourceHandler .toSourceSnippetRead (sourceAndDefinition .source (), sourceAndDefinition .definition ()))
183
189
.collect (Collectors .toMap (SourceSnippetRead ::getSourceId , Function .identity ()));
184
190
}
185
191
186
192
private Map <UUID , DestinationSnippetRead > getDestinationSnippetReadById (final List <UUID > destinationIds ) throws IOException {
187
- return configRepository .getDestinationAndDefinitionsFromDestinationIds (destinationIds )
193
+ return configRepositoryDoNotUse .getDestinationAndDefinitionsFromDestinationIds (destinationIds )
188
194
.stream ()
189
195
.map (destinationAndDefinition -> DestinationHandler .toDestinationSnippetRead (destinationAndDefinition .destination (),
190
196
destinationAndDefinition .definition ()))
@@ -210,7 +216,7 @@ private WebBackendConnectionRead buildWebBackendConnectionRead(final ConnectionR
210
216
});
211
217
212
218
final Optional <ActorCatalogFetchEvent > mostRecentFetchEvent =
213
- configRepository .getMostRecentActorCatalogFetchEventForSource (connectionRead .getSourceId ());
219
+ configRepositoryDoNotUse .getMostRecentActorCatalogFetchEventForSource (connectionRead .getSourceId ());
214
220
215
221
final SchemaChange schemaChange = getSchemaChange (connectionRead , currentSourceCatalogId , mostRecentFetchEvent );
216
222
@@ -539,13 +545,15 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
539
545
if (webBackendConnectionPatch .getSyncCatalog () != null ) {
540
546
// Get the most recent actor catalog fetched for this connection's source and the newly updated sync
541
547
// catalog
542
- Optional <ActorCatalog > mostRecentActorCatalog = configRepository .getMostRecentActorCatalogForSource (originalConnectionRead .getSourceId ());
543
- AirbyteCatalog newAirbyteCatalog = webBackendConnectionPatch .getSyncCatalog ();
548
+ final Optional <ActorCatalog > mostRecentActorCatalog =
549
+ configRepositoryDoNotUse .getMostRecentActorCatalogForSource (originalConnectionRead .getSourceId ());
550
+ final AirbyteCatalog newAirbyteCatalog = webBackendConnectionPatch .getSyncCatalog ();
544
551
// Get the diff between these two catalogs to check for breaking changes
545
552
if (mostRecentActorCatalog .isPresent ()) {
546
553
final io .airbyte .protocol .models .AirbyteCatalog mostRecentAirbyteCatalog =
547
554
Jsons .object (mostRecentActorCatalog .get ().getCatalog (), io .airbyte .protocol .models .AirbyteCatalog .class );
548
- final StandardSourceDefinition sourceDefinition = configRepository .getSourceDefinitionFromSource (originalConnectionRead .getSourceId ());
555
+ final StandardSourceDefinition sourceDefinition =
556
+ configRepositoryDoNotUse .getSourceDefinitionFromSource (originalConnectionRead .getSourceId ());
549
557
final CatalogDiff catalogDiff =
550
558
connectionsHandler .getDiff (newAirbyteCatalog , CatalogConverter .toApi (mostRecentAirbyteCatalog , sourceDefinition ),
551
559
CatalogConverter .toConfiguredProtocol (newAirbyteCatalog ));
@@ -556,7 +564,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
556
564
// before doing any updates, fetch the existing catalog so that it can be diffed
557
565
// with the final catalog to determine which streams might need to be reset.
558
566
final ConfiguredAirbyteCatalog oldConfiguredCatalog =
559
- configRepository .getConfiguredCatalogForConnection (connectionId );
567
+ configRepositoryDoNotUse .getConfiguredCatalogForConnection (connectionId );
560
568
561
569
final List <UUID > newAndExistingOperationIds = createOrUpdateOperations (originalConnectionRead , webBackendConnectionPatch );
562
570
@@ -619,7 +627,7 @@ private void resetStreamsIfNeeded(final WebBackendConnectionUpdate webBackendCon
619
627
final ConnectionStateType stateType = getStateType (connectionIdRequestBody );
620
628
621
629
if (stateType == ConnectionStateType .LEGACY || stateType == ConnectionStateType .NOT_SET ) {
622
- streamsToReset = configRepository .getAllStreamsForConnection (connectionId );
630
+ streamsToReset = configRepositoryDoNotUse .getAllStreamsForConnection (connectionId );
623
631
}
624
632
eventRunner .resetConnection (
625
633
connectionId ,
@@ -728,7 +736,7 @@ protected static ConnectionCreate toConnectionCreate(final WebBackendConnectionC
728
736
@ VisibleForTesting
729
737
protected static ConnectionUpdate toConnectionPatch (final WebBackendConnectionUpdate webBackendConnectionPatch ,
730
738
final List <UUID > finalOperationIds ,
731
- boolean breakingChange ) {
739
+ final boolean breakingChange ) {
732
740
final ConnectionUpdate connectionPatch = new ConnectionUpdate ();
733
741
734
742
connectionPatch .connectionId (webBackendConnectionPatch .getConnectionId ());
0 commit comments