@@ -799,12 +799,20 @@ func (i *IdentityStore) upsertEntityInTxn(ctx context.Context, txn *memdb.Txn, e
799
799
entity .NamespaceID = namespace .RootNamespaceID
800
800
}
801
801
802
+ ns , err := i .namespacer .NamespaceByID (ctx , entity .NamespaceID )
803
+ if err != nil {
804
+ return false , err
805
+ }
806
+
807
+ nsCtx := namespace .ContextWithNamespace (ctx , ns )
808
+
802
809
if previousEntity != nil && previousEntity .NamespaceID != entity .NamespaceID {
803
810
return false , errors .New ("entity and previous entity are not in the same namespace" )
804
811
}
805
812
806
813
aliasFactors := make ([]string , len (entity .Aliases ))
807
814
815
+ var localAliasesToDrop []* identity.Alias
808
816
for index , alias := range entity .Aliases {
809
817
// Verify that alias is not associated to a different one already
810
818
aliasByFactors , err := i .MemDBAliasByFactorsInTxn (txn , alias .MountAccessor , alias .Name , false , false )
@@ -830,6 +838,20 @@ func (i *IdentityStore) upsertEntityInTxn(ctx context.Context, txn *memdb.Txn, e
830
838
return false , errors .New ("alias from factors and entity are not in the same namespace" )
831
839
}
832
840
841
+ case i .localNode .ReplicationState ().HasState (consts .ReplicationPerformanceSecondary ) && alias .Local :
842
+ // If this alias is local and we're on the perf secondary, don't
843
+ // merge! We do this because we can't persist the entity merge to
844
+ // storage on a secondary and sending it upstream to the primary
845
+ // leads to all sorts of distributed state problems. Instead, just
846
+ // let the first alias win and remove any duplicates from the local
847
+ // alias packer.
848
+ //
849
+ // Rather than delete immediately, keep track of the list of local
850
+ // aliases to delete once we're done iterating.
851
+ i .logger .Trace ("skipping entity merge and dropping local alias on secondary" )
852
+ localAliasesToDrop = append (localAliasesToDrop , alias )
853
+ continue
854
+
833
855
case previousEntity != nil && aliasByFactors .CanonicalID == previousEntity .ID :
834
856
// previousEntity isn't upserted yet so may still contain the old
835
857
// alias reference in memdb if it was just changed; validate
@@ -864,7 +886,7 @@ func (i *IdentityStore) upsertEntityInTxn(ctx context.Context, txn *memdb.Txn, e
864
886
"alias_by_factors" , aliasByFactors )
865
887
866
888
persistMerge := persist || persistMerges
867
- respErr , intErr := i .mergeEntityAsPartOfUpsert (ctx , txn , entity , aliasByFactors .CanonicalID , persistMerge )
889
+ respErr , intErr := i .mergeEntityAsPartOfUpsert (nsCtx , txn , entity , aliasByFactors .CanonicalID , persistMerge )
868
890
switch {
869
891
case respErr != nil :
870
892
return false , respErr
@@ -894,7 +916,7 @@ func (i *IdentityStore) upsertEntityInTxn(ctx context.Context, txn *memdb.Txn, e
894
916
// are in case-sensitive mode so we can report these to the operator ahead
895
917
// of them disabling case-sensitive mode. Note that alias resolvers don't
896
918
// ever modify right now so ignore the bool.
897
- _ , conflictErr := i .conflictResolver .ResolveAliases (ctx , entity , aliasByFactors , alias )
919
+ _ , conflictErr := i .conflictResolver .ResolveAliases (nsCtx , entity , aliasByFactors , alias )
898
920
899
921
// This appears to be accounting for any duplicate aliases for the same
900
922
// Entity. In that case we would have skipped over the merge above in the
@@ -926,24 +948,42 @@ func (i *IdentityStore) upsertEntityInTxn(ctx context.Context, txn *memdb.Txn, e
926
948
927
949
if persist {
928
950
// Persist the previous entity object
929
- if err := i .persistEntity (ctx , previousEntity ); err != nil {
951
+ if err := i .persistEntity (nsCtx , previousEntity ); err != nil {
930
952
return false , err
931
953
}
932
954
}
933
955
}
934
956
957
+ // Now that we've gone through all aliases, we can update this entity and
958
+ // remove any local aliases before upserting in MemDB.
959
+ for _ , alias := range localAliasesToDrop {
960
+ entity .DeleteAliasByID (alias .ID )
961
+ }
962
+
935
963
// Insert or update entity in MemDB using the transaction created above
936
964
err = i .MemDBUpsertEntityInTxn (txn , entity )
937
965
if err != nil {
938
966
return false , err
939
967
}
940
968
941
969
if persist {
942
- if err := i .persistEntity (ctx , entity ); err != nil {
970
+ if err := i .persistEntity (nsCtx , entity ); err != nil {
943
971
return false , err
944
972
}
945
973
}
946
974
975
+ // Drop any local aliases detected above from the local alias packer on the
976
+ // Secondary active node. We'll only populate the localAliasesToDrop slice
977
+ // on Perf Secondaries, so check here to see if we're an active node so we
978
+ // can persist the change.
979
+ if i .localNode .HAState () == consts .Active {
980
+ for _ , alias := range localAliasesToDrop {
981
+ if err := i .localAliasPacker .DeleteItem (nsCtx , alias .CanonicalID ); err != nil {
982
+ i .logger .Warn ("failed to delete entity from local alias packer" , "entity_id" , alias .CanonicalID )
983
+ }
984
+ }
985
+ }
986
+
947
987
return false , nil
948
988
}
949
989
@@ -3022,19 +3062,24 @@ func makeLocalAliasWithName(t *testing.T, name, entityID string, bucketKey strin
3022
3062
// MakeDeduplicationDoneChan creates a new done channel for synchronization
3023
3063
// with tests outside of the vault package (e.g. in external_tests).
3024
3064
func (i * IdentityStore ) MakeDeduplicationDoneChan () {
3065
+ i .lock .Lock ()
3066
+ defer i .lock .Unlock ()
3067
+
3025
3068
i .activateDeduplicationDone = make (chan struct {})
3026
3069
}
3027
3070
3028
3071
// WaitForActivateDeduplicationDone is a test helper to wait for the identity
3029
3072
// deduplication activation to finish.
3030
3073
func (i * IdentityStore ) WaitForActivateDeduplicationDone (ctx context.Context ) error {
3031
- timeoutCtx , cancel := context .WithTimeout (ctx , 90 * time .Second )
3074
+ i .logger .Trace ("waiting for activation" , "channel" , i .activateDeduplicationDone )
3075
+
3076
+ timeoutCtx , cancel := context .WithTimeout (ctx , 30 * time .Second )
3032
3077
defer cancel ()
3033
3078
select {
3034
3079
case <- i .activateDeduplicationDone :
3080
+ i .logger .Trace ("activation write received" , "channel" , i .activateDeduplicationDone )
3035
3081
return nil
3036
3082
case <- timeoutCtx .Done ():
3037
3083
return fmt .Errorf ("timed out waiting for deduplication" )
3038
-
3039
3084
}
3040
3085
}
0 commit comments