Skip to content

Commit 4ee4200

Browse files
committed
added dbfs datasource and resource objects for uploading files
1 parent ae824d3 commit 4ee4200

10 files changed

+599
-5
lines changed

Makefile

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ python-setup:
2929
@cd docs && python -m virtualenv venv
3030
@cd docs && source venv/bin/activate && python -m pip install -r requirements.txt
3131

32-
docs:
32+
docs: python-setup
3333
@echo "==> Building Docs ..."
3434
@cd docs && source venv/bin/activate && make clean && make html
3535

36-
opendocs: docs
36+
opendocs: python-setup docs
3737
@echo "==> Opening Docs ..."
3838
@cd docs && open build/html/index.html
3939

client/client.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,14 @@ func (o *DBClientOption) getRequestURI(path string, apiVersion string) (string,
9696
return requestURI, nil
9797
}
9898

99+
// onlyNBytes returns at most the first numBytes bytes of buf.
//
// The result is a sub-slice of buf (no copy is made). When buf holds numBytes
// bytes or fewer it is returned unchanged. A negative numBytes is treated as
// zero so that a bad limit can never cause a slice-bounds panic.
func onlyNBytes(buf []byte, numBytes int64) []byte {
	if numBytes < 0 {
		numBytes = 0
	}
	// Compare as int64 so a large limit is not truncated by converting it to int.
	if int64(len(buf)) > numBytes {
		return buf[:numBytes]
	}
	return buf
}
106+
99107
// PerformQuery can be used in a client or directly
100108
func PerformQuery(option DBClientOption, method, path string, apiVersion string, headers map[string]string, marshalJson bool, useRawPath bool, data interface{}) ([]byte, error) {
101109

@@ -133,7 +141,7 @@ func PerformQuery(option DBClientOption, method, path string, apiVersion string,
133141
}
134142

135143
requestBody = bodyBytes
136-
log.Println(string(requestBody))
144+
log.Println(string(onlyNBytes(requestBody, 1e3)))
137145
} else {
138146
requestBody = []byte(data.(string))
139147
}

client/model/dbfs.go

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package model
2+
3+
// FileInfo is the JSON shape used for a single DBFS entry. Every field is
// tagged omitempty, so zero values are left out when the struct is marshalled.
type FileInfo struct {
	// Path is carried under the "path" key.
	Path string `json:"path,omitempty"`
	// IsDir is carried under the "is_dir" key.
	IsDir bool `json:"is_dir,omitempty"`
	// FileSize is carried under the "file_size" key.
	FileSize int64 `json:"file_size,omitempty"`
}

client/service/api.go

+5
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ func (c DBApiClient) Jobs() JobsAPI {
6565
return jobsAPI.init(c)
6666
}
6767

68+
func (c DBApiClient) DBFS() DBFSAPI {
69+
var DBFSAPI DBFSAPI
70+
return DBFSAPI.init(c)
71+
}
72+
6873
func (c DBApiClient) InstancePools() InstancePoolsAPI {
6974
var instancePoolsAPI InstancePoolsAPI
7075
return instancePoolsAPI.init(c)

client/service/dbfs.go

+223
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
package service
2+
3+
import (
4+
"encoding/base64"
5+
"encoding/json"
6+
"github.com/databrickslabs/databricks-terraform/client/model"
7+
"log"
8+
"net/http"
9+
)
10+
11+
// TokensAPI exposes the Secrets API
12+
type DBFSAPI struct {
13+
Client DBApiClient
14+
}
15+
16+
func (a DBFSAPI) init(client DBApiClient) DBFSAPI {
17+
a.Client = client
18+
return a
19+
}
20+
21+
func (a DBFSAPI) Create(path string, overwrite bool, data string) error {
22+
byteArr, err := base64.StdEncoding.DecodeString(data)
23+
if err != nil {
24+
return err
25+
}
26+
byteChunks := split(byteArr, 1e6)
27+
handle, err := a.createHandle(path, overwrite)
28+
if err != nil {
29+
return err
30+
}
31+
for _, byteChunk := range byteChunks {
32+
b64Data := base64.StdEncoding.EncodeToString(byteChunk)
33+
err := a.addBlock(b64Data, handle)
34+
if err != nil {
35+
return err
36+
}
37+
}
38+
err = a.closeHandle(handle)
39+
return err
40+
}
41+
42+
func (a DBFSAPI) Read(path string) (string, error) {
43+
var bytesFetched []byte
44+
fetchLoop := true
45+
offSet := int64(0)
46+
length := int64(1e6)
47+
for fetchLoop == true {
48+
bytesRead, bytes, err := a.read(path, offSet, length)
49+
if err != nil {
50+
return "", err
51+
}
52+
log.Println(bytesRead)
53+
if bytesRead == 0 || bytesRead < length {
54+
fetchLoop = false
55+
}
56+
57+
bytesFetched = append(bytesFetched, bytes...)
58+
offSet = offSet + length
59+
}
60+
resp := base64.StdEncoding.EncodeToString(bytesFetched)
61+
return resp, nil
62+
}
63+
64+
func (a DBFSAPI) Delete(path string, recursive bool) error {
65+
deleteRequest := struct {
66+
Path string `json:"path,omitempty" url:"path,omitempty"`
67+
Recursive bool `json:"recursive,omitempty" url:"recursive,omitempty"`
68+
}{
69+
Path: path,
70+
Recursive: recursive,
71+
}
72+
_, err := a.Client.performQuery(http.MethodPost, "/dbfs/delete", "2.0", nil, deleteRequest)
73+
74+
return err
75+
}
76+
77+
func (a DBFSAPI) read(path string, offset, length int64) (int64, []byte, error) {
78+
var readBytes struct {
79+
BytesRead int64 `json:"bytes_read,omitempty" url:"bytes_read,omitempty"`
80+
Data string `json:"data,omitempty" url:"data,omitempty"`
81+
}
82+
readRequest := struct {
83+
Path string `json:"path,omitempty" url:"path,omitempty"`
84+
Offset int64 `json:"offset,omitempty" url:"offset,omitempty"`
85+
Length int64 `json:"length,omitempty" url:"length,omitempty"`
86+
}{
87+
Path: path,
88+
Offset: offset,
89+
Length: length,
90+
}
91+
resp, err := a.Client.performQuery(http.MethodGet, "/dbfs/read", "2.0", nil, readRequest)
92+
if err != nil {
93+
return readBytes.BytesRead, []byte{}, err
94+
}
95+
err = json.Unmarshal(resp, &readBytes)
96+
if err != nil {
97+
return 0, []byte{}, err
98+
}
99+
dataBytes, err := base64.StdEncoding.DecodeString(readBytes.Data)
100+
return readBytes.BytesRead, dataBytes, err
101+
}
102+
103+
func (a DBFSAPI) Status(path string) (model.FileInfo, error) {
104+
var fileInfo model.FileInfo
105+
statusRequest := struct {
106+
Path string `json:"path,omitempty" url:"path,omitempty"`
107+
}{
108+
Path: path,
109+
}
110+
resp, err := a.Client.performQuery(http.MethodGet, "/dbfs/get-status", "2.0", nil, statusRequest)
111+
if err != nil {
112+
return fileInfo, err
113+
}
114+
err = json.Unmarshal(resp, &fileInfo)
115+
return fileInfo, err
116+
}
117+
118+
func (a DBFSAPI) List(path string, recursive bool) ([]model.FileInfo, error) {
119+
if recursive == true {
120+
var paths []model.FileInfo
121+
a.recursiveAddPaths(path, &paths)
122+
return paths, nil
123+
} else {
124+
return a.list(path)
125+
}
126+
}
127+
128+
func (a DBFSAPI) recursiveAddPaths(path string, pathList *[]model.FileInfo) {
129+
fileInfoList, _ := a.list(path)
130+
for _, v := range fileInfoList {
131+
if v.IsDir == false {
132+
*pathList = append(*pathList, v)
133+
} else if v.IsDir == true {
134+
a.recursiveAddPaths(v.Path, pathList)
135+
}
136+
}
137+
}
138+
139+
func (a DBFSAPI) list(path string) ([]model.FileInfo, error) {
140+
var dbfsList struct {
141+
Files []model.FileInfo `json:"files,omitempty" url:"files,omitempty"`
142+
}
143+
listRequest := struct {
144+
Path string `json:"path,omitempty" url:"path,omitempty"`
145+
}{}
146+
listRequest.Path = path
147+
148+
resp, err := a.Client.performQuery(http.MethodGet, "/dbfs/list", "2.0", nil, listRequest)
149+
if err != nil {
150+
return dbfsList.Files, err
151+
}
152+
153+
err = json.Unmarshal(resp, &dbfsList)
154+
return dbfsList.Files, err
155+
}
156+
157+
func (a DBFSAPI) Mkdirs(path string) error {
158+
mkDirsRequest := struct {
159+
Path string `json:"path,omitempty" url:"path,omitempty"`
160+
}{}
161+
mkDirsRequest.Path = path
162+
163+
_, err := a.Client.performQuery(http.MethodPost, "/dbfs/mkdirs", "2.0", nil, mkDirsRequest)
164+
165+
return err
166+
}
167+
168+
func (a DBFSAPI) createHandle(path string, overwrite bool) (int64, error) {
169+
var handle struct {
170+
Handle int64 `json:"handle,omitempty" url:"handle,omitempty"`
171+
}
172+
createDBFSHandleRequest := struct {
173+
Path string `json:"path,omitempty" url:"path,omitempty"`
174+
Overwrite bool `json:"overwrite,omitempty" url:"overwrite,omitempty"`
175+
}{
176+
Path: path,
177+
Overwrite: overwrite,
178+
}
179+
180+
resp, err := a.Client.performQuery(http.MethodPost, "/dbfs/create", "2.0", nil, createDBFSHandleRequest)
181+
if err != nil {
182+
return handle.Handle, err
183+
}
184+
185+
err = json.Unmarshal(resp, &handle)
186+
return handle.Handle, err
187+
}
188+
189+
func (a DBFSAPI) addBlock(data string, handle int64) error {
190+
var addDBFSBlockRequest = struct {
191+
Data string `json:"data,omitempty" url:"data,omitempty"`
192+
Handle int64 `json:"handle,omitempty" url:"handle,omitempty"`
193+
}{
194+
Data: data,
195+
Handle: handle,
196+
}
197+
_, err := a.Client.performQuery(http.MethodPost, "/dbfs/add-block", "2.0", nil, addDBFSBlockRequest)
198+
return err
199+
}
200+
201+
func (a DBFSAPI) closeHandle(handle int64) error {
202+
closeHandleRequest := struct {
203+
Handle int64 `json:"handle,omitempty" url:"handle,omitempty"`
204+
}{
205+
Handle: handle,
206+
}
207+
208+
_, err := a.Client.performQuery(http.MethodPost, "/dbfs/close", "2.0", nil, closeHandleRequest)
209+
return err
210+
}
211+
212+
// split cuts buf into consecutive chunks of at most lim bytes and returns them
// in order; only the final chunk may be shorter than lim.
//
// The chunks are sub-slices of buf (nothing is copied). Each full chunk has its
// capacity capped at lim, so appending to one cannot overwrite the bytes of the
// chunk that follows it. An empty buf yields an empty result. A non-positive
// lim means "do not split": the whole of buf comes back as a single chunk
// rather than causing a division by zero or a slice-bounds panic.
func split(buf []byte, lim int) [][]byte {
	if lim <= 0 {
		if len(buf) == 0 {
			return [][]byte{}
		}
		return [][]byte{buf}
	}

	chunks := make([][]byte, 0, len(buf)/lim+1)
	for len(buf) >= lim {
		// Full slice expression: length and capacity are both lim.
		chunks = append(chunks, buf[:lim:lim])
		buf = buf[lim:]
	}
	if len(buf) > 0 {
		chunks = append(chunks, buf)
	}
	return chunks
}
+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package service
2+
3+
import (
4+
"bytes"
5+
"encoding/base64"
6+
"github.com/stretchr/testify/assert"
7+
"io/ioutil"
8+
"strings"
9+
"testing"
10+
)
11+
12+
// TestRead logs the size of a sample file read from the local disk. The file
// lives at a machine-specific path, so the test is skipped, rather than
// silently logging a length of zero, when the file cannot be read.
func TestRead(t *testing.T) {
	dat, err := ioutil.ReadFile("/Users/Sri.Tikkireddy/Downloads/DatabricksConcepts&BestPractices.pdf")
	if err != nil {
		t.Skipf("sample file not available: %v", err)
	}
	t.Log(len(dat))
}
16+
17+
func TestSplit(t *testing.T) {
18+
dat, _ := ioutil.ReadFile("/Users/Sri.Tikkireddy/Downloads/DatabricksConcepts&BestPractices.pdf")
19+
t.Log(len(dat))
20+
t.Log(len(split(dat, 1e6)))
21+
}
22+
23+
// GenString returns the line "Hello world how are you doing?\n" repeated times
// times, as bytes. A times of zero or less produces no output.
func GenString(times int) []byte {
	const line = "Hello world how are you doing?\n"

	var out bytes.Buffer
	for remaining := times; remaining > 0; remaining-- {
		out.WriteString(line)
	}
	return out.Bytes()
}
30+
31+
func TestCreateFile(t *testing.T) {
32+
if testing.Short() {
33+
t.Skip("skipping integration test in short mode.")
34+
}
35+
36+
path := "/sri/randomfile"
37+
38+
randomStr := GenString(500000)
39+
t.Log(len(randomStr))
40+
t.Log(len(base64.StdEncoding.EncodeToString(randomStr)))
41+
42+
client := GetIntegrationDBAPIClient()
43+
err := client.DBFS().Create(path, true, base64.StdEncoding.EncodeToString(randomStr))
44+
assert.NoError(t, err, err)
45+
46+
err = client.DBFS().Delete(path, false)
47+
assert.NoError(t, err, err)
48+
}
49+
50+
func TestReadFile(t *testing.T) {
51+
if testing.Short() {
52+
t.Skip("skipping integration test in short mode.")
53+
}
54+
55+
path := "/sri/randomfile"
56+
client := GetIntegrationDBAPIClient()
57+
data, err := client.DBFS().Read(path)
58+
assert.NoError(t, err, err)
59+
60+
byteArr, err := base64.StdEncoding.DecodeString(data)
61+
assert.NoError(t, err, err)
62+
63+
t.Log(len(strings.Split(string(byteArr), "\n")))
64+
t.Log(strings.Split(string(byteArr), "\n")[0])
65+
t.Log(strings.Split(string(byteArr), "\n")[1])
66+
t.Log(strings.Split(string(byteArr), "\n")[2])
67+
t.Log(strings.Split(string(byteArr), "\n")[499999])
68+
t.Log(strings.Split(string(byteArr), "\n")[500000])
69+
}
70+
71+
func TestListRecursive(t *testing.T) {
72+
if testing.Short() {
73+
t.Skip("skipping integration test in short mode.")
74+
}
75+
76+
path := "/andre_mesarovic/mlflow"
77+
client := GetIntegrationDBAPIClient()
78+
data, err := client.DBFS().List(path, true)
79+
assert.NoError(t, err, err)
80+
81+
t.Log(data)
82+
}

0 commit comments

Comments
 (0)