Skip to content

Commit b1ffc87

Browse files
authored
feat: refactor Fetcher interface used for downloading migrations (#8728)
* feat: refactor Fetcher interface used for downloading migrations * feat: add RetryFetcher for migration downloads * feat: 3 retries for each HTTP migration download
1 parent 4f3eb4c commit b1ffc87

File tree

10 files changed

+83
-44
lines changed

10 files changed

+83
-44
lines changed

repo/fsrepo/migrations/fetch.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -111,15 +111,14 @@ func FetchBinary(ctx context.Context, fetcher Fetcher, dist, ver, binName, out s
111111
}
112112
defer arcFile.Close()
113113

114-
// Open connection to download archive from ipfs path
115-
rc, err := fetcher.Fetch(ctx, arcDistPath)
114+
// Open connection to download archive from ipfs path and write to file
115+
arcBytes, err := fetcher.Fetch(ctx, arcDistPath)
116116
if err != nil {
117117
return "", err
118118
}
119-
defer rc.Close()
120119

121120
// Write download data
122-
_, err = io.Copy(arcFile, rc)
121+
_, err = io.Copy(arcFile, bytes.NewReader(arcBytes))
123122
if err != nil {
124123
return "", err
125124
}

repo/fsrepo/migrations/fetch_test.go

+4-10
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ package migrations
22

33
import (
44
"bufio"
5+
"bytes"
56
"context"
67
"fmt"
78
"io"
8-
"io/ioutil"
99
"net/http"
1010
"net/http/httptest"
1111
"os"
@@ -96,14 +96,13 @@ func TestHttpFetch(t *testing.T) {
9696

9797
fetcher := NewHttpFetcher("", ts.URL, "", 0)
9898

99-
rc, err := fetcher.Fetch(ctx, "/versions")
99+
out, err := fetcher.Fetch(ctx, "/versions")
100100
if err != nil {
101101
t.Fatal(err)
102102
}
103-
defer rc.Close()
104103

105104
var lines []string
106-
scan := bufio.NewScanner(rc)
105+
scan := bufio.NewScanner(bytes.NewReader(out))
107106
for scan.Scan() {
108107
lines = append(lines, scan.Text())
109108
}
@@ -232,16 +231,11 @@ func TestMultiFetcher(t *testing.T) {
232231

233232
mf := NewMultiFetcher(badFetcher, fetcher)
234233

235-
rc, err := mf.Fetch(ctx, "/versions")
234+
vers, err := mf.Fetch(ctx, "/versions")
236235
if err != nil {
237236
t.Fatal(err)
238237
}
239-
defer rc.Close()
240238

241-
vers, err := ioutil.ReadAll(rc)
242-
if err != nil {
243-
t.Fatal("could not read versions:", err)
244-
}
245239
if len(vers) < 45 {
246240
fmt.Println("unexpected more data")
247241
}

repo/fsrepo/migrations/fetcher.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ const (
2121

2222
type Fetcher interface {
2323
// Fetch attempts to fetch the file at the given ipfs path.
24-
// Returns io.ReadCloser on success, which caller must close.
25-
Fetch(ctx context.Context, filePath string) (io.ReadCloser, error)
24+
Fetch(ctx context.Context, filePath string) ([]byte, error)
2625
// Close performs any cleanup after the fetcher is not longer needed.
2726
Close() error
2827
}
@@ -50,13 +49,12 @@ func NewMultiFetcher(f ...Fetcher) *MultiFetcher {
5049
}
5150

5251
// Fetch attempts to fetch the file at each of its fetchers until one succeeds.
53-
// Returns io.ReadCloser on success, which caller must close.
54-
func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) (io.ReadCloser, error) {
52+
func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) ([]byte, error) {
5553
var errs error
5654
for _, fetcher := range f.fetchers {
57-
rc, err := fetcher.Fetch(ctx, ipfsPath)
55+
out, err := fetcher.Fetch(ctx, ipfsPath)
5856
if err == nil {
59-
return rc, nil
57+
return out, nil
6058
}
6159
fmt.Printf("Error fetching: %s\n", err.Error())
6260
errs = multierror.Append(errs, err)

repo/fsrepo/migrations/httpfetcher.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,8 @@ func NewHttpFetcher(distPath, gateway, userAgent string, fetchLimit int64) *Http
6060
}
6161

6262
// Fetch attempts to fetch the file at the given path, from the distribution
63-
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
64-
// which caller must close.
65-
func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
63+
// site configured for this HttpFetcher.
64+
func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
6665
gwURL := f.gateway + path.Join(f.distPath, filePath)
6766
fmt.Printf("Fetching with HTTP: %q\n", gwURL)
6867

@@ -89,10 +88,15 @@ func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser
8988
return nil, fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes))
9089
}
9190

91+
var rc io.ReadCloser
9292
if f.limit != 0 {
93-
return NewLimitReadCloser(resp.Body, f.limit), nil
93+
rc = NewLimitReadCloser(resp.Body, f.limit)
94+
} else {
95+
rc = resp.Body
9496
}
95-
return resp.Body, nil
97+
defer rc.Close()
98+
99+
return ioutil.ReadAll(rc)
96100
}
97101

98102
func (f *HttpFetcher) Close() error {

repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ type IpfsFetcher struct {
5252
addrInfo peer.AddrInfo
5353
}
5454

55+
var _ migrations.Fetcher = (*IpfsFetcher)(nil)
56+
5557
// NewIpfsFetcher creates a new IpfsFetcher
5658
//
5759
// Specifying "" for distPath sets the default IPNS path.
@@ -85,9 +87,8 @@ func NewIpfsFetcher(distPath string, fetchLimit int64, repoRoot *string) *IpfsFe
8587
}
8688

8789
// Fetch attempts to fetch the file at the given path, from the distribution
88-
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
89-
// which caller must close.
90-
func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
90+
// site configured for this HttpFetcher.
91+
func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
9192
// Initialize and start IPFS node on first call to Fetch, since the fetcher
9293
// may be created by not used.
9394
f.openOnce.Do(func() {
@@ -123,10 +124,15 @@ func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser
123124
return nil, fmt.Errorf("%q is not a file", filePath)
124125
}
125126

127+
var rc io.ReadCloser
126128
if f.limit != 0 {
127-
return migrations.NewLimitReadCloser(fileNode, f.limit), nil
129+
rc = migrations.NewLimitReadCloser(fileNode, f.limit)
130+
} else {
131+
rc = fileNode
128132
}
129-
return fileNode, nil
133+
defer rc.Close()
134+
135+
return ioutil.ReadAll(rc)
130136
}
131137

132138
func (f *IpfsFetcher) Close() error {

repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ipfsfetcher
22

33
import (
44
"bufio"
5+
"bytes"
56
"context"
67
"fmt"
78
"os"
@@ -28,14 +29,13 @@ func TestIpfsFetcher(t *testing.T) {
2829
fetcher := NewIpfsFetcher("", 0, nil)
2930
defer fetcher.Close()
3031

31-
rc, err := fetcher.Fetch(ctx, "go-ipfs/versions")
32+
out, err := fetcher.Fetch(ctx, "go-ipfs/versions")
3233
if err != nil {
3334
t.Fatal(err)
3435
}
35-
defer rc.Close()
3636

3737
var lines []string
38-
scan := bufio.NewScanner(rc)
38+
scan := bufio.NewScanner(bytes.NewReader(out))
3939
for scan.Scan() {
4040
lines = append(lines, scan.Text())
4141
}
@@ -52,8 +52,7 @@ func TestIpfsFetcher(t *testing.T) {
5252
}
5353

5454
// Check not found
55-
_, err = fetcher.Fetch(ctx, "/no_such_file")
56-
if err == nil {
55+
if _, err = fetcher.Fetch(ctx, "/no_such_file"); err == nil {
5756
t.Fatal("expected error 404")
5857
}
5958

repo/fsrepo/migrations/migrations.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,14 @@ func ReadMigrationConfig(repoRoot string) (*config.Migration, error) {
155155
// downloadSources,
156156
func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetcher func(string) Fetcher) (Fetcher, error) {
157157
const httpUserAgent = "go-ipfs"
158+
const numTriesPerHTTP = 3
158159

159160
var fetchers []Fetcher
160161
for _, src := range downloadSources {
161162
src := strings.TrimSpace(src)
162163
switch src {
163164
case "HTTPS", "https", "HTTP", "http":
164-
fetchers = append(fetchers, NewHttpFetcher(distPath, "", httpUserAgent, 0))
165+
fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, "", httpUserAgent, 0), numTriesPerHTTP})
165166
case "IPFS", "ipfs":
166167
if newIpfsFetcher != nil {
167168
fetchers = append(fetchers, newIpfsFetcher(distPath))
@@ -178,7 +179,7 @@ func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetch
178179
default:
179180
return nil, errors.New("bad gateway address: url scheme must be http or https")
180181
}
181-
fetchers = append(fetchers, NewHttpFetcher(distPath, u.String(), httpUserAgent, 0))
182+
fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, u.String(), httpUserAgent, 0), numTriesPerHTTP})
182183
case "":
183184
// Ignore empty string
184185
}

repo/fsrepo/migrations/migrations_test.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package migrations
33
import (
44
"context"
55
"fmt"
6-
"io"
76
"log"
87
"os"
98
"path/filepath"
@@ -290,7 +289,9 @@ func TestReadMigrationConfig(t *testing.T) {
290289

291290
type mockIpfsFetcher struct{}
292291

293-
func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
292+
var _ Fetcher = (*mockIpfsFetcher)(nil)
293+
294+
func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
294295
return nil, nil
295296
}
296297

@@ -323,7 +324,9 @@ func TestGetMigrationFetcher(t *testing.T) {
323324
if err != nil {
324325
t.Fatal(err)
325326
}
326-
if _, ok := f.(*HttpFetcher); !ok {
327+
if rf, ok := f.(*RetryFetcher); !ok {
328+
t.Fatal("expected RetryFetcher")
329+
} else if _, ok := rf.Fetcher.(*HttpFetcher); !ok {
327330
t.Fatal("expected HttpFetcher")
328331
}
329332

@@ -341,7 +344,9 @@ func TestGetMigrationFetcher(t *testing.T) {
341344
if err != nil {
342345
t.Fatal(err)
343346
}
344-
if _, ok := f.(*HttpFetcher); !ok {
347+
if rf, ok := f.(*RetryFetcher); !ok {
348+
t.Fatal("expected RetryFetcher")
349+
} else if _, ok := rf.Fetcher.(*HttpFetcher); !ok {
345350
t.Fatal("expected HttpFetcher")
346351
}
347352

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package migrations
2+
3+
import (
4+
"context"
5+
"fmt"
6+
)
7+
8+
type RetryFetcher struct {
9+
Fetcher
10+
MaxTries int
11+
}
12+
13+
var _ Fetcher = (*RetryFetcher)(nil)
14+
15+
func (r *RetryFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) {
16+
var lastErr error
17+
for i := 0; i < r.MaxTries; i++ {
18+
out, err := r.Fetcher.Fetch(ctx, filePath)
19+
if err == nil {
20+
return out, nil
21+
}
22+
23+
if ctx.Err() != nil {
24+
return nil, ctx.Err()
25+
}
26+
lastErr = err
27+
}
28+
return nil, fmt.Errorf("exceeded number of retries. last error was %w", lastErr)
29+
}
30+
31+
func (r *RetryFetcher) Close() error {
32+
return r.Fetcher.Close()
33+
}

repo/fsrepo/migrations/versions.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package migrations
22

33
import (
44
"bufio"
5+
"bytes"
56
"context"
67
"errors"
78
"fmt"
@@ -39,16 +40,15 @@ func LatestDistVersion(ctx context.Context, fetcher Fetcher, dist string, stable
3940
// available on the distriburion site. List is in ascending order, unless
4041
// sortDesc is true.
4142
func DistVersions(ctx context.Context, fetcher Fetcher, dist string, sortDesc bool) ([]string, error) {
42-
rc, err := fetcher.Fetch(ctx, path.Join(dist, distVersions))
43+
versionBytes, err := fetcher.Fetch(ctx, path.Join(dist, distVersions))
4344
if err != nil {
4445
return nil, err
4546
}
46-
defer rc.Close()
4747

4848
prefix := "v"
4949
var vers []semver.Version
5050

51-
scan := bufio.NewScanner(rc)
51+
scan := bufio.NewScanner(bytes.NewReader(versionBytes))
5252
for scan.Scan() {
5353
ver, err := semver.Make(strings.TrimLeft(scan.Text(), prefix))
5454
if err != nil {

0 commit comments

Comments
 (0)