Skip to content

Commit 3ac6b93

Browse files
committed
refactor(controller): refactor upgradeEngineForVolume
Preliminary task for v2 volume upgrade Longhorn 9104 Signed-off-by: Derek Su <[email protected]>
1 parent 2edf8ec commit 3ac6b93

File tree

1 file changed

+109
-75
lines changed

1 file changed

+109
-75
lines changed

controller/volume_controller.go

Lines changed: 109 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -3007,9 +3007,7 @@ func (c *VolumeController) hasEngineStatusSynced(e *longhorn.Engine, rs map[stri
30073007
return true
30083008
}
30093009

3010-
func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[string]*longhorn.Engine, rs map[string]*longhorn.Replica) error {
3011-
var err error
3012-
3010+
func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[string]*longhorn.Engine, rs map[string]*longhorn.Replica) (err error) {
30133011
if len(es) > 1 {
30143012
return nil
30153013
}
@@ -3115,44 +3113,95 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str
31153113
}
31163114

31173115
if types.IsDataEngineV1(v.Spec.DataEngine) {
3118-
oldImage, err := c.getEngineImageRO(v.Status.CurrentImage)
3119-
if err != nil {
3120-
log.WithError(err).Warnf("Failed to get engine image %v for live upgrade", v.Status.CurrentImage)
3116+
if err := c.checkOldAndNewEngineImagesForLiveUpgrade(v, volumeAndReplicaNodes...); err != nil {
3117+
log.Warnf("%v", err)
31213118
return nil
31223119
}
31233120

3124-
if isReady, err := c.ds.CheckEngineImageReadiness(oldImage.Spec.Image, volumeAndReplicaNodes...); !isReady {
3125-
log.WithError(err).Warnf("Engine live upgrade from %v, but the image wasn't ready", oldImage.Spec.Image)
3126-
return nil
3127-
}
3128-
newImage, err := c.getEngineImageRO(v.Spec.Image)
3129-
if err != nil {
3130-
log.WithError(err).Warnf("Failed to get engine image %v for live upgrade", v.Spec.Image)
3131-
return nil
3132-
}
3133-
if isReady, err := c.ds.CheckEngineImageReadiness(newImage.Spec.Image, volumeAndReplicaNodes...); !isReady {
3134-
log.WithError(err).Warnf("Engine live upgrade to %v, but the image wasn't ready", newImage.Spec.Image)
3135-
return nil
3121+
_, dataPathToOldRunningReplica, dataPathToNewReplica := c.groupReplicasByImageAndState(v, e, rs)
3122+
3123+
// Skip checking and creating new replicas for the 2 cases:
3124+
// 1. Volume is degraded.
3125+
// 2. The new replicas is activated and all old replicas are already purged.
3126+
if len(dataPathToOldRunningReplica) >= v.Spec.NumberOfReplicas {
3127+
if err := c.createAndStartMatchingReplicas(v, rs, dataPathToOldRunningReplica, dataPathToNewReplica,
3128+
func(r *longhorn.Replica, engineImage string) {
3129+
r.Spec.Image = engineImage
3130+
},
3131+
v.Spec.Image); err != nil {
3132+
return err
3133+
}
31363134
}
31373135

3138-
if oldImage.Status.GitCommit == newImage.Status.GitCommit {
3139-
log.Warnf("Engine image %v and %v are identical, delay upgrade until detach for volume", oldImage.Spec.Image, newImage.Spec.Image)
3140-
return nil
3136+
if e.Spec.Image != v.Spec.Image {
3137+
replicaAddressMap, err := c.constructReplicaAddressMap(v, e, dataPathToNewReplica)
3138+
if err != nil {
3139+
return nil
3140+
}
3141+
3142+
// Only upgrade e.Spec.Image if there are enough new upgraded replica.
3143+
// This prevent the deadlock in the case that an upgrade from engine image
3144+
// is followed immediately by an other upgrade.
3145+
// More specifically, after the 1st upgrade, e.Status.ReplicaModeMap empty.
3146+
// Therefore, dataPathToOldRunningReplica, dataPathToOldRunningReplica, and replicaAddressMap are also empty.
3147+
// Now, if we set e.Spec.UpgradedReplicaAddressMap to an empty map in the second upgrade,
3148+
// the second engine upgrade will be blocked since len(e.Spec.UpgradedReplicaAddressMap) == 0.
3149+
// On the other hand, the engine controller blocks the engine's status from being refreshed
3150+
// and keep the e.Status.ReplicaModeMap to be empty map. The system enter a deadlock for the volume.
3151+
if len(replicaAddressMap) == v.Spec.NumberOfReplicas {
3152+
e.Spec.UpgradedReplicaAddressMap = replicaAddressMap
3153+
e.Spec.Image = v.Spec.Image
3154+
}
31413155
}
3156+
c.finishLiveEngineUpgrade(v, e, rs, log)
3157+
} else {
3158+
// TODO: Implement the logic for data engine v2
3159+
return nil
3160+
}
3161+
return nil
3162+
}
31423163

3143-
if oldImage.Status.ControllerAPIVersion > newImage.Status.ControllerAPIVersion ||
3144-
oldImage.Status.ControllerAPIVersion < newImage.Status.ControllerAPIMinVersion {
3145-
log.Warnf("Failed to live upgrade from %v to %v: the old controller version %v "+
3146-
"is not compatible with the new controller version %v and the new controller minimal version %v",
3147-
oldImage.Spec.Image, newImage.Spec.Image,
3148-
oldImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIMinVersion)
3149-
return nil
3164+
func (c *VolumeController) constructReplicaAddressMap(v *longhorn.Volume, e *longhorn.Engine, dataPathToNewReplica map[string]*longhorn.Replica) (map[string]string, error) {
3165+
log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
3166+
"engine": e.Name,
3167+
"volumeDesiredEngineImage": v.Spec.Image,
3168+
})
3169+
3170+
replicaAddressMap := map[string]string{}
3171+
3172+
for _, r := range dataPathToNewReplica {
3173+
// wait for all potentially healthy replicas become running
3174+
if r.Status.CurrentState != longhorn.InstanceStateRunning {
3175+
return replicaAddressMap, fmt.Errorf("replica %v is not running", r.Name)
31503176
}
3177+
if r.Status.IP == "" {
3178+
log.WithField("replica", r.Name).Warn("replica is running but IP is empty")
3179+
continue
3180+
}
3181+
if r.Status.StorageIP == "" {
3182+
log.WithField("replica", r.Name).Warn("Replica is running but storage IP is empty, need to wait for update")
3183+
continue
3184+
}
3185+
if r.Status.Port == 0 {
3186+
log.WithField("replica", r.Name).Warn("Replica is running but port is 0")
3187+
continue
3188+
}
3189+
replicaAddressMap[r.Name] = imutil.GetURL(r.Status.StorageIP, r.Status.Port)
31513190
}
31523191

3153-
unknownReplicas := map[string]*longhorn.Replica{}
3154-
dataPathToOldRunningReplica := map[string]*longhorn.Replica{}
3155-
dataPathToNewReplica := map[string]*longhorn.Replica{}
3192+
return replicaAddressMap, nil
3193+
}
3194+
3195+
func (c *VolumeController) groupReplicasByImageAndState(v *longhorn.Volume, e *longhorn.Engine, rs map[string]*longhorn.Replica) (unknownReplicas, dataPathToOldRunningReplica, dataPathToNewReplica map[string]*longhorn.Replica) {
3196+
log := getLoggerForVolume(c.logger, v).WithFields(logrus.Fields{
3197+
"engine": e.Name,
3198+
"volumeDesiredEngineImage": v.Spec.Image,
3199+
})
3200+
3201+
unknownReplicas = map[string]*longhorn.Replica{}
3202+
dataPathToOldRunningReplica = map[string]*longhorn.Replica{}
3203+
dataPathToNewReplica = map[string]*longhorn.Replica{}
3204+
31563205
for _, r := range rs {
31573206
dataPath := types.GetReplicaDataPath(r.Spec.DiskPath, r.Spec.DataDirectoryName)
31583207
if r.Spec.Image == v.Status.CurrentImage && r.Status.CurrentState == longhorn.InstanceStateRunning && r.Spec.HealthyAt != "" {
@@ -3165,53 +3214,38 @@ func (c *VolumeController) upgradeEngineForVolume(v *longhorn.Volume, es map[str
31653214
}
31663215
}
31673216

3168-
// Skip checking and creating new replicas for the 2 cases:
3169-
// 1. Volume is degraded.
3170-
// 2. The new replicas is activated and all old replicas are already purged.
3171-
if len(dataPathToOldRunningReplica) >= v.Spec.NumberOfReplicas {
3172-
if err := c.createAndStartMatchingReplicas(v, rs, dataPathToOldRunningReplica, dataPathToNewReplica, func(r *longhorn.Replica, engineImage string) {
3173-
r.Spec.Image = engineImage
3174-
}, v.Spec.Image); err != nil {
3175-
return err
3176-
}
3217+
return unknownReplicas, dataPathToOldRunningReplica, dataPathToNewReplica
3218+
}
3219+
3220+
func (c *VolumeController) checkOldAndNewEngineImagesForLiveUpgrade(v *longhorn.Volume, nodes ...string) error {
3221+
oldImage, err := c.getEngineImageRO(v.Status.CurrentImage)
3222+
if err != nil {
3223+
return errors.Wrapf(err, "failed to get engine image %v for live upgrade", v.Status.CurrentImage)
31773224
}
31783225

3179-
if e.Spec.Image != v.Spec.Image {
3180-
replicaAddressMap := map[string]string{}
3181-
for _, r := range dataPathToNewReplica {
3182-
// wait for all potentially healthy replicas become running
3183-
if r.Status.CurrentState != longhorn.InstanceStateRunning {
3184-
return nil
3185-
}
3186-
if r.Status.IP == "" {
3187-
log.WithField("replica", r.Name).Warn("replica is running but IP is empty")
3188-
continue
3189-
}
3190-
if r.Status.StorageIP == "" {
3191-
log.WithField("replica", r.Name).Warn("Replica is running but storage IP is empty, need to wait for update")
3192-
continue
3193-
}
3194-
if r.Status.Port == 0 {
3195-
log.WithField("replica", r.Name).Warn("Replica is running but port is 0")
3196-
continue
3197-
}
3198-
replicaAddressMap[r.Name] = imutil.GetURL(r.Status.StorageIP, r.Status.Port)
3199-
}
3200-
// Only upgrade e.Spec.Image if there are enough new upgraded replica.
3201-
// This prevent the deadlock in the case that an upgrade from engine image
3202-
// is followed immediately by an other upgrade.
3203-
// More specifically, after the 1st upgrade, e.Status.ReplicaModeMap empty.
3204-
// Therefore, dataPathToOldRunningReplica, dataPathToOldRunningReplica, and replicaAddressMap are also empty.
3205-
// Now, if we set e.Spec.UpgradedReplicaAddressMap to an empty map in the second upgrade,
3206-
// the second engine upgrade will be blocked since len(e.Spec.UpgradedReplicaAddressMap) == 0.
3207-
// On the other hand, the engine controller blocks the engine's status from being refreshed
3208-
// and keep the e.Status.ReplicaModeMap to be empty map. The system enter a deadlock for the volume.
3209-
if len(replicaAddressMap) == v.Spec.NumberOfReplicas {
3210-
e.Spec.UpgradedReplicaAddressMap = replicaAddressMap
3211-
e.Spec.Image = v.Spec.Image
3212-
}
3226+
if isReady, err := c.ds.CheckEngineImageReadiness(oldImage.Spec.Image, nodes...); !isReady {
3227+
return errors.Wrapf(err, "engine live upgrade from %v, but the image wasn't ready", oldImage.Spec.Image)
3228+
}
3229+
newImage, err := c.getEngineImageRO(v.Spec.Image)
3230+
if err != nil {
3231+
return errors.Wrapf(err, "failed to get engine image %v for live upgrade", v.Spec.Image)
32133232
}
3214-
c.finishLiveEngineUpgrade(v, e, rs, log)
3233+
if isReady, err := c.ds.CheckEngineImageReadiness(newImage.Spec.Image, nodes...); !isReady {
3234+
return errors.Wrapf(err, "engine live upgrade to %v, but the image wasn't ready", newImage.Spec.Image)
3235+
}
3236+
3237+
if oldImage.Status.GitCommit == newImage.Status.GitCommit {
3238+
return fmt.Errorf("engine image %v and %v are identical, delay upgrade until detach for volume", oldImage.Spec.Image, newImage.Spec.Image)
3239+
}
3240+
3241+
if oldImage.Status.ControllerAPIVersion > newImage.Status.ControllerAPIVersion ||
3242+
oldImage.Status.ControllerAPIVersion < newImage.Status.ControllerAPIMinVersion {
3243+
return fmt.Errorf("failed to live upgrade from %v to %v: the old controller version %v "+
3244+
"is not compatible with the new controller version %v and the new controller minimal version %v",
3245+
oldImage.Spec.Image, newImage.Spec.Image,
3246+
oldImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIVersion, newImage.Status.ControllerAPIMinVersion)
3247+
}
3248+
32153249
return nil
32163250
}
32173251

0 commit comments

Comments
 (0)