From 65725006bd269a583ea8dc532cf34f7755147f9b Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 10 Feb 2022 13:32:04 -0500 Subject: [PATCH 1/7] feat: refactor Fetcher interface used for downloading migrations --- repo/fsrepo/migrations/fetch.go | 13 ++-------- repo/fsrepo/migrations/fetch_test.go | 16 ++++++------- repo/fsrepo/migrations/fetcher.go | 15 ++++++++---- repo/fsrepo/migrations/httpfetcher.go | 22 +++++++++++------ .../migrations/ipfsfetcher/ipfsfetcher.go | 24 +++++++++++++------ .../ipfsfetcher/ipfsfetcher_test.go | 10 ++++---- repo/fsrepo/migrations/migrations_test.go | 6 +++-- repo/fsrepo/migrations/versions.go | 8 +++---- 8 files changed, 65 insertions(+), 49 deletions(-) diff --git a/repo/fsrepo/migrations/fetch.go b/repo/fsrepo/migrations/fetch.go index a3493e75009..4c39004f086 100644 --- a/repo/fsrepo/migrations/fetch.go +++ b/repo/fsrepo/migrations/fetch.go @@ -5,7 +5,6 @@ import ( "bytes" "context" "fmt" - "io" "io/ioutil" "os" "os/exec" @@ -111,16 +110,8 @@ func FetchBinary(ctx context.Context, fetcher Fetcher, dist, ver, binName, out s } defer arcFile.Close() - // Open connection to download archive from ipfs path - rc, err := fetcher.Fetch(ctx, arcDistPath) - if err != nil { - return "", err - } - defer rc.Close() - - // Write download data - _, err = io.Copy(arcFile, rc) - if err != nil { + // Open connection to download archive from ipfs path and write to file + if err := fetcher.Fetch(ctx, arcDistPath, arcFile); err != nil { return "", err } arcFile.Close() diff --git a/repo/fsrepo/migrations/fetch_test.go b/repo/fsrepo/migrations/fetch_test.go index ec7c6d5e73e..c15dc8ff8e4 100644 --- a/repo/fsrepo/migrations/fetch_test.go +++ b/repo/fsrepo/migrations/fetch_test.go @@ -2,6 +2,7 @@ package migrations import ( "bufio" + "bytes" "context" "fmt" "io" @@ -96,14 +97,14 @@ func TestHttpFetch(t *testing.T) { fetcher := NewHttpFetcher("", ts.URL, "", 0) - rc, err := fetcher.Fetch(ctx, "/versions") + var buf bytes.Buffer + err := fetcher.Fetch(ctx, "/versions", &buf) if err != nil { t.Fatal(err) } - defer rc.Close() var lines []string - scan := bufio.NewScanner(rc) + scan := bufio.NewScanner(&buf) for scan.Scan() { lines = append(lines, scan.Text()) } @@ -120,7 +121,7 @@ func TestHttpFetch(t *testing.T) { } // Check not found - _, err = fetcher.Fetch(ctx, "/no_such_file") + err = fetcher.Fetch(ctx, "/no_such_file", &bytes.Buffer{}) if err == nil || !strings.Contains(err.Error(), "404") { t.Fatal("expected error 404") } @@ -232,13 +233,12 @@ func TestMultiFetcher(t *testing.T) { mf := NewMultiFetcher(badFetcher, fetcher) - rc, err := mf.Fetch(ctx, "/versions") - if err != nil { + var buf bytes.Buffer + if err := mf.Fetch(ctx, "/versions", &buf); err != nil { t.Fatal(err) } - defer rc.Close() - vers, err := ioutil.ReadAll(rc) + vers, err := ioutil.ReadAll(&buf) if err != nil { t.Fatal("could not read versions:", err) } diff --git a/repo/fsrepo/migrations/fetcher.go b/repo/fsrepo/migrations/fetcher.go index 26a9275ded7..7a3b98f0d68 100644 --- a/repo/fsrepo/migrations/fetcher.go +++ b/repo/fsrepo/migrations/fetcher.go @@ -1,6 +1,7 @@ package migrations import ( + "bytes" "context" "io" "os" @@ -21,7 +22,7 @@ const ( type Fetcher interface { // Fetch attempts to fetch the file at the given ipfs path. // Returns io.ReadCloser on success, which caller must close. - Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) + Fetch(ctx context.Context, filePath string, writer io.Writer) error // Close performs any cleanup after the fetcher is not longer needed. Close() error } @@ -49,16 +50,20 @@ func NewMultiFetcher(f ...Fetcher) Fetcher { // Fetch attempts to fetch the file at each of its fetchers until one succeeds. // Returns io.ReadCloser on success, which caller must close. -func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) (io.ReadCloser, error) { +func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string, writer io.Writer) error { var errs error for _, fetcher := range f.fetchers { - rc, err := fetcher.Fetch(ctx, ipfsPath) + var buf bytes.Buffer + err := fetcher.Fetch(ctx, ipfsPath, &buf) if err == nil { - return rc, nil + if _, err := io.Copy(writer, &buf); err != nil { + return err + } + return nil } errs = multierror.Append(errs, err) } - return nil, errs + return errs } func (f *MultiFetcher) Close() error { diff --git a/repo/fsrepo/migrations/httpfetcher.go b/repo/fsrepo/migrations/httpfetcher.go index 6fb20bb45c2..bc9e4400a8a 100644 --- a/repo/fsrepo/migrations/httpfetcher.go +++ b/repo/fsrepo/migrations/httpfetcher.go @@ -62,13 +62,13 @@ func NewHttpFetcher(distPath, gateway, userAgent string, fetchLimit int64) *Http // Fetch attempts to fetch the file at the given path, from the distribution // site configured for this HttpFetcher. Returns io.ReadCloser on success, // which caller must close. -func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) { +func (f *HttpFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error { gwURL := f.gateway + path.Join(f.distPath, filePath) fmt.Printf("Fetching with HTTP: %q\n", gwURL) req, err := http.NewRequestWithContext(ctx, http.MethodGet, gwURL, nil) if err != nil { - return nil, fmt.Errorf("http.NewRequest error: %s", err) + return fmt.Errorf("http.NewRequest error: %s", err) } if f.userAgent != "" { @@ -77,22 +77,30 @@ func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser resp, err := http.DefaultClient.Do(req) if err != nil { - return nil, fmt.Errorf("http.DefaultClient.Do error: %s", err) + return fmt.Errorf("http.DefaultClient.Do error: %s", err) } if resp.StatusCode >= 400 { defer resp.Body.Close() mes, err := ioutil.ReadAll(resp.Body) if err != nil { - return nil, fmt.Errorf("error reading error body: %s", err) + return fmt.Errorf("error reading error body: %s", err) } - return nil, fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes)) + return fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes)) } + var rc io.ReadCloser if f.limit != 0 { - return NewLimitReadCloser(resp.Body, f.limit), nil + rc = NewLimitReadCloser(resp.Body, f.limit) + } else { + rc = resp.Body } - return resp.Body, nil + defer rc.Close() + + if _, err := io.Copy(writer, rc); err != nil { + return err + } + return nil } func (f *HttpFetcher) Close() error { diff --git a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go index 88f07b502ee..938f220e524 100644 --- a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go +++ b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go @@ -52,6 +52,8 @@ type IpfsFetcher struct { addrInfo peer.AddrInfo } +var _ migrations.Fetcher = (*IpfsFetcher)(nil) + // NewIpfsFetcher creates a new IpfsFetcher // // Specifying "" for distPath sets the default IPNS path. @@ -87,7 +89,7 @@ func NewIpfsFetcher(distPath string, fetchLimit int64, repoRoot *string) *IpfsFe // Fetch attempts to fetch the file at the given path, from the distribution // site configured for this HttpFetcher. Returns io.ReadCloser on success, // which caller must close. -func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) { +func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error { // Initialize and start IPFS node on first call to Fetch, since the fetcher // may be created by not used. f.openOnce.Do(func() { @@ -103,30 +105,38 @@ func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser fmt.Printf("Fetching with IPFS: %q\n", filePath) if f.openErr != nil { - return nil, f.openErr + return f.openErr } iPath, err := parsePath(path.Join(f.distPath, filePath)) if err != nil { - return nil, err + return err } nd, err := f.ipfs.Unixfs().Get(ctx, iPath) if err != nil { - return nil, err + return err } f.recordFetched(iPath) fileNode, ok := nd.(files.File) if !ok { - return nil, fmt.Errorf("%q is not a file", filePath) + return fmt.Errorf("%q is not a file", filePath) } + var rc io.ReadCloser if f.limit != 0 { - return migrations.NewLimitReadCloser(fileNode, f.limit), nil + rc = migrations.NewLimitReadCloser(fileNode, f.limit) + } else { + rc = fileNode } - return fileNode, nil + defer rc.Close() + + if _, err := io.Copy(writer, rc); err != nil { + return err + } + return nil } func (f *IpfsFetcher) Close() error { diff --git a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go index 877de5e788e..b89e67f64c4 100644 --- a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go +++ b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go @@ -2,6 +2,7 @@ package ipfsfetcher import ( "bufio" + "bytes" "context" "fmt" "os" @@ -28,14 +29,14 @@ func TestIpfsFetcher(t *testing.T) { fetcher := NewIpfsFetcher("", 0, nil) defer fetcher.Close() - rc, err := fetcher.Fetch(ctx, "go-ipfs/versions") + var buf bytes.Buffer + err := fetcher.Fetch(ctx, "go-ipfs/versions", &buf) if err != nil { t.Fatal(err) } - defer rc.Close() var lines []string - scan := bufio.NewScanner(rc) + scan := bufio.NewScanner(&buf) for scan.Scan() { lines = append(lines, scan.Text()) } @@ -52,8 +53,7 @@ func TestIpfsFetcher(t *testing.T) { } // Check not found - _, err = fetcher.Fetch(ctx, "/no_such_file") - if err == nil { + if err = fetcher.Fetch(ctx, "/no_such_file", &bytes.Buffer{}); err == nil { t.Fatal("expected error 404") } diff --git a/repo/fsrepo/migrations/migrations_test.go b/repo/fsrepo/migrations/migrations_test.go index b09174a4930..5497a81ec73 100644 --- a/repo/fsrepo/migrations/migrations_test.go +++ b/repo/fsrepo/migrations/migrations_test.go @@ -290,8 +290,10 @@ func TestReadMigrationConfig(t *testing.T) { type mockIpfsFetcher struct{} -func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) { - return nil, nil +var _ Fetcher = (*mockIpfsFetcher)(nil) + +func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error { + return nil } func (m *mockIpfsFetcher) Close() error { diff --git a/repo/fsrepo/migrations/versions.go b/repo/fsrepo/migrations/versions.go index 69b2e290bb4..1c510a8fd7e 100644 --- a/repo/fsrepo/migrations/versions.go +++ b/repo/fsrepo/migrations/versions.go @@ -2,6 +2,7 @@ package migrations import ( "bufio" + "bytes" "context" "errors" "fmt" @@ -39,16 +40,15 @@ func LatestDistVersion(ctx context.Context, fetcher Fetcher, dist string, stable // available on the distriburion site. List is in ascending order, unless // sortDesc is true. func DistVersions(ctx context.Context, fetcher Fetcher, dist string, sortDesc bool) ([]string, error) { - rc, err := fetcher.Fetch(ctx, path.Join(dist, distVersions)) - if err != nil { + var buf bytes.Buffer + if err := fetcher.Fetch(ctx, path.Join(dist, distVersions), &buf); err != nil { return nil, err } - defer rc.Close() prefix := "v" var vers []semver.Version - scan := bufio.NewScanner(rc) + scan := bufio.NewScanner(&buf) for scan.Scan() { ver, err := semver.Make(strings.TrimLeft(scan.Text(), prefix)) if err != nil { From ddd7a4da46c96f3db5a1b0f154f50e0d0fc49e39 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 10 Feb 2022 15:52:27 -0500 Subject: [PATCH 2/7] feat: add retry fetcher for HTTP migration downloads --- repo/fsrepo/migrations/migrations.go | 5 +-- repo/fsrepo/migrations/migrations_test.go | 8 +++-- repo/fsrepo/migrations/retryfetcher.go | 43 +++++++++++++++++++++++ 3 files changed, 52 insertions(+), 4 deletions(-) create mode 100644 repo/fsrepo/migrations/retryfetcher.go diff --git a/repo/fsrepo/migrations/migrations.go b/repo/fsrepo/migrations/migrations.go index 861dc7b7caf..2d495fc4138 100644 --- a/repo/fsrepo/migrations/migrations.go +++ b/repo/fsrepo/migrations/migrations.go @@ -155,13 +155,14 @@ func ReadMigrationConfig(repoRoot string) (*config.Migration, error) { // downloadSources, func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetcher func(string) Fetcher) (Fetcher, error) { const httpUserAgent = "go-ipfs" + const numRetriesPerHTTP = 3 var fetchers []Fetcher for _, src := range downloadSources { src := strings.TrimSpace(src) switch src { case "HTTPS", "https", "HTTP", "http": - fetchers = append(fetchers, NewHttpFetcher(distPath, "", httpUserAgent, 0)) + fetchers = append(fetchers, NewRetryFetcher(NewHttpFetcher(distPath, "", httpUserAgent, 0), numRetriesPerHTTP)) case "IPFS", "ipfs": if newIpfsFetcher != nil { fetchers = append(fetchers, newIpfsFetcher(distPath)) @@ -178,7 +179,7 @@ func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetch default: return nil, errors.New("bad gateway address: url scheme must be http or https") } - fetchers = append(fetchers, NewHttpFetcher(distPath, u.String(), httpUserAgent, 0)) + fetchers = append(fetchers, NewRetryFetcher(NewHttpFetcher(distPath, u.String(), httpUserAgent, 0), numRetriesPerHTTP)) case "": // Ignore empty string } diff --git a/repo/fsrepo/migrations/migrations_test.go b/repo/fsrepo/migrations/migrations_test.go index 5497a81ec73..526f76062c7 100644 --- a/repo/fsrepo/migrations/migrations_test.go +++ b/repo/fsrepo/migrations/migrations_test.go @@ -325,7 +325,9 @@ func TestGetMigrationFetcher(t *testing.T) { if err != nil { t.Fatal(err) } - if _, ok := f.(*HttpFetcher); !ok { + if rf, ok := f.(*RetryFetcher); !ok { + t.Fatal("expected RetryFetcher") + } else if _, ok := rf.Fetcher.(*HttpFetcher); !ok { t.Fatal("expected HttpFetcher") } @@ -343,7 +345,9 @@ func TestGetMigrationFetcher(t *testing.T) { if err != nil { t.Fatal(err) } - if _, ok := f.(*HttpFetcher); !ok { + if rf, ok := f.(*RetryFetcher); !ok { + t.Fatal("expected RetryFetcher") + } else if _, ok := rf.Fetcher.(*HttpFetcher); !ok { t.Fatal("expected HttpFetcher") } diff --git a/repo/fsrepo/migrations/retryfetcher.go b/repo/fsrepo/migrations/retryfetcher.go new file mode 100644 index 00000000000..f60ae97ad6a --- /dev/null +++ b/repo/fsrepo/migrations/retryfetcher.go @@ -0,0 +1,43 @@ +package migrations + +import ( + "bytes" + "context" + "fmt" + "io" +) + +type RetryFetcher struct { + Fetcher + maxRetries int +} + +var _ Fetcher = (*RetryFetcher)(nil) + +func NewRetryFetcher(baseFetcher Fetcher, maxRetries int) *RetryFetcher { + return &RetryFetcher{Fetcher: baseFetcher, maxRetries: maxRetries} +} + +func (r *RetryFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error { + var lastErr error + for i := 0; i < r.maxRetries; i++ { + var buf bytes.Buffer + err := r.Fetcher.Fetch(ctx, filePath, &buf) + if err == nil { + if _, err := io.Copy(writer, &buf); err != nil { + return err + } + return nil + } + + if ctx.Err() != nil { + return ctx.Err() + } + lastErr = err + } + return fmt.Errorf("exceeded number of retries. last error was %w", lastErr) +} + +func (r *RetryFetcher) Close() error { + return r.Fetcher.Close() +} From 98a6fca0adb1ec79273f869ae024d83e886cc0f4 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 10 Feb 2022 16:18:40 -0500 Subject: [PATCH 3/7] feat: switch migration fetcher interface to return bytes rather than take a writer --- repo/fsrepo/migrations/fetch.go | 10 +++++++++- repo/fsrepo/migrations/fetch_test.go | 16 +++++----------- repo/fsrepo/migrations/fetcher.go | 15 +++++---------- repo/fsrepo/migrations/httpfetcher.go | 15 ++++++--------- .../fsrepo/migrations/ipfsfetcher/ipfsfetcher.go | 15 ++++++--------- .../migrations/ipfsfetcher/ipfsfetcher_test.go | 7 +++---- repo/fsrepo/migrations/migrations_test.go | 5 ++--- repo/fsrepo/migrations/retryfetcher.go | 16 +++++----------- repo/fsrepo/migrations/versions.go | 6 +++--- 9 files changed, 44 insertions(+), 61 deletions(-) diff --git a/repo/fsrepo/migrations/fetch.go b/repo/fsrepo/migrations/fetch.go index 4c39004f086..ff202088a00 100644 --- a/repo/fsrepo/migrations/fetch.go +++ b/repo/fsrepo/migrations/fetch.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "fmt" + "io" "io/ioutil" "os" "os/exec" @@ -111,7 +112,14 @@ func FetchBinary(ctx context.Context, fetcher Fetcher, dist, ver, binName, out s defer arcFile.Close() // Open connection to download archive from ipfs path and write to file - if err := fetcher.Fetch(ctx, arcDistPath, arcFile); err != nil { + arcBytes, err := fetcher.Fetch(ctx, arcDistPath) + if err != nil { + return "", err + } + + // Write download data + _, err = io.Copy(arcFile, bytes.NewReader(arcBytes)) + if err != nil { return "", err } arcFile.Close() diff --git a/repo/fsrepo/migrations/fetch_test.go b/repo/fsrepo/migrations/fetch_test.go index c15dc8ff8e4..2273cb5e9e0 100644 --- a/repo/fsrepo/migrations/fetch_test.go +++ b/repo/fsrepo/migrations/fetch_test.go @@ -6,7 +6,6 @@ import ( "context" "fmt" "io" - "io/ioutil" "net/http" "net/http/httptest" "os" @@ -97,14 +96,13 @@ func TestHttpFetch(t *testing.T) { fetcher := NewHttpFetcher("", ts.URL, "", 0) - var buf bytes.Buffer - err := fetcher.Fetch(ctx, "/versions", &buf) + out, err := fetcher.Fetch(ctx, "/versions") if err != nil { t.Fatal(err) } var lines []string - scan := bufio.NewScanner(&buf) + scan := bufio.NewScanner(bytes.NewReader(out)) for scan.Scan() { lines = append(lines, scan.Text()) } @@ -121,7 +119,7 @@ func TestHttpFetch(t *testing.T) { } // Check not found - err = fetcher.Fetch(ctx, "/no_such_file", &bytes.Buffer{}) + _, err = fetcher.Fetch(ctx, "/no_such_file") if err == nil || !strings.Contains(err.Error(), "404") { t.Fatal("expected error 404") } @@ -233,15 +231,11 @@ func TestMultiFetcher(t *testing.T) { mf := NewMultiFetcher(badFetcher, fetcher) - var buf bytes.Buffer - if err := mf.Fetch(ctx, "/versions", &buf); err != nil { + vers, err := mf.Fetch(ctx, "/versions") + if err != nil { t.Fatal(err) } - vers, err := ioutil.ReadAll(&buf) - if err != nil { - t.Fatal("could not read versions:", err) - } if len(vers) < 45 { fmt.Println("unexpected more data") } diff --git a/repo/fsrepo/migrations/fetcher.go b/repo/fsrepo/migrations/fetcher.go index 7a3b98f0d68..067fd371848 100644 --- a/repo/fsrepo/migrations/fetcher.go +++ b/repo/fsrepo/migrations/fetcher.go @@ -1,7 +1,6 @@ package migrations import ( - "bytes" "context" "io" "os" @@ -22,7 +21,7 @@ const ( type Fetcher interface { // Fetch attempts to fetch the file at the given ipfs path. // Returns io.ReadCloser on success, which caller must close. - Fetch(ctx context.Context, filePath string, writer io.Writer) error + Fetch(ctx context.Context, filePath string) ([]byte, error) // Close performs any cleanup after the fetcher is not longer needed. Close() error } @@ -50,20 +49,16 @@ func NewMultiFetcher(f ...Fetcher) Fetcher { // Fetch attempts to fetch the file at each of its fetchers until one succeeds. // Returns io.ReadCloser on success, which caller must close. -func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string, writer io.Writer) error { +func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) ([]byte, error) { var errs error for _, fetcher := range f.fetchers { - var buf bytes.Buffer - err := fetcher.Fetch(ctx, ipfsPath, &buf) + out, err := fetcher.Fetch(ctx, ipfsPath) if err == nil { - if _, err := io.Copy(writer, &buf); err != nil { - return err - } - return nil + return out, nil } errs = multierror.Append(errs, err) } - return errs + return nil, errs } func (f *MultiFetcher) Close() error { diff --git a/repo/fsrepo/migrations/httpfetcher.go b/repo/fsrepo/migrations/httpfetcher.go index bc9e4400a8a..705ada11919 100644 --- a/repo/fsrepo/migrations/httpfetcher.go +++ b/repo/fsrepo/migrations/httpfetcher.go @@ -62,13 +62,13 @@ func NewHttpFetcher(distPath, gateway, userAgent string, fetchLimit int64) *Http // Fetch attempts to fetch the file at the given path, from the distribution // site configured for this HttpFetcher. Returns io.ReadCloser on success, // which caller must close. -func (f *HttpFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error { +func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) { gwURL := f.gateway + path.Join(f.distPath, filePath) fmt.Printf("Fetching with HTTP: %q\n", gwURL) req, err := http.NewRequestWithContext(ctx, http.MethodGet, gwURL, nil) if err != nil { - return fmt.Errorf("http.NewRequest error: %s", err) + return nil, fmt.Errorf("http.NewRequest error: %s", err) } if f.userAgent != "" { @@ -77,16 +77,16 @@ func (f *HttpFetcher) Fetch(ctx context.Context, filePath string, writer io.Writ resp, err := http.DefaultClient.Do(req) if err != nil { - return fmt.Errorf("http.DefaultClient.Do error: %s", err) + return nil, fmt.Errorf("http.DefaultClient.Do error: %s", err) } if resp.StatusCode >= 400 { defer resp.Body.Close() mes, err := ioutil.ReadAll(resp.Body) if err != nil { - return fmt.Errorf("error reading error body: %s", err) + return nil, fmt.Errorf("error reading error body: %s", err) } - return fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes)) + return nil, fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes)) } var rc io.ReadCloser @@ -97,10 +97,7 @@ func (f *HttpFetcher) Fetch(ctx context.Context, filePath string, writer io.Writ } defer rc.Close() - if _, err := io.Copy(writer, rc); err != nil { - return err - } - return nil + return ioutil.ReadAll(rc) } func (f *HttpFetcher) Close() error { diff --git a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go index 938f220e524..14b2d485e5c 100644 --- a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go +++ b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go @@ -89,7 +89,7 @@ func NewIpfsFetcher(distPath string, fetchLimit int64, repoRoot *string) *IpfsFe // Fetch attempts to fetch the file at the given path, from the distribution // site configured for this HttpFetcher. Returns io.ReadCloser on success, // which caller must close. -func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error { +func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) { // Initialize and start IPFS node on first call to Fetch, since the fetcher // may be created by not used. f.openOnce.Do(func() { @@ -105,24 +105,24 @@ func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string, writer io.Writ fmt.Printf("Fetching with IPFS: %q\n", filePath) if f.openErr != nil { - return f.openErr + return nil, f.openErr } iPath, err := parsePath(path.Join(f.distPath, filePath)) if err != nil { - return err + return nil, err } nd, err := f.ipfs.Unixfs().Get(ctx, iPath) if err != nil { - return err + return nil, err } f.recordFetched(iPath) fileNode, ok := nd.(files.File) if !ok { - return fmt.Errorf("%q is not a file", filePath) + return nil, fmt.Errorf("%q is not a file", filePath) } var rc io.ReadCloser @@ -133,10 +133,7 @@ func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string, writer io.Writ } defer rc.Close() - if _, err := io.Copy(writer, rc); err != nil { - return err - } - return nil + return ioutil.ReadAll(rc) } func (f *IpfsFetcher) Close() error { diff --git a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go index b89e67f64c4..e300371a679 100644 --- a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go +++ b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go @@ -29,14 +29,13 @@ func TestIpfsFetcher(t *testing.T) { fetcher := NewIpfsFetcher("", 0, nil) defer fetcher.Close() - var buf bytes.Buffer - err := fetcher.Fetch(ctx, "go-ipfs/versions", &buf) + out, err := fetcher.Fetch(ctx, "go-ipfs/versions") if err != nil { t.Fatal(err) } var lines []string - scan := bufio.NewScanner(&buf) + scan := bufio.NewScanner(bytes.NewReader(out)) for scan.Scan() { lines = append(lines, scan.Text()) } @@ -53,7 +52,7 @@ func TestIpfsFetcher(t *testing.T) { } // Check not found - if err = fetcher.Fetch(ctx, "/no_such_file", &bytes.Buffer{}); err == nil { + if _, err = fetcher.Fetch(ctx, "/no_such_file"); err == nil { t.Fatal("expected error 404") } diff --git a/repo/fsrepo/migrations/migrations_test.go b/repo/fsrepo/migrations/migrations_test.go index 526f76062c7..0e52b3a65ed 100644 --- a/repo/fsrepo/migrations/migrations_test.go +++ b/repo/fsrepo/migrations/migrations_test.go @@ -3,7 +3,6 @@ package migrations import ( "context" "fmt" - "io" "log" "os" "path/filepath" @@ -292,8 +291,8 @@ type mockIpfsFetcher struct{} var _ Fetcher = (*mockIpfsFetcher)(nil) -func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error { - return nil +func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) { + return nil, nil } func (m *mockIpfsFetcher) Close() error { diff --git a/repo/fsrepo/migrations/retryfetcher.go b/repo/fsrepo/migrations/retryfetcher.go index f60ae97ad6a..504a2e0fa1d 100644 --- a/repo/fsrepo/migrations/retryfetcher.go +++ b/repo/fsrepo/migrations/retryfetcher.go @@ -1,10 +1,8 @@ package migrations import ( - "bytes" "context" "fmt" - "io" ) type RetryFetcher struct { @@ -18,24 +16,20 @@ func NewRetryFetcher(baseFetcher Fetcher, maxRetries int) *RetryFetcher { return &RetryFetcher{Fetcher: baseFetcher, maxRetries: maxRetries} } -func (r *RetryFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error { +func (r *RetryFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) { var lastErr error for i := 0; i < r.maxRetries; i++ { - var buf bytes.Buffer - err := r.Fetcher.Fetch(ctx, filePath, &buf) + out, err := r.Fetcher.Fetch(ctx, filePath) if err == nil { - if _, err := io.Copy(writer, &buf); err != nil { - return err - } - return nil + return out, nil } if ctx.Err() != nil { - return ctx.Err() + return nil, ctx.Err() } lastErr = err } - return fmt.Errorf("exceeded number of retries. last error was %w", lastErr) + return nil, fmt.Errorf("exceeded number of retries. last error was %w", lastErr) } func (r *RetryFetcher) Close() error { diff --git a/repo/fsrepo/migrations/versions.go b/repo/fsrepo/migrations/versions.go index 1c510a8fd7e..af5bbbbd969 100644 --- a/repo/fsrepo/migrations/versions.go +++ b/repo/fsrepo/migrations/versions.go @@ -40,15 +40,15 @@ func LatestDistVersion(ctx context.Context, fetcher Fetcher, dist string, stable // available on the distriburion site. List is in ascending order, unless // sortDesc is true. func DistVersions(ctx context.Context, fetcher Fetcher, dist string, sortDesc bool) ([]string, error) { - var buf bytes.Buffer - if err := fetcher.Fetch(ctx, path.Join(dist, distVersions), &buf); err != nil { + versionBytes, err := fetcher.Fetch(ctx, path.Join(dist, distVersions)) + if err != nil { return nil, err } prefix := "v" var vers []semver.Version - scan := bufio.NewScanner(&buf) + scan := bufio.NewScanner(bytes.NewReader(versionBytes)) for scan.Scan() { ver, err := semver.Make(strings.TrimLeft(scan.Text(), prefix)) if err != nil { From b7ded9b72e9c939f4ec2c57795fd6837ec26f25d Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 10 Feb 2022 16:22:51 -0500 Subject: [PATCH 4/7] fix: rename maxRetries to maxTries --- repo/fsrepo/migrations/retryfetcher.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/repo/fsrepo/migrations/retryfetcher.go b/repo/fsrepo/migrations/retryfetcher.go index 504a2e0fa1d..d1b216ea36c 100644 --- a/repo/fsrepo/migrations/retryfetcher.go +++ b/repo/fsrepo/migrations/retryfetcher.go @@ -7,18 +7,18 @@ import ( type RetryFetcher struct { Fetcher - maxRetries int + maxTries int } var _ Fetcher = (*RetryFetcher)(nil) -func NewRetryFetcher(baseFetcher Fetcher, maxRetries int) *RetryFetcher { - return &RetryFetcher{Fetcher: baseFetcher, maxRetries: maxRetries} +func NewRetryFetcher(baseFetcher Fetcher, maxTries int) *RetryFetcher { + return &RetryFetcher{Fetcher: baseFetcher, maxTries: maxTries} } func (r *RetryFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) { var lastErr error - for i := 0; i < r.maxRetries; i++ { + for i := 0; i < r.maxTries; i++ { out, err := r.Fetcher.Fetch(ctx, filePath) if err == nil { return out, nil From 8647e9b465584e9c4eac1f302d5700a278def54e Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 11 Feb 2022 11:49:23 -0500 Subject: [PATCH 5/7] rename numRetriesPerHTTP to numTriesPerHTTP --- repo/fsrepo/migrations/migrations.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/repo/fsrepo/migrations/migrations.go b/repo/fsrepo/migrations/migrations.go index 2d495fc4138..e6271219e9a 100644 --- a/repo/fsrepo/migrations/migrations.go +++ b/repo/fsrepo/migrations/migrations.go @@ -155,14 +155,14 @@ func ReadMigrationConfig(repoRoot string) (*config.Migration, error) { // downloadSources, func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetcher func(string) Fetcher) (Fetcher, error) { const httpUserAgent = "go-ipfs" - const numRetriesPerHTTP = 3 + const numTriesPerHTTP = 3 var fetchers []Fetcher for _, src := range downloadSources { src := strings.TrimSpace(src) switch src { case "HTTPS", "https", "HTTP", "http": - fetchers = append(fetchers, NewRetryFetcher(NewHttpFetcher(distPath, "", httpUserAgent, 0), numRetriesPerHTTP)) + fetchers = append(fetchers, NewRetryFetcher(NewHttpFetcher(distPath, "", httpUserAgent, 0), numTriesPerHTTP)) case "IPFS", "ipfs": if newIpfsFetcher != nil { fetchers = append(fetchers, newIpfsFetcher(distPath)) @@ -179,7 +179,7 @@ func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetch default: return nil, errors.New("bad gateway address: url scheme must be http or https") } - fetchers = append(fetchers, NewRetryFetcher(NewHttpFetcher(distPath, u.String(), httpUserAgent, 0), numRetriesPerHTTP)) + fetchers = append(fetchers, NewRetryFetcher(NewHttpFetcher(distPath, u.String(), httpUserAgent, 0), numTriesPerHTTP)) case "": // Ignore empty string } From fa4aaee3c0c192ae6ecca224deece701700db042 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 11 Feb 2022 11:52:33 -0500 Subject: [PATCH 6/7] switch RetryFetcher to just be a struct with exported fields --- repo/fsrepo/migrations/migrations.go | 4 ++-- repo/fsrepo/migrations/retryfetcher.go | 8 ++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/repo/fsrepo/migrations/migrations.go b/repo/fsrepo/migrations/migrations.go index e6271219e9a..5eac91b2932 100644 --- a/repo/fsrepo/migrations/migrations.go +++ b/repo/fsrepo/migrations/migrations.go @@ -162,7 +162,7 @@ func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetch src := strings.TrimSpace(src) switch src { case "HTTPS", "https", "HTTP", "http": - fetchers = append(fetchers, NewRetryFetcher(NewHttpFetcher(distPath, "", httpUserAgent, 0), numTriesPerHTTP)) + fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, "", httpUserAgent, 0), numTriesPerHTTP}) case "IPFS", "ipfs": if newIpfsFetcher != nil { fetchers = append(fetchers, newIpfsFetcher(distPath)) @@ -179,7 +179,7 @@ func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetch default: return nil, errors.New("bad gateway address: url scheme must be http or https") } - fetchers = append(fetchers, NewRetryFetcher(NewHttpFetcher(distPath, u.String(), httpUserAgent, 0), numTriesPerHTTP)) + fetchers = append(fetchers, &RetryFetcher{NewHttpFetcher(distPath, u.String(), httpUserAgent, 0), numTriesPerHTTP}) case "": // Ignore empty string } diff --git a/repo/fsrepo/migrations/retryfetcher.go b/repo/fsrepo/migrations/retryfetcher.go index d1b216ea36c..81415bb6756 100644 --- a/repo/fsrepo/migrations/retryfetcher.go +++ b/repo/fsrepo/migrations/retryfetcher.go @@ -7,18 +7,14 @@ import ( type RetryFetcher struct { Fetcher - maxTries int + MaxTries int } var _ Fetcher = (*RetryFetcher)(nil) -func NewRetryFetcher(baseFetcher Fetcher, maxTries int) *RetryFetcher { - return &RetryFetcher{Fetcher: baseFetcher, maxTries: maxTries} -} - func (r *RetryFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) { var lastErr error - for i := 0; i < r.maxTries; i++ { + for i := 0; i < r.MaxTries; i++ { out, err := r.Fetcher.Fetch(ctx, filePath) if err == nil { return out, nil From f7f22f19f7643ea4dec408512f124975c4ae7436 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Fri, 11 Feb 2022 11:54:14 -0500 Subject: [PATCH 7/7] docs: fix migration fetcher comments --- repo/fsrepo/migrations/fetcher.go | 2 -- repo/fsrepo/migrations/httpfetcher.go | 3 +-- repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go | 3 +-- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/repo/fsrepo/migrations/fetcher.go b/repo/fsrepo/migrations/fetcher.go index 067fd371848..8e30f06fb66 100644 --- a/repo/fsrepo/migrations/fetcher.go +++ b/repo/fsrepo/migrations/fetcher.go @@ -20,7 +20,6 @@ const ( type Fetcher interface { // Fetch attempts to fetch the file at the given ipfs path. - // Returns io.ReadCloser on success, which caller must close. Fetch(ctx context.Context, filePath string) ([]byte, error) // Close performs any cleanup after the fetcher is not longer needed. Close() error @@ -48,7 +47,6 @@ func NewMultiFetcher(f ...Fetcher) Fetcher { } // Fetch attempts to fetch the file at each of its fetchers until one succeeds. -// Returns io.ReadCloser on success, which caller must close. func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) ([]byte, error) { var errs error for _, fetcher := range f.fetchers { diff --git a/repo/fsrepo/migrations/httpfetcher.go b/repo/fsrepo/migrations/httpfetcher.go index 705ada11919..3f74a084ed6 100644 --- a/repo/fsrepo/migrations/httpfetcher.go +++ b/repo/fsrepo/migrations/httpfetcher.go @@ -60,8 +60,7 @@ func NewHttpFetcher(distPath, gateway, userAgent string, fetchLimit int64) *Http } // Fetch attempts to fetch the file at the given path, from the distribution -// site configured for this HttpFetcher. Returns io.ReadCloser on success, -// which caller must close. +// site configured for this HttpFetcher. func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) { gwURL := f.gateway + path.Join(f.distPath, filePath) fmt.Printf("Fetching with HTTP: %q\n", gwURL) diff --git a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go index 14b2d485e5c..11203ed5a05 100644 --- a/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go +++ b/repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go @@ -87,8 +87,7 @@ func NewIpfsFetcher(distPath string, fetchLimit int64, repoRoot *string) *IpfsFe } // Fetch attempts to fetch the file at the given path, from the distribution -// site configured for this HttpFetcher. Returns io.ReadCloser on success, -// which caller must close. +// site configured for this HttpFetcher. func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) ([]byte, error) { // Initialize and start IPFS node on first call to Fetch, since the fetcher // may be created by not used.