Skip to content

Commit 141cf9b

Browse files
authored
lakectl import (#4558)
1 parent 9a75778 commit 141cf9b

File tree

8 files changed

+348
-8
lines changed

8 files changed

+348
-8
lines changed

cmd/lakectl/cmd/import.go

+234
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"os"
8+
"regexp"
9+
"time"
10+
11+
"github.com/schollz/progressbar/v3"
12+
"github.com/spf13/cobra"
13+
"github.com/treeverse/lakefs/pkg/api"
14+
)
15+
16+
// importSummaryTemplate is the text template handed to Write by importCmd once
// the import commit has been created. It is rendered with a value exposing
// Objects, Branch, MetaRangeID and Commit (an *api.Commit) fields.
// NOTE(review): the `yellow`, `date` and `join` pipeline functions are not
// defined in this file; presumably they are registered by the shared template
// helpers used by Write - confirm.
const importSummaryTemplate = `Import of {{ .Objects | yellow }} object(s) into "{{.Branch}}" completed.
17+
MetaRange ID: {{.MetaRangeID|yellow}}
18+
Commit ID: {{.Commit.Id|yellow}}
19+
Message: {{.Commit.Message}}
20+
Timestamp: {{.Commit.CreationDate|date}}
21+
Parents: {{.Commit.Parents|join ", "}}
22+
`
23+
24+
var importCmd = &cobra.Command{
25+
Use: "import --from <object store URI> --to <lakeFS path URI> [--merge]",
26+
Short: "Import data from external source to an imported branch (with optional merge)",
27+
Run: func(cmd *cobra.Command, args []string) {
28+
flags := cmd.Flags()
29+
merge := MustBool(flags.GetBool("merge"))
30+
noProgress := MustBool(flags.GetBool("no-progress"))
31+
from := MustString(flags.GetString("from"))
32+
to := MustString(flags.GetString("to"))
33+
toURI := MustParsePathURI("to", to)
34+
message := MustString(flags.GetString("message"))
35+
metadata, err := getKV(cmd, "meta")
36+
if err != nil {
37+
DieErr(err)
38+
}
39+
40+
ctx := cmd.Context()
41+
client := getClient()
42+
verifySourceMatchConfiguredStorage(ctx, client, from)
43+
44+
// verify target branch exists before we try to create and import into the associated imported branch
45+
if err, ok := branchExists(ctx, client, toURI.Repository, toURI.Ref); err != nil {
46+
DieErr(err)
47+
} else if !ok {
48+
DieFmt("Target branch '%s', does not exists!", toURI.Ref)
49+
}
50+
51+
// setup progress bar - based on `progressbar.Default` defaults + control visibility
52+
bar := newImportProgressBar(!noProgress)
53+
var (
54+
sum int
55+
continuationToken *string
56+
after string
57+
ranges = make([]api.RangeMetadata, 0)
58+
)
59+
for {
60+
rangeResp, err := client.IngestRangeWithResponse(ctx, toURI.Repository, api.IngestRangeJSONRequestBody{
61+
After: after,
62+
ContinuationToken: continuationToken,
63+
FromSourceURI: from,
64+
Prepend: api.StringValue(toURI.Path),
65+
})
66+
DieOnErrorOrUnexpectedStatusCode(rangeResp, err, http.StatusCreated)
67+
if rangeResp.JSON201 == nil {
68+
Die("Bad response from server", 1)
69+
}
70+
if rangeResp.JSON201.Range != nil {
71+
rangeInfo := *rangeResp.JSON201.Range
72+
ranges = append(ranges, rangeInfo)
73+
sum += rangeInfo.Count
74+
_ = bar.Add(rangeInfo.Count)
75+
}
76+
77+
continuationToken = rangeResp.JSON201.Pagination.ContinuationToken
78+
after = rangeResp.JSON201.Pagination.LastKey
79+
if !rangeResp.JSON201.Pagination.HasMore {
80+
break
81+
}
82+
}
83+
_ = bar.Clear()
84+
85+
// create metarange with all the ranges we created
86+
metaRangeResp, err := client.CreateMetaRangeWithResponse(ctx, toURI.Repository, api.CreateMetaRangeJSONRequestBody{
87+
Ranges: ranges,
88+
})
89+
DieOnErrorOrUnexpectedStatusCode(metaRangeResp, err, http.StatusCreated)
90+
if metaRangeResp.JSON201 == nil {
91+
Die("Bad response from server", 1)
92+
}
93+
94+
importedBranchID := formatImportedBranchID(toURI.Ref)
95+
ensureBranchExists(ctx, client, toURI.Repository, importedBranchID, toURI.Ref)
96+
97+
// commit metarange to the imported branch
98+
commitResp, err := client.CommitWithResponse(ctx, toURI.Repository, importedBranchID, &api.CommitParams{
99+
SourceMetarange: metaRangeResp.JSON201.Id,
100+
}, api.CommitJSONRequestBody{
101+
Message: message,
102+
Metadata: &api.CommitCreation_Metadata{
103+
AdditionalProperties: metadata,
104+
},
105+
})
106+
DieOnErrorOrUnexpectedStatusCode(commitResp, err, http.StatusCreated)
107+
if commitResp.JSON201 == nil {
108+
Die("Bad response from server", 1)
109+
}
110+
Write(importSummaryTemplate, struct {
111+
Objects int
112+
MetaRangeID string
113+
Branch string
114+
Commit *api.Commit
115+
}{
116+
Objects: sum,
117+
MetaRangeID: api.StringValue(metaRangeResp.JSON201.Id),
118+
Branch: importedBranchID,
119+
Commit: commitResp.JSON201,
120+
})
121+
122+
// merge to target branch if needed
123+
if merge {
124+
mergeImportedBranch(ctx, client, toURI.Repository, importedBranchID, toURI.Ref)
125+
}
126+
},
127+
}
128+
129+
func newImportProgressBar(visible bool) *progressbar.ProgressBar {
130+
const (
131+
barSpinnerType = 14
132+
barWidth = 10
133+
barThrottle = 65 * time.Millisecond
134+
)
135+
bar := progressbar.NewOptions64(
136+
-1,
137+
progressbar.OptionSetDescription("Importing"),
138+
progressbar.OptionSetWriter(os.Stderr),
139+
progressbar.OptionSetWidth(barWidth),
140+
progressbar.OptionThrottle(barThrottle),
141+
progressbar.OptionShowCount(),
142+
progressbar.OptionShowIts(),
143+
progressbar.OptionSetItsString("object"),
144+
progressbar.OptionOnCompletion(func() {
145+
_, _ = fmt.Fprint(os.Stderr, "\n")
146+
}),
147+
progressbar.OptionSpinnerType(barSpinnerType),
148+
progressbar.OptionFullWidth(),
149+
progressbar.OptionSetVisibility(visible),
150+
)
151+
_ = bar.RenderBlank()
152+
return bar
153+
}
154+
155+
func verifySourceMatchConfiguredStorage(ctx context.Context, client *api.ClientWithResponses, source string) {
156+
storageConfResp, err := client.GetStorageConfigWithResponse(ctx)
157+
DieOnErrorOrUnexpectedStatusCode(storageConfResp, err, http.StatusOK)
158+
storageConfig := storageConfResp.JSON200
159+
if storageConfig == nil {
160+
Die("Bad response from server", 1)
161+
}
162+
if storageConfig.BlockstoreNamespaceValidityRegex == "" {
163+
return
164+
}
165+
matched, err := regexp.MatchString(storageConfig.BlockstoreNamespaceValidityRegex, source)
166+
if err != nil {
167+
DieErr(err)
168+
}
169+
if !matched {
170+
DieFmt("import source '%s' doesn't match current configured storage '%s'", source, storageConfig.BlockstoreType)
171+
}
172+
}
173+
174+
func mergeImportedBranch(ctx context.Context, client *api.ClientWithResponses, repository, fromBranch, toBranch string) {
175+
mergeResp, err := client.MergeIntoBranchWithResponse(ctx, repository, fromBranch, toBranch, api.MergeIntoBranchJSONRequestBody{})
176+
DieOnErrorOrUnexpectedStatusCode(mergeResp, err, http.StatusOK)
177+
if mergeResp.JSON200 == nil {
178+
Die("Bad response from server", 1)
179+
}
180+
Write(mergeCreateTemplate, struct {
181+
Merge FromTo
182+
Result *api.MergeResult
183+
}{
184+
Merge: FromTo{
185+
FromRef: fromBranch,
186+
ToRef: toBranch,
187+
},
188+
Result: mergeResp.JSON200,
189+
})
190+
}
191+
192+
func branchExists(ctx context.Context, client *api.ClientWithResponses, repository string, branch string) (error, bool) {
193+
resp, err := client.GetBranchWithResponse(ctx, repository, branch)
194+
if err != nil {
195+
return err, false
196+
}
197+
if resp.JSON200 != nil {
198+
return nil, true
199+
}
200+
if resp.JSON404 != nil {
201+
return nil, false
202+
}
203+
return RetrieveError(resp, err), false
204+
}
205+
206+
func ensureBranchExists(ctx context.Context, client *api.ClientWithResponses, repository, branch, sourceBranch string) {
207+
if err, ok := branchExists(ctx, client, repository, branch); err != nil {
208+
DieErr(err)
209+
} else if ok {
210+
return
211+
}
212+
createBranchResp, err := client.CreateBranchWithResponse(ctx, repository, api.CreateBranchJSONRequestBody{
213+
Name: branch,
214+
Source: sourceBranch,
215+
})
216+
DieOnErrorOrUnexpectedStatusCode(createBranchResp, err, http.StatusCreated)
217+
}
218+
219+
// formatImportedBranchID returns the name of the branch that import commits
// are written to for the given target branch: the branch name wrapped as
// "_<branch>_imported" (for example "main" becomes "_main_imported").
func formatImportedBranchID(branch string) string {
	const (
		importedPrefix = "_"
		importedSuffix = "_imported"
	)
	return importedPrefix + branch + importedSuffix
}
222+
223+
//nolint:gochecknoinits,gomnd
224+
func init() {
225+
importCmd.Flags().String("from", "", "prefix to read from (e.g. \"s3://bucket/sub/path/\"). must not be in a storage namespace")
226+
_ = importCmd.MarkFlagRequired("from")
227+
importCmd.Flags().String("to", "", "lakeFS path to load objects into (e.g. \"lakefs://repo/branch/sub/path/\")")
228+
_ = importCmd.MarkFlagRequired("to")
229+
importCmd.Flags().Bool("merge", false, "merge imported branch into target branch")
230+
importCmd.Flags().Bool("no-progress", false, "switch off the progress output")
231+
importCmd.Flags().StringP("message", "m", "Import objects", "commit message")
232+
importCmd.Flags().StringSlice("meta", []string{}, "key value pair in the form of key=value")
233+
rootCmd.AddCommand(importCmd)
234+
}

docs/reference/commands.md

+23
Original file line numberDiff line numberDiff line change
@@ -2168,6 +2168,29 @@ lakectl help [command] [flags]
21682168

21692169

21702170

2171+
### lakectl import
2172+
2173+
Import data from external source to an imported branch (with optional merge)
2174+
2175+
```
2176+
lakectl import --from <object store URI> --to <lakeFS path URI> [--merge] [flags]
2177+
```
2178+
2179+
#### Options
2180+
{:.no_toc}
2181+
2182+
```
2183+
--from string prefix to read from (e.g. "s3://bucket/sub/path/"). must not be in a storage namespace
2184+
-h, --help help for import
2185+
--merge merge imported branch into target branch
2186+
-m, --message string commit message (default "Import objects")
2187+
--meta strings key value pair in the form of key=value
2188+
--no-progress switch off the progress output
2189+
--to string lakeFS path to load objects into (e.g. "lakefs://repo/branch/sub/path/")
2190+
```
2191+
2192+
2193+
21712194
### lakectl ingest
21722195

21732196
Ingest objects from an external source into a lakeFS branch (without actually copying them)

docs/setup/import.md

+48-8
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@ and in the same region of your destination bucket.
2626
lakeFS supports two ways to ingest objects from the object store without copying the data:
2727

2828
1. [Importing using the lakeFS UI](#importing-using-the-lakefs-ui) - A UI dialog to trigger an import to a designated import branch. It creates a commit from all imported objects.
29-
1. [Importing using lakectl cli](#importing-using-lakectl-cli) - You can use a the [`lakectl` CLI command](../reference/commands.md#lakectl) to create uncommitted objects in a branch. It will make sequential calls between the CLI and the server.
29+
1. [Importing using lakectl cli](#importing-using-lakectl-cli) - You can use the [`lakectl` CLI command](../reference/commands.md#lakectl) to create uncommitted objects in a branch. It will make sequential calls between the CLI and the server.
3030

3131
#### Using the import wizard
3232

3333
Clicking the Import button from any branch will open the following dialog:
3434

3535
![Import dialog example configured with S3](../assets/img/UI-Import-Dialog.png)
3636

37-
If it's the first import to the selected branch, it will create the import branch named `<branch_name>_imported`.
37+
If it's the first import to the selected branch, it will create the import branch named `_<branch_name>_imported`.
3838
lakeFS will import all objects from the Source URI to the import branch under the given prefix.
3939

4040
The UI will update periodically with the number of objects imported. How long it takes depends on the number of objects to be imported but will roughly be a few thousand objects per second.
@@ -48,18 +48,58 @@ Once the import is completed, you can merge the changes from the import branch t
4848

4949
### Importing using lakectl cli
5050

51-
The `lakectl` command supports ingesting objects from an external source.
52-
This is done by listing the source bucket (and optional prefix), and creating pointers to the returned objects in lakeFS.
51+
The `lakectl` cli supports _import_ and _ingest_ commands to import objects from an external source.
5352

54-
#### Prerequisites
53+
- The _import_ command acts the same as the UI import wizard. It imports (zero copy) and commits the changes on the `_<branch_name>_imported` branch, with an optional flag to also merge the changes to `<branch_name>`.
54+
- The _ingest_ command lists the source bucket (and optional prefix) from the client and creates pointers to the returned objects in lakeFS. The objects will be staged on the branch.
5555

56-
1. The user calling `lakectl ingest` has permissions to list the objects at the source object store.
57-
2. _recommended_: The lakeFS installation has read permissions to the objects being ingested (to support downloading them directly from the lakeFS server)
58-
3. The source path is **not** a storage namespace used by lakeFS. For example, if `lakefs://my-repo` created with storage namespace `s3://my-bucket`, then `s3://my-bucket/*` cannot be an ingestion source.
56+
57+
#### Using the `lakectl import` command
58+
59+
##### Usage
60+
61+
<div class="tabs">
62+
<ul>
63+
<li><a href="#import-tabs-1">AWS S3 or S3 API Compatible storage</a></li>
64+
<li><a href="#import-tabs-2">Azure Blob</a></li>
65+
<li><a href="#import-tabs-3">Google Cloud Storage</a></li>
66+
</ul>
67+
<div markdown="1" id="import-tabs-1">
68+
```shell
69+
lakectl import \
70+
--from s3://bucket/optional/prefix/ \
71+
--to lakefs://my-repo/my-branch/optional/path/
72+
```
73+
</div>
74+
<div markdown="1" id="import-tabs-2">
75+
```shell
76+
lakectl import \
77+
--from https://storageAccountName.blob.core.windows.net/container/optional/prefix/ \
78+
--to lakefs://my-repo/my-branch/optional/path/
79+
```
80+
</div>
81+
<div markdown="1" id="import-tabs-3">
82+
```shell
83+
lakectl import \
84+
--from gs://bucket/optional/prefix/ \
85+
--to lakefs://my-repo/my-branch/optional/path/
86+
```
87+
</div>
88+
</div>
89+
90+
The imported objects will be committed to `_my-branch_imported` branch. If the branch does not exist, it will be created. The flag `--merge` will merge the branch `_my-branch_imported` to `my-branch` after a successful import.
5991

6092

6193
#### Using the `lakectl ingest` command
6294

95+
##### Prerequisites
96+
97+
1. The user calling `lakectl ingest` has permissions to list the objects at the source object store.
98+
2. _Recommended_: The lakeFS installation has read permissions to the objects being ingested (to support downloading them directly from the lakeFS server)
99+
3. The source path is **not** a storage namespace used by lakeFS. For example, if `lakefs://my-repo` was created with storage namespace `s3://my-bucket`, then `s3://my-bucket/*` cannot be an ingestion source.
100+
101+
##### Usage
102+
63103
<div class="tabs">
64104
<ul>
65105
<li><a href="#ingest-tabs-1">AWS S3 or S3 API Compatible storage</a></li>

esti/golden/lakectl_help.golden

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Available Commands:
1818
fs View and manipulate objects
1919
gc Manage the garbage collection policy
2020
help Help about any command
21+
import Import data from external source to an imported branch (with optional merge)
2122
ingest Ingest objects from an external source into a lakeFS branch (without actually copying them)
2223
log Show log of commits
2324
merge Merge & commit changes from source branch into destination branch

esti/golden/lakectl_import.golden

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Import of ${OBJECTS} object(s) into "${IMPORTED_BRANCH}" completed.
2+
MetaRange ID: <COMMIT_ID>
3+
Commit ID: <COMMIT_ID>
4+
Message: Import objects
5+
Timestamp: <DATE> <TIME> <TZ>
6+
Parents: <COMMIT_ID>

esti/golden/lakectl_import_and_merge.golden

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
Import of ${OBJECTS} object(s) into "${IMPORTED_BRANCH}" completed.
2+
MetaRange ID: <COMMIT_ID>
3+
Commit ID: <COMMIT_ID>
4+
Message: Import objects
5+
Timestamp: <DATE> <TIME> <TZ>
6+
Parents: <COMMIT_ID>
7+
Merged "${IMPORTED_BRANCH}" into "${BRANCH}" to get "<COMMIT_ID>".

esti/golden/lakectl_import_with_message.golden

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Import of ${OBJECTS} object(s) into "${IMPORTED_BRANCH}" completed.
2+
MetaRange ID: <COMMIT_ID>
3+
Commit ID: <COMMIT_ID>
4+
Message: import too
5+
Timestamp: <DATE> <TIME> <TZ>
6+
Parents: <COMMIT_ID>

0 commit comments

Comments
 (0)