-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Check protocol version compatibility during a platform update #19200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Check protocol version compatibility during a platform update #19200
Conversation
|
||
final var bootloader = | ||
new BootloaderApp(configs, featureFlags, secretMigrator, configsDslContext, jobsDslContext, configsFlyway, jobsFlyway, definitionsProvider, | ||
false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume that this is hard-coded to false
just for now and will eventually be replaced by some sort of configuration property?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I had the change to add the configs, however it felt more misleading for the time being because this how we apply upgrades is currently hardcoded and injected in cloud.
The config should control both how apply the upgrades and how we validate.
Unless someone has a strong feeling we should address this now, I'd rather push it to when we micronaut the bootloader.
Should I move this comment to here instead?
final Map<ActorType, Set<UUID>> conflicts = getConflictingActorDefinitions(targetRange); | ||
|
||
if (conflicts.isEmpty()) { | ||
log.info("No conflicts"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Consider adding more context to this message so it's easy to understand what has no conflicts. Perhaps something like "No conflicting connector protocol versions detected."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
// hard. | ||
// If the new config is fine, the system should self-heal. | ||
log.warn("Inconsistent AirbyteProtocolVersion found, only one of min/max was found. (min:{}, max:{})", | ||
min.map(Version::serialize).orElse(""), max.map(Version::serialize).orElse("")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: consider extracting to a function that can be reused for both min
and max
. Something like:
private Function<Optional<Version>, String> versionFunction = v -> v.map(Version::serialize).orElse("");
...
log.warn("... (min:{}, max:{})", versionFunction.apply(min), versionFunction.apply(max));
It doesn't reduce the code that much, but does prevent repetition of the logic to extract the version from the optional to avoid potential drift.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this makes sense, but I am thinking of pushing it a bit further and changing the toString()
implementation and stop using serialize
everywhere.
The different between the two is about format, serialize()
gives a <major>.<minor>.<patch>
while toString()
returns Version{version='<major>.<minor>.<patch>', major='<major>', minor='<minor>', patch='<patch>'}
.
return String.format("Destination: %s: %s: protocol version: %s", | ||
destDef.getDestinationDefinitionId(), destDef.getName(), destDef.getProtocolVersion()); | ||
} catch (Exception e) { | ||
log.info("Failed to getStandardDestinationDefinition for " + defId, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: use placeholders instead of string concatenation in log message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a few nits to address/can be done in a follow up PR.
if (actorType == ActorType.SOURCE) { | ||
stream = definitionsProvider.getSourceDefinitions() | ||
.stream() | ||
.map(def -> Map.entry(def.getSourceDefinitionId(), AirbyteProtocolVersion.getWithDefault(def.getSpec().getProtocolVersion()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not doing collect(Collectors.toMap) here? or just map the to get the keys. It looks like this is the only thing that is neeed here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need both UUID and Protocol version, see line 169
return String.format("Source: %s: %s: protocol version: %s", | ||
sourceDef.getSourceDefinitionId(), sourceDef.getName(), sourceDef.getProtocolVersion()); | ||
} catch (Exception e) { | ||
log.info("Failed to getStandardSourceDefinition for {}", defId, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am on the fence here.
At this point, we are failing in the extra read we do for pretty printing errors, which is why I left it as info.
*/ | ||
static Set<String> getConnectorRepositoriesInUse(final DSLContext ctx) { | ||
return getActorDefinitionsInUse(ctx) | ||
.map(Record4::value2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use map(record -> record.get(ACTOR_DEFINITION.DOCKER_REPOSITORY))
here? This would help to understand what we are fetching here. (Same bellow)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much better indeed. done.
@jdpgrailsdev, @benmoriceau, FYI, I just changed the interface a bit to accommodate cloud behavior. With the addition of the |
…fter-protocol-update
…fter-protocol-update
* Refactoring to improve code re-use * Add ProtocolVersionChecker * Add an option to configure if we are automatically upgrading connectors * Add airbyte version check to pass the fresh install case * Inject DefinitionsProvider in the BootloaderApp * Remove AutoUpgradeConnector config * Improve logging * Use named argument rather than positional * Make DefinitionsProvider optional * Format
What
When updating the platform we need to ensure that we are not going to end up having to disable a connection because the support range of the protocol version changed.
This PR adds the logic to check for protocol version compatibility of the platform and the connectors used in active syncs to decide whether it is safe to continue the upgrade or if we need to other interventions before.
Relates to #15463
How
Recommended reading order
x.java
y.python
🚨 User Impact 🚨
Following this change, platform upgrades can be aborted because of protocol version conflicts.
If such cases happen, there will need to be some manual operations before being able to upgrade the platform. A typical operation would be upgrading a connector to a version that is using a more recent version of the protocol.