Skip to content

Commit cb33efb

Browse files
authored
Fix file locking for the ModuleDataStore (#3139)
1 parent 5bdce5d commit cb33efb

File tree

9 files changed

+383
-79
lines changed

9 files changed

+383
-79
lines changed

.dockerignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ private/buf/cmd/buf/command/alpha/protoc/internal/protoc-gen-insertion-point-rec
1414
private/buf/cmd/buf/command/alpha/protoc/internal/protoc-gen-insertion-point-writer/protoc-gen-insertion-point-writer
1515
private/buf/cmd/buf/command/alpha/protoc/test.txt
1616
private/buf/cmd/buf/command/generate/internal/protoc-gen-top-level-type-names-yaml/protoc-gen-top-level-type-names-yaml
17+
private/buf/cmd/buf/testdata/imports/*/v3/modulelocks/
1718
private/bufpkg/bufmodule/bufmoduleapi/cmd/buf-legacyfederation-go-data/buf-legacyfederation-go-data
1819
private/bufpkg/bufmodule/bufmoduletesting/cmd/buf-digest/buf-digest
1920
private/bufpkg/bufmodule/bufmoduletesting/cmd/buf-new-commit-id/buf-new-commit-id

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
/private/buf/cmd/buf/command/alpha/protoc/internal/protoc-gen-insertion-point-writer/protoc-gen-insertion-point-writer
1515
/private/buf/cmd/buf/command/alpha/protoc/test.txt
1616
/private/buf/cmd/buf/command/generate/internal/protoc-gen-top-level-type-names-yaml/protoc-gen-top-level-type-names-yaml
17+
/private/buf/cmd/buf/testdata/imports/*/v3/modulelocks/
1718
/private/bufpkg/bufmodule/bufmoduleapi/cmd/buf-legacyfederation-go-data/buf-legacyfederation-go-data
1819
/private/bufpkg/bufmodule/bufmoduletesting/cmd/buf-digest/buf-digest
1920
/private/bufpkg/bufmodule/bufmoduletesting/cmd/buf-new-commit-id/buf-new-commit-id

make/buf/all.mk

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ FILE_IGNORES := $(FILE_IGNORES) \
3131
private/buf/cmd/buf/command/alpha/protoc/test.txt \
3232
private/bufpkg/buftesting/cache/ \
3333
private/buf/buftesting/cache/ \
34-
private/pkg/storage/storageos/tmp/
34+
private/pkg/storage/storageos/tmp/ \
35+
private/buf/cmd/buf/testdata/imports/*/v3/modulelocks/
3536
LICENSE_HEADER_LICENSE_TYPE := apache
3637
LICENSE_HEADER_COPYRIGHT_HOLDER := Buf Technologies, Inc.
3738
LICENSE_HEADER_YEAR_RANGE := 2020-2024

private/buf/bufcli/cache.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmodulestore"
3030
"github.com/bufbuild/buf/private/pkg/app/appext"
3131
"github.com/bufbuild/buf/private/pkg/command"
32+
"github.com/bufbuild/buf/private/pkg/filelock"
3233
"github.com/bufbuild/buf/private/pkg/normalpath"
3334
"github.com/bufbuild/buf/private/pkg/storage/storageos"
3435
)
@@ -50,6 +51,7 @@ var (
5051
v3CacheModuleRelDirPath,
5152
v3CacheCommitsRelDirPath,
5253
v3CacheWKTRelDirPath,
54+
v3CacheModuleLockRelDirPath,
5355
}
5456

5557
// v1CacheModuleDataRelDirPath is the relative path to the cache directory where module data
@@ -96,6 +98,11 @@ var (
9698
//
9799
// Normalized.
98100
v3CacheWKTRelDirPath = normalpath.Join("v3", "wellknowntypes")
101+
// v3CacheModuleLockRelDirPath is the relative path to the lock files directory for module data.
102+
// This directory is used to store lock files for synchronizing reading and writing module data from the cache.
103+
//
104+
// Normalized.
105+
v3CacheModuleLockRelDirPath = normalpath.Join("v3", "modulelocks")
99106
)
100107

101108
// NewModuleDataProvider returns a new ModuleDataProvider while creating the
@@ -166,12 +173,20 @@ func newModuleDataProvider(
166173
if err != nil {
167174
return nil, err
168175
}
176+
if err := createCacheDir(container.CacheDirPath(), v3CacheModuleLockRelDirPath); err != nil {
177+
return nil, err
178+
}
179+
filelocker, err := filelock.NewLocker(normalpath.Join(container.CacheDirPath(), v3CacheModuleLockRelDirPath))
180+
if err != nil {
181+
return nil, err
182+
}
169183
return bufmodulecache.NewModuleDataProvider(
170184
container.Logger(),
171185
delegateModuleDataProvider,
172186
bufmodulestore.NewModuleDataStore(
173187
container.Logger(),
174188
cacheBucket,
189+
filelocker,
175190
),
176191
), nil
177192
}

private/bufpkg/bufmodule/bufmodulecache/bufmodulecache_test.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,23 @@ package bufmodulecache
1616

1717
import (
1818
"context"
19+
"fmt"
20+
"os"
21+
"path/filepath"
1922
"testing"
23+
"time"
2024

2125
"github.com/bufbuild/buf/private/bufpkg/bufmodule"
2226
"github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmodulestore"
2327
"github.com/bufbuild/buf/private/bufpkg/bufmodule/bufmoduletesting"
28+
"github.com/bufbuild/buf/private/pkg/filelock"
2429
"github.com/bufbuild/buf/private/pkg/slicesext"
2530
"github.com/bufbuild/buf/private/pkg/storage/storagemem"
31+
"github.com/bufbuild/buf/private/pkg/storage/storageos"
32+
"github.com/bufbuild/buf/private/pkg/thread"
2633
"github.com/stretchr/testify/require"
2734
"go.uber.org/zap"
35+
"go.uber.org/zap/zaptest"
2836
)
2937

3038
func TestCommitProviderForModuleKeyBasic(t *testing.T) {
@@ -165,6 +173,7 @@ func TestModuleDataProviderBasic(t *testing.T) {
165173
bufmodulestore.NewModuleDataStore(
166174
zap.NewNop(),
167175
storagemem.NewReadWriteBucket(),
176+
filelock.NewNopLocker(),
168177
),
169178
)
170179

@@ -214,6 +223,64 @@ func TestModuleDataProviderBasic(t *testing.T) {
214223
)
215224
}
216225

226+
func TestConcurrentCacheReadWrite(t *testing.T) {
227+
t.Parallel()
228+
229+
bsrProvider, moduleKeys := testGetBSRProviderAndModuleKeys(t, context.Background())
230+
tempDir := t.TempDir()
231+
cacheDir := filepath.Join(tempDir, "cache")
232+
logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))
233+
234+
for i := 0; i < 20; i++ {
235+
require.NoError(t, os.MkdirAll(cacheDir, 0755))
236+
jobs, err := slicesext.MapError(
237+
[]int{0, 1, 2, 3, 4},
238+
func(i int) (func(ctx context.Context) error, error) {
239+
logger := logger.Named(fmt.Sprintf("job-%d", i))
240+
bucket, err := storageos.NewProvider().NewReadWriteBucket(cacheDir)
241+
if err != nil {
242+
return nil, err
243+
}
244+
filelocker, err := filelock.NewLocker(
245+
cacheDir,
246+
filelock.LockerWithLockRetryDelay(10*time.Millisecond), // Drops test time from ~16s to ~1s
247+
)
248+
if err != nil {
249+
return nil, err
250+
}
251+
cacheProvider := newModuleDataProvider(
252+
logger,
253+
bsrProvider,
254+
bufmodulestore.NewModuleDataStore(
255+
logger,
256+
bucket,
257+
filelocker,
258+
),
259+
)
260+
return func(ctx context.Context) error {
261+
moduleDatas, err := cacheProvider.GetModuleDatasForModuleKeys(
262+
ctx,
263+
moduleKeys,
264+
)
265+
if err != nil {
266+
return err
267+
}
268+
for _, moduleData := range moduleDatas {
269+
// Calling moduleData.Bucket() checks the digest
270+
if _, err := moduleData.Bucket(); err != nil {
271+
return err
272+
}
273+
}
274+
return nil
275+
}, nil
276+
},
277+
)
278+
require.NoError(t, err)
279+
require.NoError(t, thread.Parallelize(context.Background(), jobs))
280+
require.NoError(t, os.RemoveAll(cacheDir))
281+
}
282+
}
283+
217284
func testGetBSRProviderAndModuleKeys(t *testing.T, ctx context.Context) (bufmoduletesting.OmniProvider, []bufmodule.ModuleKey) {
218285
bsrProvider, err := bufmoduletesting.NewOmniProvider(
219286
bufmoduletesting.ModuleData{
@@ -235,7 +302,10 @@ func testGetBSRProviderAndModuleKeys(t *testing.T, ctx context.Context) (bufmodu
235302
bufmoduletesting.ModuleData{
236303
Name: "buf.build/foo/mod3",
237304
PathToData: map[string][]byte{
238-
"mod3.proto": []byte(
305+
"mod3a.proto": []byte(
306+
`syntax = proto3; package mod3;`,
307+
),
308+
"mod3b.proto": []byte(
239309
`syntax = proto3; package mod3;`,
240310
),
241311
},

0 commit comments

Comments
 (0)