diff --git a/repo/fsrepo/migrations/fetch.go b/repo/fsrepo/migrations/fetch.go index a3493e75009..ff202088a00 100644 --- a/repo/fsrepo/migrations/fetch.go +++ b/repo/fsrepo/migrations/fetch.go @@ -111,15 +111,14 @@ func FetchBinary(ctx context.Context, fetcher Fetcher, dist, ver, binName, out s } defer arcFile.Close() - // Open connection to download archive from ipfs path - rc, err := fetcher.Fetch(ctx, arcDistPath) + // Open connection to download archive from ipfs path and write to file + arcBytes, err := fetcher.Fetch(ctx, arcDistPath) if err != nil { return "", err } - defer rc.Close() // Write download data - _, err = io.Copy(arcFile, rc) + _, err = io.Copy(arcFile, bytes.NewReader(arcBytes)) if err != nil { return "", err } diff --git a/repo/fsrepo/migrations/fetch_test.go b/repo/fsrepo/migrations/fetch_test.go index ec7c6d5e73e..2273cb5e9e0 100644 --- a/repo/fsrepo/migrations/fetch_test.go +++ b/repo/fsrepo/migrations/fetch_test.go @@ -2,10 +2,10 @@ package migrations import ( "bufio" + "bytes" "context" "fmt" "io" - "io/ioutil" "net/http" "net/http/httptest" "os" @@ -96,14 +96,13 @@ func TestHttpFetch(t *testing.T) { fetcher := NewHttpFetcher("", ts.URL, "", 0) - rc, err := fetcher.Fetch(ctx, "/versions") + out, err := fetcher.Fetch(ctx, "/versions") if err != nil { t.Fatal(err) } - defer rc.Close() var lines []string - scan := bufio.NewScanner(rc) + scan := bufio.NewScanner(bytes.NewReader(out)) for scan.Scan() { lines = append(lines, scan.Text()) } @@ -232,16 +231,11 @@ func TestMultiFetcher(t *testing.T) { mf := NewMultiFetcher(badFetcher, fetcher) - rc, err := mf.Fetch(ctx, "/versions") + vers, err := mf.Fetch(ctx, "/versions") if err != nil { t.Fatal(err) } - defer rc.Close() - vers, err := ioutil.ReadAll(rc) - if err != nil { - t.Fatal("could not read versions:", err) - } if len(vers) < 45 { fmt.Println("unexpected more data") } diff --git a/repo/fsrepo/migrations/fetcher.go b/repo/fsrepo/migrations/fetcher.go index 26a9275ded7..8e30f06fb66 100644 --- a/repo/fsrepo/migrations/fetcher.go +++ b/repo/fsrepo/migrations/fetcher.go @@ -20,8 +20,7 @@ const ( type Fetcher interface { // Fetch attempts to fetch the file at the given ipfs path. - // Returns io.ReadCloser on success, which caller must close. - Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) + Fetch(ctx context.Context, filePath string) ([]byte, error) // Close performs any cleanup after the fetcher is not longer needed. Close() error } @@ -48,13 +47,12 @@ func NewMultiFetcher(f ...Fetcher) Fetcher { } // Fetch attempts to fetch the file at each of its fetchers until one succeeds. -// Returns io.ReadCloser on success, which caller must close. -func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) (io.ReadCloser, error) { +func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) ([]byte, error) { var errs error for _, fetcher := range f.fetchers { - rc, err := fetcher.Fetch(ctx, ipfsPath) + out, err := fetcher.Fetch(ctx, ipfsPath) if err == nil { - return rc, nil + return out, nil } errs = multierror.Append(errs, err) } diff --git a/repo/fsrepo/migrations/httpfetcher.go b/repo/fsrepo/migrations/httpfetcher.go index 6fb20bb45c2..3f74a084ed6 100644 --- a/repo/fsrepo/migrations/httpfetcher.go +++ b/repo/fsrepo/migrations/httpfetcher.go @@ -60,9 +60,8 @@ func NewHttpFetcher(distPath, gateway, userAgent string, fetchLimit int64) *Http } // Fetch attempts to fetch the file at the given path, from the distribution -// site configured for this HttpFetcher. Returns io.ReadCloser on success, -// which caller must close. -func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) { +// site configured for this HttpFetcher. +func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) { gwURL := f.gateway + path.Join(f.distPath, filePath) fmt.Printf("Fetching with HTTP: %q\n", gwURL) @@ -89,10 +88,15 @@ func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser return nil, fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes)) } + var rc io.ReadCloser if f.limit != 0 { - return NewLimitReadCloser(resp.Body, f.limit), nil + rc = NewLimitReadCloser(resp.Body, f.limit) + } else { + rc = resp.Body } - return resp.Body, nil + defer rc.Close() + + return ioutil.ReadAll(rc) } func (f *HttpFetcher) Close() error { diff --git a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go index 88f07b502ee..11203ed5a05 100644 --- a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go +++ b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go @@ -52,6 +52,8 @@ type IpfsFetcher struct { addrInfo peer.AddrInfo } +var _ migrations.Fetcher = (*IpfsFetcher)(nil) + // NewIpfsFetcher creates a new IpfsFetcher // // Specifying "" for distPath sets the default IPNS path. @@ -85,9 +87,8 @@ func NewIpfsFetcher(distPath string, fetchLimit int64, repoRoot *string) *IpfsFe } // Fetch attempts to fetch the file at the given path, from the distribution -// site configured for this HttpFetcher. Returns io.ReadCloser on success, -// which caller must close. -func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) { +// site configured for this HttpFetcher. +func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) { // Initialize and start IPFS node on first call to Fetch, since the fetcher // may be created by not used. f.openOnce.Do(func() { @@ -123,10 +124,15 @@ func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser return nil, fmt.Errorf("%q is not a file", filePath) } + var rc io.ReadCloser if f.limit != 0 { - return migrations.NewLimitReadCloser(fileNode, f.limit), nil + rc = migrations.NewLimitReadCloser(fileNode, f.limit) + } else { + rc = fileNode } - return fileNode, nil + defer rc.Close() + + return ioutil.ReadAll(rc) } func (f *IpfsFetcher) Close() error { diff --git a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go index 877de5e788e..e300371a679 100644 --- a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go +++ b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go @@ -2,6 +2,7 @@ package ipfsfetcher import ( "bufio" + "bytes" "context" "fmt" "os" @@ -28,14 +29,13 @@ func TestIpfsFetcher(t *testing.T) { fetcher := NewIpfsFetcher("", 0, nil) defer fetcher.Close() - rc, err := fetcher.Fetch(ctx, "go-ipfs/versions") + out, err := fetcher.Fetch(ctx, "go-ipfs/versions") if err != nil { t.Fatal(err) } - defer rc.Close() var lines []string - scan := bufio.NewScanner(rc) + scan := bufio.NewScanner(bytes.NewReader(out)) for scan.Scan() { lines = append(lines, scan.Text()) } @@ -52,8 +52,7 @@ func TestIpfsFetcher(t *testing.T) { } // Check not found - _, err = fetcher.Fetch(ctx, "/no_such_file") - if err == nil { + if _, err = fetcher.Fetch(ctx, "/no_such_file"); err == nil { t.Fatal("expected error 404") } diff --git a/repo/fsrepo/migrations/migrations.go b/repo/fsrepo/migrations/migrations.go index 861dc7b7caf..5eac91b2932 100644 --- a/repo/fsrepo/migrations/migrations.go +++ b/repo/fsrepo/migrations/migrations.go @@ -155,13 +155,14 @@ func ReadMigrationConfig(repoRoot string) (*config.Migration, error) { // downloadSources, func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetcher func(string) Fetcher) (Fetcher, error) { const httpUserAgent = "go-ipfs" + const numTriesPerHTTP = 3 var fetchers []Fetcher for _, src := range downloadSources { src := strings.TrimSpace(src) switch src { case "HTTPS", "https", "HTTP", "http": - fetchers = append(fetchers, NewHttpFetcher(distPath, "", httpUserAgent, 0)) + fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, "", httpUserAgent, 0), numTriesPerHTTP}) case "IPFS", "ipfs": if newIpfsFetcher != nil { fetchers = append(fetchers, newIpfsFetcher(distPath)) @@ -178,7 +179,7 @@ func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetch default: return nil, errors.New("bad gateway address: url scheme must be http or https") } - fetchers = append(fetchers, NewHttpFetcher(distPath, u.String(), httpUserAgent, 0)) + fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, u.String(), httpUserAgent, 0), numTriesPerHTTP}) case "": // Ignore empty string } diff --git a/repo/fsrepo/migrations/migrations_test.go b/repo/fsrepo/migrations/migrations_test.go index b09174a4930..0e52b3a65ed 100644 --- a/repo/fsrepo/migrations/migrations_test.go +++ b/repo/fsrepo/migrations/migrations_test.go @@ -3,7 +3,6 @@ package migrations import ( "context" "fmt" - "io" "log" "os" "path/filepath" @@ -290,7 +289,9 @@ func TestReadMigrationConfig(t *testing.T) { type mockIpfsFetcher struct{} -func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) { +var _ Fetcher = (*mockIpfsFetcher)(nil) + +func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) { return nil, nil } @@ -323,7 +324,9 @@ func TestGetMigrationFetcher(t *testing.T) { if err != nil { t.Fatal(err) } - if _, ok := f.(*HttpFetcher); !ok { + if rf, ok := f.(*RetryFetcher); !ok { + t.Fatal("expected RetryFetcher") + } else if _, ok := rf.Fetcher.(*HttpFetcher); !ok { t.Fatal("expected HttpFetcher") } @@ -341,7 +344,9 @@ func TestGetMigrationFetcher(t *testing.T) { if err != nil { t.Fatal(err) } - if _, ok := f.(*HttpFetcher); !ok { + if rf, ok := f.(*RetryFetcher); !ok { + t.Fatal("expected RetryFetcher") + } else if _, ok := rf.Fetcher.(*HttpFetcher); !ok { t.Fatal("expected HttpFetcher") } diff --git a/repo/fsrepo/migrations/retryfetcher.go b/repo/fsrepo/migrations/retryfetcher.go new file mode 100644 index 00000000000..81415bb6756 --- /dev/null +++ b/repo/fsrepo/migrations/retryfetcher.go @@ -0,0 +1,33 @@ +package migrations + +import ( + "context" + "fmt" +) + +type RetryFetcher struct { + Fetcher + MaxTries int +} + +var _ Fetcher = (*RetryFetcher)(nil) + +func (r *RetryFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) { + var lastErr error + for i := 0; i < r.MaxTries; i++ { + out, err := r.Fetcher.Fetch(ctx, filePath) + if err == nil { + return out, nil + } + + if ctx.Err() != nil { + return nil, ctx.Err() + } + lastErr = err + } + return nil, fmt.Errorf("exceeded number of retries. last error was %w", lastErr) +} + +func (r *RetryFetcher) Close() error { + return r.Fetcher.Close() +} diff --git a/repo/fsrepo/migrations/versions.go b/repo/fsrepo/migrations/versions.go index 69b2e290bb4..af5bbbbd969 100644 --- a/repo/fsrepo/migrations/versions.go +++ b/repo/fsrepo/migrations/versions.go @@ -2,6 +2,7 @@ package migrations import ( "bufio" + "bytes" "context" "errors" "fmt" @@ -39,16 +40,15 @@ func LatestDistVersion(ctx context.Context, fetcher Fetcher, dist string, stable // available on the distriburion site. List is in ascending order, unless // sortDesc is true. func DistVersions(ctx context.Context, fetcher Fetcher, dist string, sortDesc bool) ([]string, error) { - rc, err := fetcher.Fetch(ctx, path.Join(dist, distVersions)) + versionBytes, err := fetcher.Fetch(ctx, path.Join(dist, distVersions)) if err != nil { return nil, err } - defer rc.Close() prefix := "v" var vers []semver.Version - scan := bufio.NewScanner(rc) + scan := bufio.NewScanner(bytes.NewReader(versionBytes)) for scan.Scan() { ver, err := semver.Make(strings.TrimLeft(scan.Text(), prefix)) if err != nil {