Skip to content

Check protocol version compatibility during a platform update #19200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

gosusnp
Copy link
Contributor

@gosusnp gosusnp commented Nov 9, 2022

What

When updating the platform we need to ensure that we are not going to end up having to disable a connection because the support range of the protocol version changed.
This PR adds the logic to check for protocol version compatibility of the platform and the connectors used in active syncs to decide whether it is safe to continue the upgrade or if we need to other interventions before.

Relates to #15463

How

  • Add a ProtocolVersionChecker to check the protocol compatibilty
  • Plug it in the Bootloader before the DB migrations in case we want to abort the upgrade
  • Small refactoring for DB code reuse

Recommended reading order

  1. x.java
  2. y.python

🚨 User Impact 🚨

Following this change, platform upgrades can be aborted because of protocol version conflicts.
If such cases happen, there will need to be some manual operations before being able to upgrade the platform. A typical operation would be upgrading a connector to a version that is using a more recent version of the protocol.

@gosusnp gosusnp temporarily deployed to more-secrets November 9, 2022 21:39 Inactive
@gosusnp gosusnp marked this pull request as ready for review November 9, 2022 21:50

final var bootloader =
new BootloaderApp(configs, featureFlags, secretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, definitionsProvider,
false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that this is hard-coded to false just for now and will eventually be replaced by some sort of configuration property?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I had the change to add the configs, however it felt more misleading for the time being because this how we apply upgrades is currently hardcoded and injected in cloud.
The config should control both how apply the upgrades and how we validate.
Unless someone has a strong feeling we should address this now, I'd rather push it to when we micronaut the bootloader.

Should I move this comment to here instead?

final Map<ActorType, Set<UUID>> conflicts = getConflictingActorDefinitions(targetRange);

if (conflicts.isEmpty()) {
log.info("No conflicts");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Consider adding more context to this message so it's easy to understand what has no conflicts. Perhaps something like "No conflicting connector protocol versions detected."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

// hard.
// If the new config is fine, the system should self-heal.
log.warn("Inconsistent AirbyteProtocolVersion found, only one of min/max was found. (min:{}, max:{})",
min.map(Version::serialize).orElse(""), max.map(Version::serialize).orElse(""));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: consider extracting to a function that can be reused for both min and max. Something like:

private Function<Optional<Version>, String> versionFunction = v -> v.map(Version::serialize).orElse("");

...

log.warn("... (min:{}, max:{})", versionFunction.apply(min), versionFunction.apply(max));

It doesn't reduce the code that much, but does prevent repetition of the logic to extract the version from the optional to avoid potential drift.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this makes sense, but I am thinking of pushing it a bit further and changing the toString() implementation and stop using serialize everywhere.

The different between the two is about format, serialize() gives a <major>.<minor>.<patch> while toString() returns Version{version='<major>.<minor>.<patch>', major='<major>', minor='<minor>', patch='<patch>'}.

return String.format("Destination: %s: %s: protocol version: %s",
destDef.getDestinationDefinitionId(), destDef.getName(), destDef.getProtocolVersion());
} catch (Exception e) {
log.info("Failed to getStandardDestinationDefinition for " + defId, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: use placeholders instead of string concatenation in log message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

@jdpgrailsdev jdpgrailsdev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:shipit: a few nits to address/can be done in a follow up PR.

@gosusnp gosusnp temporarily deployed to more-secrets November 9, 2022 22:12 Inactive
if (actorType == ActorType.SOURCE) {
stream = definitionsProvider.getSourceDefinitions()
.stream()
.map(def -> Map.entry(def.getSourceDefinitionId(), AirbyteProtocolVersion.getWithDefault(def.getSpec().getProtocolVersion())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not doing collect(Collectors.toMap) here? or just map the to get the keys. It looks like this is the only thing that is neeed here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need both UUID and Protocol version, see line 169

return String.format("Source: %s: %s: protocol version: %s",
sourceDef.getSourceDefinitionId(), sourceDef.getName(), sourceDef.getProtocolVersion());
} catch (Exception e) {
log.info("Failed to getStandardSourceDefinition for {}", defId, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am on the fence here.
At this point, we are failing in the extra read we do for pretty printing errors, which is why I left it as info.

*/
static Set<String> getConnectorRepositoriesInUse(final DSLContext ctx) {
return getActorDefinitionsInUse(ctx)
.map(Record4::value2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use map(record -> record.get(ACTOR_DEFINITION.DOCKER_REPOSITORY)) here? This would help to understand what we are fetching here. (Same bellow)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much better indeed. done.

@gosusnp gosusnp temporarily deployed to more-secrets November 9, 2022 23:13 Inactive
@gosusnp gosusnp temporarily deployed to more-secrets November 10, 2022 00:26 Inactive
@gosusnp gosusnp temporarily deployed to more-secrets November 10, 2022 01:32 Inactive
@gosusnp
Copy link
Contributor Author

gosusnp commented Nov 10, 2022

@jdpgrailsdev, @benmoriceau, FYI, I just changed the interface a bit to accommodate cloud behavior.

With the addition of the RemoteDefinitionsProvider, the DefinitionsProvider is now optional. Currently, if we fail to initialize a valid RemoteDefinitionsProvider, we continue to start the platform without the ability to update connectors from seed. I have updated the logic to make sure that we do not prevent the platform from starting if there is no conflicts detected even without a DefinitionsProvider.

@gosusnp gosusnp temporarily deployed to more-secrets November 10, 2022 01:50 Inactive
@gosusnp gosusnp temporarily deployed to more-secrets November 10, 2022 02:16 Inactive
@gosusnp gosusnp temporarily deployed to more-secrets November 10, 2022 02:34 Inactive
@gosusnp gosusnp temporarily deployed to more-secrets November 14, 2022 17:18 Inactive
@gosusnp gosusnp merged commit 8683bc8 into master Nov 14, 2022
@gosusnp gosusnp deleted the gosusnp/16043-check-disabled-connections-after-protocol-update branch November 14, 2022 18:16
akashkulk pushed a commit that referenced this pull request Dec 2, 2022
* Refactoring to improve code re-use

* Add ProtocolVersionChecker

* Add an option to configure if we are automatically upgrading connectors

* Add airbyte version check to pass the fresh install case

* Inject DefinitionsProvider in the BootloaderApp

* Remove AutoUpgradeConnector config

* Improve logging

* Use named argument rather than positional

* Make DefinitionsProvider optional

* Format
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants