Skip to content

feat: refactor Fetcher interface used for downloading migrations #8728

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 11, 2022
13 changes: 2 additions & 11 deletions repo/fsrepo/migrations/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
Expand Down Expand Up @@ -111,16 +110,8 @@ func FetchBinary(ctx context.Context, fetcher Fetcher, dist, ver, binName, out s
}
defer arcFile.Close()

// Open connection to download archive from ipfs path
rc, err := fetcher.Fetch(ctx, arcDistPath)
if err != nil {
return "", err
}
defer rc.Close()

// Write download data
_, err = io.Copy(arcFile, rc)
if err != nil {
// Open connection to download archive from ipfs path and write to file
if err := fetcher.Fetch(ctx, arcDistPath, arcFile); err != nil {
return "", err
}
arcFile.Close()
Expand Down
16 changes: 8 additions & 8 deletions repo/fsrepo/migrations/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package migrations

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -96,14 +97,14 @@ func TestHttpFetch(t *testing.T) {

fetcher := NewHttpFetcher("", ts.URL, "", 0)

rc, err := fetcher.Fetch(ctx, "/versions")
var buf bytes.Buffer
err := fetcher.Fetch(ctx, "/versions", &buf)
if err != nil {
t.Fatal(err)
}
defer rc.Close()

var lines []string
scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(&buf)
for scan.Scan() {
lines = append(lines, scan.Text())
}
Expand All @@ -120,7 +121,7 @@ func TestHttpFetch(t *testing.T) {
}

// Check not found
_, err = fetcher.Fetch(ctx, "/no_such_file")
err = fetcher.Fetch(ctx, "/no_such_file", &bytes.Buffer{})
if err == nil || !strings.Contains(err.Error(), "404") {
t.Fatal("expected error 404")
}
Expand Down Expand Up @@ -232,13 +233,12 @@ func TestMultiFetcher(t *testing.T) {

mf := NewMultiFetcher(badFetcher, fetcher)

rc, err := mf.Fetch(ctx, "/versions")
if err != nil {
var buf bytes.Buffer
if err := mf.Fetch(ctx, "/versions", &buf); err != nil {
t.Fatal(err)
}
defer rc.Close()

vers, err := ioutil.ReadAll(rc)
vers, err := ioutil.ReadAll(&buf)
if err != nil {
t.Fatal("could not read versions:", err)
}
Expand Down
15 changes: 10 additions & 5 deletions repo/fsrepo/migrations/fetcher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package migrations

import (
"bytes"
"context"
"io"
"os"
Expand All @@ -21,7 +22,7 @@ const (
type Fetcher interface {
// Fetch attempts to fetch the file at the given ipfs path.
// Returns io.ReadCloser on success, which caller must close.
Fetch(ctx context.Context, filePath string) (io.ReadCloser, error)
Fetch(ctx context.Context, filePath string, writer io.Writer) error
// Close performs any cleanup after the fetcher is not longer needed.
Close() error
}
Expand Down Expand Up @@ -49,16 +50,20 @@ func NewMultiFetcher(f ...Fetcher) Fetcher {

// Fetch attempts to fetch the file at each of its fetchers until one succeeds.
// Returns io.ReadCloser on success, which caller must close.
func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string) (io.ReadCloser, error) {
func (f *MultiFetcher) Fetch(ctx context.Context, ipfsPath string, writer io.Writer) error {
var errs error
for _, fetcher := range f.fetchers {
rc, err := fetcher.Fetch(ctx, ipfsPath)
var buf bytes.Buffer
err := fetcher.Fetch(ctx, ipfsPath, &buf)
if err == nil {
return rc, nil
if _, err := io.Copy(writer, &buf); err != nil {
return err
}
return nil
}
errs = multierror.Append(errs, err)
}
return nil, errs
return errs
}

func (f *MultiFetcher) Close() error {
Expand Down
22 changes: 15 additions & 7 deletions repo/fsrepo/migrations/httpfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ func NewHttpFetcher(distPath, gateway, userAgent string, fetchLimit int64) *Http
// Fetch attempts to fetch the file at the given path, from the distribution
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
// which caller must close.
func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
func (f *HttpFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error {
gwURL := f.gateway + path.Join(f.distPath, filePath)
fmt.Printf("Fetching with HTTP: %q\n", gwURL)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, gwURL, nil)
if err != nil {
return nil, fmt.Errorf("http.NewRequest error: %s", err)
return fmt.Errorf("http.NewRequest error: %s", err)
}

if f.userAgent != "" {
Expand All @@ -77,22 +77,30 @@ func (f *HttpFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("http.DefaultClient.Do error: %s", err)
return fmt.Errorf("http.DefaultClient.Do error: %s", err)
}

if resp.StatusCode >= 400 {
defer resp.Body.Close()
mes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading error body: %s", err)
return fmt.Errorf("error reading error body: %s", err)
}
return nil, fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes))
return fmt.Errorf("GET %s error: %s: %s", gwURL, resp.Status, string(mes))
}

var rc io.ReadCloser
if f.limit != 0 {
return NewLimitReadCloser(resp.Body, f.limit), nil
rc = NewLimitReadCloser(resp.Body, f.limit)
} else {
rc = resp.Body
}
return resp.Body, nil
defer rc.Close()

if _, err := io.Copy(writer, rc); err != nil {
return err
}
return nil
}

func (f *HttpFetcher) Close() error {
Expand Down
24 changes: 17 additions & 7 deletions repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type IpfsFetcher struct {
addrInfo peer.AddrInfo
}

var _ migrations.Fetcher = (*IpfsFetcher)(nil)

// NewIpfsFetcher creates a new IpfsFetcher
//
// Specifying "" for distPath sets the default IPNS path.
Expand Down Expand Up @@ -87,7 +89,7 @@ func NewIpfsFetcher(distPath string, fetchLimit int64, repoRoot *string) *IpfsFe
// Fetch attempts to fetch the file at the given path, from the distribution
// site configured for this HttpFetcher. Returns io.ReadCloser on success,
// which caller must close.
func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error {
// Initialize and start IPFS node on first call to Fetch, since the fetcher
// may be created by not used.
f.openOnce.Do(func() {
Expand All @@ -103,30 +105,38 @@ func (f *IpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser
fmt.Printf("Fetching with IPFS: %q\n", filePath)

if f.openErr != nil {
return nil, f.openErr
return f.openErr
}

iPath, err := parsePath(path.Join(f.distPath, filePath))
if err != nil {
return nil, err
return err
}

nd, err := f.ipfs.Unixfs().Get(ctx, iPath)
if err != nil {
return nil, err
return err
}

f.recordFetched(iPath)

fileNode, ok := nd.(files.File)
if !ok {
return nil, fmt.Errorf("%q is not a file", filePath)
return fmt.Errorf("%q is not a file", filePath)
}

var rc io.ReadCloser
if f.limit != 0 {
return migrations.NewLimitReadCloser(fileNode, f.limit), nil
rc = migrations.NewLimitReadCloser(fileNode, f.limit)
} else {
rc = fileNode
}
return fileNode, nil
defer rc.Close()

if _, err := io.Copy(writer, rc); err != nil {
return err
}
return nil
}

func (f *IpfsFetcher) Close() error {
Expand Down
10 changes: 5 additions & 5 deletions repo/fsrepo/migrations/ipfsfetcher/ipfsfetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ipfsfetcher

import (
"bufio"
"bytes"
"context"
"fmt"
"os"
Expand All @@ -28,14 +29,14 @@ func TestIpfsFetcher(t *testing.T) {
fetcher := NewIpfsFetcher("", 0, nil)
defer fetcher.Close()

rc, err := fetcher.Fetch(ctx, "go-ipfs/versions")
var buf bytes.Buffer
err := fetcher.Fetch(ctx, "go-ipfs/versions", &buf)
if err != nil {
t.Fatal(err)
}
defer rc.Close()

var lines []string
scan := bufio.NewScanner(rc)
scan := bufio.NewScanner(&buf)
for scan.Scan() {
lines = append(lines, scan.Text())
}
Expand All @@ -52,8 +53,7 @@ func TestIpfsFetcher(t *testing.T) {
}

// Check not found
_, err = fetcher.Fetch(ctx, "/no_such_file")
if err == nil {
if err = fetcher.Fetch(ctx, "/no_such_file", &bytes.Buffer{}); err == nil {
t.Fatal("expected error 404")
}

Expand Down
5 changes: 3 additions & 2 deletions repo/fsrepo/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,14 @@ func ReadMigrationConfig(repoRoot string) (*config.Migration, error) {
// downloadSources,
func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetcher func(string) Fetcher) (Fetcher, error) {
const httpUserAgent = "go-ipfs"
const numRetriesPerHTTP = 3

var fetchers []Fetcher
for _, src := range downloadSources {
src := strings.TrimSpace(src)
switch src {
case "HTTPS", "https", "HTTP", "http":
fetchers = append(fetchers, NewHttpFetcher(distPath, "", httpUserAgent, 0))
fetchers = append(fetchers, NewRetryFetcher(NewHttpFetcher(distPath, "", httpUserAgent, 0), numRetriesPerHTTP))
case "IPFS", "ipfs":
if newIpfsFetcher != nil {
fetchers = append(fetchers, newIpfsFetcher(distPath))
Expand All @@ -178,7 +179,7 @@ func GetMigrationFetcher(downloadSources []string, distPath string, newIpfsFetch
default:
return nil, errors.New("bad gateway address: url scheme must be http or https")
}
fetchers = append(fetchers, NewHttpFetcher(distPath, u.String(), httpUserAgent, 0))
fetchers = append(fetchers, NewRetryFetcher(NewHttpFetcher(distPath, u.String(), httpUserAgent, 0), numRetriesPerHTTP))
case "":
// Ignore empty string
}
Expand Down
14 changes: 10 additions & 4 deletions repo/fsrepo/migrations/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,10 @@ func TestReadMigrationConfig(t *testing.T) {

type mockIpfsFetcher struct{}

func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string) (io.ReadCloser, error) {
return nil, nil
var _ Fetcher = (*mockIpfsFetcher)(nil)

func (m *mockIpfsFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error {
return nil
}

func (m *mockIpfsFetcher) Close() error {
Expand Down Expand Up @@ -323,7 +325,9 @@ func TestGetMigrationFetcher(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, ok := f.(*HttpFetcher); !ok {
if rf, ok := f.(*RetryFetcher); !ok {
t.Fatal("expected RetryFetcher")
} else if _, ok := rf.Fetcher.(*HttpFetcher); !ok {
t.Fatal("expected HttpFetcher")
}

Expand All @@ -341,7 +345,9 @@ func TestGetMigrationFetcher(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, ok := f.(*HttpFetcher); !ok {
if rf, ok := f.(*RetryFetcher); !ok {
t.Fatal("expected RetryFetcher")
} else if _, ok := rf.Fetcher.(*HttpFetcher); !ok {
t.Fatal("expected HttpFetcher")
}

Expand Down
43 changes: 43 additions & 0 deletions repo/fsrepo/migrations/retryfetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package migrations

import (
"bytes"
"context"
"fmt"
"io"
)

type RetryFetcher struct {
Fetcher
maxRetries int
}

var _ Fetcher = (*RetryFetcher)(nil)

func NewRetryFetcher(baseFetcher Fetcher, maxRetries int) *RetryFetcher {
return &RetryFetcher{Fetcher: baseFetcher, maxRetries: maxRetries}
}

func (r *RetryFetcher) Fetch(ctx context.Context, filePath string, writer io.Writer) error {
var lastErr error
for i := 0; i < r.maxRetries; i++ {
var buf bytes.Buffer
err := r.Fetcher.Fetch(ctx, filePath, &buf)
if err == nil {
if _, err := io.Copy(writer, &buf); err != nil {
return err
}
return nil
}

if ctx.Err() != nil {
return ctx.Err()
}
lastErr = err
}
return fmt.Errorf("exceeded number of retries. last error was %w", lastErr)
}

func (r *RetryFetcher) Close() error {
return r.Fetcher.Close()
}
Loading