Skip to content

Commit 8cc0b0a

Browse files
authored
Simplify bucket names and sourceID for rosmar (#7655)
1 parent a3a19af commit 8cc0b0a

File tree

8 files changed

+119
-76
lines changed

8 files changed

+119
-76
lines changed

base/bucket.go

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ package base
1010

1111
import (
1212
"context"
13+
"crypto/md5"
1314
"crypto/tls"
1415
"crypto/x509"
16+
"encoding/hex"
1517
"errors"
1618
"fmt"
1719
"io"
@@ -372,14 +374,25 @@ func getMaxTTL(ctx context.Context, store CouchbaseBucketStore) (int, error) {
372374
return bucketResponseWithMaxTTL.MaxTTLSeconds, nil
373375
}
374376

375-
// Get the Server UUID of the bucket, this is also known as the Cluster UUID
376-
func GetServerUUID(ctx context.Context, store CouchbaseBucketStore) (uuid string, err error) {
377-
respBytes, _, err := store.MgmtRequest(ctx, http.MethodGet, "/pools", "application/json", nil)
377+
// GetServerUUID returns Couchbase Server Cluster UUID on a timeout. If running against rosmar, do return an empty string.
378+
func GetServerUUID(ctx context.Context, bucket Bucket) (string, error) {
379+
gocbV2Bucket, err := AsGocbV2Bucket(bucket)
378380
if err != nil {
379-
return "", err
381+
return "", nil
382+
}
383+
// start a retry loop to get server ID
384+
worker := func() (bool, error, string) {
385+
respBytes, _, err := gocbV2Bucket.MgmtRequest(ctx, http.MethodGet, "/pools", "application/json", nil)
386+
if err != nil {
387+
return true, err, ""
388+
}
389+
390+
uuid, err := ParseClusterUUID(respBytes)
391+
return false, err, uuid
380392
}
381393

382-
return ParseClusterUUID(respBytes)
394+
err, uuid := RetryLoop(ctx, "Getting ServerUUID", worker, GetNewDatabaseSleeperFunc())
395+
return uuid, err
383396
}
384397

385398
func ParseClusterUUID(respBytes []byte) (string, error) {
@@ -522,3 +535,43 @@ func RequireNoBucketTTL(ctx context.Context, b Bucket) error {
522535

523536
return nil
524537
}
538+
539+
// GetSourceID returns the source ID for a bucket.
540+
func GetSourceID(ctx context.Context, bucket Bucket) (string, error) {
541+
// for rosmar bucket and testing, use the bucket name as the source ID to make it easier to identify the source
542+
gocbBucket, err := AsGocbV2Bucket(bucket)
543+
if err != nil {
544+
return bucket.GetName(), nil
545+
}
546+
547+
// If not overwriting the source ID, for rosmar, serverUUID would be ""
548+
serverUUID, err := GetServerUUID(ctx, gocbBucket)
549+
if err != nil {
550+
return "", err
551+
}
552+
bucketUUID, err := bucket.UUID()
553+
if err != nil {
554+
return "", err
555+
}
556+
return CreateEncodedSourceID(bucketUUID, serverUUID)
557+
}
558+
559+
// CreateEncodedSourceID will hash the bucket UUID and cluster UUID using md5 hash function then will base64 encode it
560+
// This function is in sync with xdcr implementation of UUIDstoDocumentSource https://github.com/couchbase/goxdcr/blob/dfba7a5b4251d93db46e2b0b4b55ea014218931b/hlv/hlv.go#L51
561+
func CreateEncodedSourceID(bucketUUID, clusterUUID string) (string, error) {
562+
md5Hash := md5.Sum([]byte(bucketUUID + clusterUUID))
563+
hexStr := hex.EncodeToString(md5Hash[:])
564+
source, err := HexToBase64(hexStr)
565+
if err != nil {
566+
return "", err
567+
}
568+
return string(source), nil
569+
}
570+
571+
// GetNewDatabaseSleeperFunc returns a sleeper function during database connection
572+
func GetNewDatabaseSleeperFunc() RetrySleeper {
573+
return CreateDoublingSleeperFunc(
574+
13, // MaxNumRetries approx 40 seconds total retry duration
575+
5, // InitialRetrySleepTimeMS
576+
)
577+
}

base/main_test_bucket_pool.go

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,43 @@ import (
3232
// GTestBucketPool is a global instance of a TestBucketPool used to manage a pool of buckets for integration testing.
3333
var GTestBucketPool *TestBucketPool
3434

35+
// rosmarTracker is simplified implemenation of a bucketReadierQueue for rosmar buckets. The only purpose of this
36+
// tracker is to be able to name in low sequential orders like Couchbase Server bucket pool: rosmar0, rosmar1, etc.
37+
type rosmarTracker struct {
38+
lock sync.Mutex
39+
activeBuckets []bool
40+
}
41+
42+
// newRosmarTracker initializes a new rosmarTracker with the specified number of buckets.
43+
func newRosmarTracker(numBuckets int) *rosmarTracker {
44+
r := &rosmarTracker{
45+
activeBuckets: make([]bool, numBuckets),
46+
}
47+
return r
48+
}
49+
50+
// getNextBucketIdx returns the next available bucket index in a rosmarTracker. This will return the lowest available
51+
// number starting at 0. If all buckets are in use, it will return an error.
52+
func (r *rosmarTracker) GetNextBucketIdx() (int, error) {
53+
r.lock.Lock()
54+
defer r.lock.Unlock()
55+
// iterate over len of activeBuckets to find the first lexicographically in a map
56+
for i, active := range r.activeBuckets {
57+
if !active {
58+
r.activeBuckets[i] = true
59+
return i, nil
60+
}
61+
}
62+
return 0, fmt.Errorf("no rosmar buckets available, all have been used")
63+
}
64+
65+
// ReleaseBucketIdx releases a bucket index in a rosmarTracker. This should be called when a bucket is no longer in use.
66+
func (r *rosmarTracker) ReleaseBucketIdx(idx int) {
67+
r.lock.Lock()
68+
defer r.lock.Unlock()
69+
r.activeBuckets[idx] = false
70+
}
71+
3572
// TestBucketPool is used to manage a pool of pre-prepared buckets for testing purposes.
3673
type TestBucketPool struct {
3774
// integrationMode should be true if using Couchbase Server. If this is false, Walrus buckets are returned instead of pooled buckets.
@@ -60,6 +97,8 @@ type TestBucketPool struct {
6097
unclosedBuckets map[string]map[string]struct{}
6198
unclosedBucketsLock sync.Mutex
6299

100+
rosmarBuckets rosmarTracker
101+
63102
// skipCollections may be true for older Couchbase Server versions that do not support collections.
64103
skipCollections bool
65104
// numCollectionsPerBucket is the number of collections to create in each bucket
@@ -154,6 +193,7 @@ func NewTestBucketPoolWithOptions(ctx context.Context, bucketReadierFunc TBPBuck
154193
preserveBuckets: preserveBuckets,
155194
bucketInitFunc: bucketInitFunc,
156195
unclosedBuckets: make(map[string]map[string]struct{}),
196+
rosmarBuckets: *newRosmarTracker(numBuckets),
157197
useExistingBucket: TestUseExistingBucket(),
158198
useDefaultScope: options.UseDefaultScope,
159199
numCollectionsPerBucket: numCollectionsPerBucket,
@@ -250,12 +290,13 @@ func (tbp *TestBucketPool) GetWalrusTestBucket(t testing.TB, url string) (b Buck
250290
tbp.Fatalf(testCtx, "nil TestBucketPool, but not using a Walrus test URL")
251291
}
252292

253-
id, err := GenerateRandomID()
254-
require.NoError(t, err)
255-
293+
bucketIdx, err := tbp.rosmarBuckets.GetNextBucketIdx()
294+
if err != nil {
295+
tbp.Fatalf(testCtx, "Couldn't get next rosmar bucket index: %v", err)
296+
}
256297
var walrusBucket *rosmar.Bucket
257298
const typeName = "rosmar"
258-
bucketName := tbpBucketNamePrefix + "rosmar_" + id
299+
bucketName := fmt.Sprintf("rosmar%d", bucketIdx)
259300
if url == "walrus:" || url == rosmar.InMemoryURL {
260301
walrusBucket, err = rosmar.OpenBucket(url, bucketName, rosmar.CreateOrOpen)
261302
} else {
@@ -305,9 +346,10 @@ func (tbp *TestBucketPool) GetWalrusTestBucket(t testing.TB, url string) (b Buck
305346
// Persisted buckets should call close and delete
306347
closeErr := walrusBucket.CloseAndDelete(ctx)
307348
if closeErr != nil {
308-
tbp.Logf(ctx, "Unexpected error closing persistent %s bucket: %v", typeName, closeErr)
349+
tbp.Fatalf(ctx, "Unexpected error closing persistent %s bucket: %v", typeName, closeErr)
350+
return
309351
}
310-
352+
tbp.rosmarBuckets.ReleaseBucketIdx(bucketIdx)
311353
}
312354
}
313355

db/database.go

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -298,14 +298,6 @@ func ValidateDatabaseName(dbName string) error {
298298
return nil
299299
}
300300

301-
// getNewDatabaseSleeperFunc returns a sleeper function during database connection
302-
func getNewDatabaseSleeperFunc() base.RetrySleeper {
303-
return base.CreateDoublingSleeperFunc(
304-
13, // MaxNumRetries approx 40 seconds total retry duration
305-
5, // InitialRetrySleepTimeMS
306-
)
307-
}
308-
309301
// connectToBucketErrorHandling takes the given spec and error and returns a formatted error, along with whether it was a fatal error.
310302
func connectToBucketErrorHandling(ctx context.Context, spec base.BucketSpec, gotErr error) (fatalError bool, err error) {
311303
if gotErr != nil {
@@ -352,30 +344,14 @@ func ConnectToBucket(ctx context.Context, spec base.BucketSpec, failFast bool) (
352344
}
353345

354346
description := fmt.Sprintf("Attempt to connect to bucket : %v", spec.BucketName)
355-
err, ibucket := base.RetryLoop(ctx, description, worker, getNewDatabaseSleeperFunc())
347+
err, ibucket := base.RetryLoop(ctx, description, worker, base.GetNewDatabaseSleeperFunc())
356348
if err != nil {
357349
return nil, err
358350
}
359351

360352
return ibucket.(base.Bucket), nil
361353
}
362354

363-
// GetServerUUID returns Couchbase Server Cluster UUID on a timeout. If running against rosmar, do return an empty string.
364-
func GetServerUUID(ctx context.Context, bucket base.Bucket) (string, error) {
365-
gocbV2Bucket, err := base.AsGocbV2Bucket(bucket)
366-
if err != nil {
367-
return "", nil
368-
}
369-
// start a retry loop to get server ID
370-
worker := func() (bool, error, interface{}) {
371-
uuid, err := base.GetServerUUID(ctx, gocbV2Bucket)
372-
return err != nil, err, uuid
373-
}
374-
375-
err, uuid := base.RetryLoop(ctx, "Getting ServerUUID", worker, getNewDatabaseSleeperFunc())
376-
return uuid.(string), err
377-
}
378-
379355
// Creates a new DatabaseContext on a bucket. The bucket will be closed when this context closes.
380356
func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket, autoImport bool, options DatabaseContextOptions) (dbc *DatabaseContext, returnedError error) {
381357
cleanupFunctions := make([]func(), 0)
@@ -400,7 +376,7 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket,
400376
return nil, err
401377
}
402378

403-
serverUUID, err := GetServerUUID(ctx, bucket)
379+
serverUUID, err := base.GetServerUUID(ctx, bucket)
404380
if err != nil {
405381
return nil, err
406382
}
@@ -422,7 +398,7 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket,
422398
if err != nil {
423399
return nil, err
424400
}
425-
sourceID, err := CreateEncodedSourceID(bucketUUID, serverUUID)
401+
sourceID, err := base.GetSourceID(ctx, bucket)
426402
if err != nil {
427403
return nil, err
428404
}

db/hybrid_logical_vector.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
package db
1010

1111
import (
12-
"crypto/md5"
1312
"encoding/base64"
14-
"encoding/hex"
1513
"fmt"
1614
"maps"
1715
"sort"
@@ -597,18 +595,6 @@ func EncodeValueStr(value string) (string, error) {
597595
return base.StringDecimalToLittleEndianHex(strings.TrimSpace(value))
598596
}
599597

600-
// CreateEncodedSourceID will hash the bucket UUID and cluster UUID using md5 hash function then will base64 encode it
601-
// This function is in sync with xdcr implementation of UUIDstoDocumentSource https://github.com/couchbase/goxdcr/blob/dfba7a5b4251d93db46e2b0b4b55ea014218931b/hlv/hlv.go#L51
602-
func CreateEncodedSourceID(bucketUUID, clusterUUID string) (string, error) {
603-
md5Hash := md5.Sum([]byte(bucketUUID + clusterUUID))
604-
hexStr := hex.EncodeToString(md5Hash[:])
605-
source, err := base.HexToBase64(hexStr)
606-
if err != nil {
607-
return "", err
608-
}
609-
return string(source), nil
610-
}
611-
612598
func (hlv HybridLogicalVector) MarshalJSON() ([]byte, error) {
613599
type BucketVector struct {
614600
CurrentVersionCAS string `json:"cvCas,omitempty"`

topologytest/peer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ func NewPeer(t *testing.T, name string, buckets map[PeerBucketID]*base.TestBucke
237237
case PeerTypeCouchbaseServer:
238238
bucket, ok := buckets[opts.BucketID]
239239
require.True(t, ok, "bucket not found for bucket ID %d", opts.BucketID)
240-
sourceID, err := xdcr.GetSourceID(base.TestCtx(t), bucket)
240+
sourceID, err := base.GetSourceID(base.TestCtx(t), bucket)
241241
require.NoError(t, err)
242242
p := &CouchbaseServerPeer{
243243
name: name,

xdcr/replication.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"fmt"
1515

1616
"github.com/couchbase/sync_gateway/base"
17-
"github.com/couchbase/sync_gateway/db"
1817
"github.com/couchbaselabs/rosmar"
1918
)
2019

@@ -79,16 +78,3 @@ func NewXDCR(ctx context.Context, fromBucket, toBucket base.Bucket, opts XDCROpt
7978
}
8079
return newCouchbaseServerManager(ctx, gocbFromBucket, gocbToBucket, opts)
8180
}
82-
83-
// GetSourceID returns the source ID for a bucket.
84-
func GetSourceID(ctx context.Context, bucket base.Bucket) (string, error) {
85-
serverUUID, err := db.GetServerUUID(ctx, bucket)
86-
if err != nil {
87-
return "", err
88-
}
89-
bucketUUID, err := bucket.UUID()
90-
if err != nil {
91-
return "", err
92-
}
93-
return db.CreateEncodedSourceID(bucketUUID, serverUUID)
94-
}

xdcr/rosmar_xdcr.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func newRosmarManager(ctx context.Context, fromBucket, toBucket *rosmar.Bucket,
6767
if opts.Mobile != MobileOn {
6868
return nil, errors.New("Only sgbucket.XDCRMobileOn is supported in rosmar")
6969
}
70-
fromBucketSourceID, err := GetSourceID(ctx, fromBucket)
70+
fromBucketSourceID, err := base.GetSourceID(ctx, fromBucket)
7171
if err != nil {
7272
return nil, fmt.Errorf("Could not get source ID for %s: %w", fromBucket.GetName(), err)
7373
}

xdcr/xdcr_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestMobileXDCRNoSyncDataCopied(t *testing.T) {
7070
fromDs = fromBucket.DefaultDataStore()
7171
toDs = toBucket.DefaultDataStore()
7272
}
73-
fromBucketSourceID, err := GetSourceID(ctx, fromBucket)
73+
fromBucketSourceID, err := base.GetSourceID(ctx, fromBucket)
7474
require.NoError(t, err)
7575
docCas := make(map[string]uint64)
7676
for _, doc := range []string{syncDoc, attachmentDoc, normalDoc} {
@@ -155,7 +155,7 @@ func getTwoBucketDataStores(t *testing.T) (base.Bucket, sgbucket.DataStore, base
155155
func TestReplicateVV(t *testing.T) {
156156
fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t)
157157
ctx := base.TestCtx(t)
158-
fromBucketSourceID, err := GetSourceID(ctx, fromBucket)
158+
fromBucketSourceID, err := base.GetSourceID(ctx, fromBucket)
159159
require.NoError(t, err)
160160

161161
hlvAgent := db.NewHLVAgent(t, fromDs, "fakeHLVSourceID", base.VvXattrName)
@@ -276,7 +276,7 @@ func TestReplicateVV(t *testing.T) {
276276
func TestVVWriteTwice(t *testing.T) {
277277
fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t)
278278
ctx := base.TestCtx(t)
279-
fromBucketSourceID, err := GetSourceID(ctx, fromBucket)
279+
fromBucketSourceID, err := base.GetSourceID(ctx, fromBucket)
280280
require.NoError(t, err)
281281

282282
docID := "doc1"
@@ -311,7 +311,7 @@ func TestVVObeyMou(t *testing.T) {
311311
base.SetUpTestLogging(t, base.LevelDebug, base.KeySGTest)
312312
fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t)
313313
ctx := base.TestCtx(t)
314-
fromBucketSourceID, err := GetSourceID(ctx, fromBucket)
314+
fromBucketSourceID, err := base.GetSourceID(ctx, fromBucket)
315315
require.NoError(t, err)
316316

317317
docID := "doc1"
@@ -390,7 +390,7 @@ func TestVVMouImport(t *testing.T) {
390390
base.SetUpTestLogging(t, base.LevelDebug, base.KeySGTest)
391391
fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t)
392392
ctx := base.TestCtx(t)
393-
fromBucketSourceID, err := GetSourceID(ctx, fromBucket)
393+
fromBucketSourceID, err := base.GetSourceID(ctx, fromBucket)
394394
require.NoError(t, err)
395395

396396
docID := "doc1"
@@ -500,7 +500,7 @@ func TestVVMouImport(t *testing.T) {
500500
func TestLWWAfterInitialReplication(t *testing.T) {
501501
fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t)
502502
ctx := base.TestCtx(t)
503-
fromBucketSourceID, err := GetSourceID(ctx, fromBucket)
503+
fromBucketSourceID, err := base.GetSourceID(ctx, fromBucket)
504504
require.NoError(t, err)
505505

506506
docID := "doc1"
@@ -631,9 +631,9 @@ func TestVVMultiActor(t *testing.T) {
631631
}
632632
fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t)
633633
ctx := base.TestCtx(t)
634-
fromBucketSourceID, err := GetSourceID(ctx, fromBucket)
634+
fromBucketSourceID, err := base.GetSourceID(ctx, fromBucket)
635635
require.NoError(t, err)
636-
toBucketSourceID, err := GetSourceID(ctx, toBucket)
636+
toBucketSourceID, err := base.GetSourceID(ctx, toBucket)
637637
require.NoError(t, err)
638638

639639
// Create document on source

0 commit comments

Comments
 (0)