Skip to content

Commit 3895f24

Browse files
committed
Warmup data for ossfs2.0
1 parent 67e8986 commit 3895f24

File tree

5 files changed

+173
-1
lines changed

5 files changed

+173
-1
lines changed

pkg/mounter/proxy/server/alinas/driver.go

+4
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ func (h *Driver) Mount(ctx context.Context, req *proxy.MountRequest) error {
5252
return h.mounter.Mount(req.Source, req.Target, req.Fstype, options)
5353
}
5454

55+
func (h *Driver) Warmup(targetPath, warmupDir string, workercount int, totalBytes int64) {
56+
return
57+
}
58+
5559
func (h *Driver) Init() {
5660
go runCommandForever("aliyun-alinas-mount-watchdog")
5761
go runCommandForever("aliyun-cpfs-mount-watchdog")

pkg/mounter/proxy/server/driver.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66

77
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter/proxy"
8+
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
89
)
910

1011
type Driver interface {
@@ -13,6 +14,7 @@ type Driver interface {
1314
Init()
1415
Terminate()
1516
Mount(ctx context.Context, req *proxy.MountRequest) error
17+
Warmup(string, string, int, int64)
1618
}
1719

1820
var (
@@ -29,5 +31,10 @@ func handleMountRequest(ctx context.Context, req *proxy.MountRequest) error {
2931
if h == nil {
3032
return fmt.Errorf("fstype %q not supported", req.Fstype)
3133
}
32-
return h.Mount(ctx, req)
34+
err := h.Mount(ctx, req)
35+
if err != nil {
36+
return err
37+
}
38+
h.Warmup(req.Target, "warmupPath", 10, 10 * utils.GiB)
39+
return nil
3340
}

pkg/mounter/proxy/server/ossfs/driver.go

+4
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,7 @@ func (h *Driver) Terminate() {
160160
h.wg.Wait()
161161
klog.InfoS("All ossfs processes exited")
162162
}
163+
164+
func (h *Driver) Warmup(targetPath, warmupDir string, workercount int, totalBytes int64) {
165+
return
166+
}

pkg/mounter/proxy/server/ossfs2/driver.go

+54
Original file line numberDiff line numberDiff line change
@@ -164,3 +164,57 @@ func (h *Driver) Terminate() {
164164
h.wg.Wait()
165165
klog.InfoS("All ossfs2 processes exited")
166166
}
167+
168+
func (h *Driver) Warmup(targetPath, warmupDir string, workerCount int, totalBytes int64) {
169+
klog.Infof("Starting FUSE mountpoint warmup for: %s, workercount: %s", targetPath, workerCount)
170+
startTime := time.Now()
171+
172+
// Find the immediate entries (files and subdirectories) - our chunks
173+
entries, err := os.ReadDir(targetPath)
174+
if err != nil {
175+
klog.Errorf("Error reading mountpoint directory %s: %v", targetPath, err)
176+
return
177+
}
178+
179+
if len(entries) == 0 {
180+
klog.Errorf("No entries found in %s to process.", targetPath)
181+
return
182+
}
183+
184+
// Use a channel to send entry paths to workers
185+
entryChan := make(chan string, len(entries))
186+
// Use a channel to receive bytes read from workers
187+
bytesReadChan := make(chan int64, workerCount)
188+
var totalBytesRead int64
189+
var totalBytesMu sync.Mutex
190+
var wg sync.WaitGroup
191+
192+
wg.Add(1)
193+
go func() {
194+
for bytes := range bytesReadChan {
195+
totalBytesMu.Lock()
196+
totalBytesRead += bytes
197+
totalBytesMu.Unlock()
198+
}
199+
wg.Done()
200+
}()
201+
202+
for i := 0; i < workerCount; i++ {
203+
wg.Add(1)
204+
go worker(i, entryChan, bytesReadChan, &wg)
205+
}
206+
207+
for _, entry := range entries {
208+
entryPath := filepath.Join(targetPath, entry.Name())
209+
entryChan <- entryPath
210+
}
211+
close(entryChan)
212+
213+
wg.Wait()
214+
// all works done
215+
close(bytesReadChan)
216+
217+
duration := time.Since(startTime)
218+
klog.Infof("Finished FUSE mountpoint warmup for: %s, total bytes read: %d MiB, duration: %v", targetPath, totalBytesRead/(1024*1024), duration)
219+
return
220+
}
+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package ossfs2
2+
3+
import (
4+
"io"
5+
"io/fs"
6+
"os"
7+
"path/filepath"
8+
"k8s.io/klog/v2"
9+
"sync"
10+
)
11+
12+
// Read files in 1GB chunks to trigger caching without excessive memory usage
13+
const readChunkSize = 1024 * 1024 * 1024
14+
var totalBytesRead int64
15+
16+
17+
func worker(id int, entryChan <-chan string, bytesReadChan chan<- int64, wg *sync.WaitGroup) {
18+
defer wg.Done()
19+
20+
// Function to read a single file (used if the entry is a file)
21+
readSingleFile := func(filePath string, workerBytesRead *int64) error {
22+
klog.Infof("Worker %d: Reading single file: %s", id, filePath)
23+
file, err := os.Open(filePath)
24+
if err != nil {
25+
klog.Errorf("Worker %d: Error opening file %s: %v", id, filePath, err)
26+
return err
27+
}
28+
defer file.Close()
29+
30+
buffer := make([]byte, readChunkSize)
31+
for {
32+
n, readErr := file.Read(buffer)
33+
if n > 0 {
34+
*workerBytesRead += int64(n)
35+
}
36+
if readErr == io.EOF {
37+
break
38+
}
39+
if readErr != nil {
40+
klog.Errorf("Worker %d: Error reading file %s: %v", id, filePath, readErr)
41+
return readErr
42+
}
43+
}
44+
return nil
45+
}
46+
47+
48+
walkAndRead := func(startPath string, workerBytesRead *int64) error {
49+
return filepath.WalkDir(startPath, func(path string, d fs.DirEntry, err error) error {
50+
if err != nil {
51+
klog.Errorf("Worker %d: Error accessing path %s: %v", id, path, err)
52+
return nil
53+
}
54+
55+
// Skip the starting directory itself, as we're processing its contents
56+
if path == startPath && d.IsDir() {
57+
return nil
58+
}
59+
60+
if !d.IsDir() {
61+
return readSingleFile(path, workerBytesRead)
62+
} else {
63+
klog.Infof("Worker %d: Visiting directory: %s", id, path)
64+
}
65+
return nil
66+
})
67+
}
68+
69+
for entryPath := range entryChan {
70+
klog.Infof("Worker %d received task for: %s", id, entryPath)
71+
var workerBytesRead int64
72+
73+
info, err := os.Stat(entryPath)
74+
if err != nil {
75+
klog.Errorf("Worker %d: Error stating entry %s: %v", id, entryPath, err)
76+
// Report 0 bytes for this failed entry and continue
77+
bytesReadChan <- 0
78+
continue
79+
}
80+
81+
if info.IsDir() {
82+
// Process directory recursively
83+
err = walkAndRead(entryPath, &workerBytesRead)
84+
if err != nil {
85+
klog.Errorf("Worker %d: Error during recursive walk for %s: %v", id, entryPath, err)
86+
}
87+
klog.Infof("Worker %d finished directory task: %s (Read %d MiB)", id, entryPath, workerBytesRead/(1024*1024))
88+
89+
} else {
90+
// Process single file
91+
err = readSingleFile(entryPath, &workerBytesRead)
92+
if err != nil {
93+
klog.Errorf("Worker %d: Error reading single file %s: %v", id, entryPath, err)
94+
}
95+
klog.Infof("Worker %d finished file task: %s (Read %d MiB)", id, entryPath, workerBytesRead/(1024*1024))
96+
}
97+
98+
// Report bytes read for this task (either recursive walk or single file read)
99+
bytesReadChan <- workerBytesRead
100+
}
101+
102+
klog.Infof("Worker %d shutting down.", id)
103+
}

0 commit comments

Comments
 (0)