Skip to content

Commit b22f3f1

Browse files
committed
Add FileReplica.Sync() unit tests.
1 parent 3075b2e commit b22f3f1

File tree

2 files changed

+101
-0
lines changed

2 files changed

+101
-0
lines changed

replica.go

+11
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ type FileReplica struct {
8989
wg sync.WaitGroup
9090
ctx context.Context
9191
cancel func()
92+
93+
// If true, replica monitors database for changes automatically.
94+
// Set to false if replica is being used synchronously (such as in tests).
95+
MonitorEnabled bool
9296
}
9397

9498
// NewFileReplica returns a new instance of FileReplica.
@@ -98,6 +102,8 @@ func NewFileReplica(db *DB, name, dst string) *FileReplica {
98102
name: name,
99103
dst: dst,
100104
cancel: func() {},
105+
106+
MonitorEnabled: true,
101107
}
102108
}
103109

@@ -406,6 +412,11 @@ func (r *FileReplica) WALs(ctx context.Context) ([]*WALInfo, error) {
406412

407413
// Start starts replication for a given generation.
408414
func (r *FileReplica) Start(ctx context.Context) {
415+
// Ignore if replica is being used sychronously.
416+
if !r.MonitorEnabled {
417+
return
418+
}
419+
409420
// Stop previous replication.
410421
r.Stop()
411422

replica_test.go

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package litestream_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/benbjohnson/litestream"
8+
)
9+
10+
func TestFileReplica_Sync(t *testing.T) {
11+
// Ensure replica can successfully sync after DB has sync'd.
12+
t.Run("InitialSync", func(t *testing.T) {
13+
db, sqldb := MustOpenDBs(t)
14+
defer MustCloseDBs(t, db, sqldb)
15+
r := NewTestFileReplica(t, db)
16+
17+
// Sync database & then sync replica.
18+
if err := db.Sync(); err != nil {
19+
t.Fatal(err)
20+
} else if err := r.Sync(context.Background()); err != nil {
21+
t.Fatal(err)
22+
}
23+
24+
// Ensure posistions match.
25+
if pos, err := db.Pos(); err != nil {
26+
t.Fatal(err)
27+
} else if got, want := r.LastPos(), pos; got != want {
28+
t.Fatalf("LastPos()=%v, want %v", got, want)
29+
}
30+
})
31+
32+
// Ensure replica can successfully sync multiple times.
33+
t.Run("MultiSync", func(t *testing.T) {
34+
db, sqldb := MustOpenDBs(t)
35+
defer MustCloseDBs(t, db, sqldb)
36+
r := NewTestFileReplica(t, db)
37+
38+
if _, err := sqldb.Exec(`CREATE TABLE foo (bar TEXT);`); err != nil {
39+
t.Fatal(err)
40+
}
41+
42+
// Write to the database multiple times and sync after each write.
43+
for i, n := 0, db.MinCheckpointPageN*2; i < n; i++ {
44+
if _, err := sqldb.Exec(`INSERT INTO foo (bar) VALUES ('baz')`); err != nil {
45+
t.Fatal(err)
46+
}
47+
48+
// Sync periodically.
49+
if i%100 == 0 || i == n-1 {
50+
if err := db.Sync(); err != nil {
51+
t.Fatal(err)
52+
} else if err := r.Sync(context.Background()); err != nil {
53+
t.Fatal(err)
54+
}
55+
}
56+
}
57+
58+
// Ensure posistions match.
59+
if pos, err := db.Pos(); err != nil {
60+
t.Fatal(err)
61+
} else if got, want := pos.Index, 2; got != want {
62+
t.Fatalf("Index=%v, want %v", got, want)
63+
} else if calcPos, err := r.CalcPos(pos.Generation); err != nil {
64+
t.Fatal(err)
65+
} else if got, want := calcPos, pos; got != want {
66+
t.Fatalf("CalcPos()=%v, want %v", got, want)
67+
} else if got, want := r.LastPos(), pos; got != want {
68+
t.Fatalf("LastPos()=%v, want %v", got, want)
69+
}
70+
})
71+
72+
// Ensure replica returns an error if there is no generation available from the DB.
73+
t.Run("ErrNoGeneration", func(t *testing.T) {
74+
db, sqldb := MustOpenDBs(t)
75+
defer MustCloseDBs(t, db, sqldb)
76+
r := NewTestFileReplica(t, db)
77+
78+
if err := r.Sync(context.Background()); err == nil || err.Error() != `no generation, waiting for data` {
79+
t.Fatal(err)
80+
}
81+
})
82+
}
83+
84+
// NewTestFileReplica returns a new replica using a temp directory & with monitoring disabled.
85+
func NewTestFileReplica(tb testing.TB, db *litestream.DB) *litestream.FileReplica {
86+
r := litestream.NewFileReplica(db, "", tb.TempDir())
87+
r.MonitorEnabled = false
88+
db.Replicas = []litestream.Replica{r}
89+
return r
90+
}

0 commit comments

Comments
 (0)