-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathobject.go
418 lines (370 loc) · 10.4 KB
/
object.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
package rkive
import (
"bytes"
"github.com/philhofer/rkive/rpbc"
"strconv"
"unsafe"
)
// ustr performs an unsafe, zero-copy string-to-[]byte conversion.
// Only use this when 's' has the same scope as the returned byte
// slice, and there are guarantees that the slice will not be
// mutated: the result aliases the (immutable) string memory.
//
// A string header has two words (pointer, length) while a slice
// header has three (pointer, length, capacity). Reinterpreting
// &s directly as a *[]byte would read the capacity from whatever
// memory follows the string header, so a three-word header is
// built explicitly with cap == len(s).
func ustr(s string) []byte {
	hdr := struct {
		string
		cap int
	}{s, len(s)}
	return *(*[]byte)(unsafe.Pointer(&hdr))
}
// Object is the interface that a type must
// satisfy in order to be fetched from or
// stored in Riak.
type Object interface {
// Info returns the Info struct that
// holds this object's Riak metadata.
// Implementations must maintain a
// reference to that struct; Info()
// must never return nil, or it
// will cause a panic.
Info() *Info
// Marshal should return the encoded
// value of the object, and any
// relevant errors.
Marshal() ([]byte, error)
// Unmarshal should decode the object
// from a []byte. It can safely use
// zero-copy methods, as the byte slice
// passed to it will "belong" to the
// object.
Unmarshal([]byte) error
}
// Duplicator types know how to return
// an empty copy of themselves, on top of
// fulfilling the Object interface.
type Duplicator interface {
Object
// NewEmpty should return an initialized
// (zero-value) object of the same underlying
// type as the receiver.
NewEmpty() Object
}
// ObjectM is an object that also knows how to merge
// itself with siblings. If an object implements this interface,
// this package will use the Merge method to transparently
// handle siblings returned from Riak.
type ObjectM interface {
Duplicator
// Merge should merge the argument object into the method receiver. It
// is safe to type-assert the argument of Merge to the same type
// as the type of the object satisfying the interface. (Under the hood,
// the argument passed to Merge is simply the value of NewEmpty() after
// data has been read into it.) Merge is called once per additional
// sibling, so many siblings are merged iteratively.
Merge(o Object)
}
// handleMerge resolves siblings by folding every entry of 'ct'
// into 'om'. The first sibling is decoded straight into 'om';
// every following sibling is decoded into a fresh om.NewEmpty()
// value which is then passed to om.Merge. The first decoding
// error aborts the merge and is returned. The object should be
// Store()d after a successful call.
func handleMerge(om ObjectM, ct []*rpbc.RpbContent) error {
	if len(ct) == 0 {
		return nil
	}

	// the first sibling seeds the receiver itself
	if err := readContent(om, ct[0]); err != nil {
		return err
	}

	for _, sibling := range ct[1:] {
		// decode this sibling into a scratch object
		scratch := om.NewEmpty()
		err := readContent(scratch, sibling)

		// the scratch object's vclock is overwritten with the
		// sibling's vtag (done before the error check)
		meta := scratch.Info()
		meta.vclock = append(meta.vclock[:0], sibling.Vtag...)
		if err != nil {
			return err
		}

		om.Merge(scratch)

		// transfer the vclock if the receiver didn't have one before
		dst, src := om.Info(), scratch.Info()
		if len(dst.vclock) == 0 && len(src.vclock) > 0 {
			dst.vclock = append(dst.vclock, src.vclock...)
		}
	}
	return nil
}
// Info contains information
// about a specific Riak object. You can use
// it to satisfy the Object interface.
// Info's zero value (Info{}) is valid.
// You can use the Info object to add
// links, secondary indexes, and user metadata
// to the object referencing this Info object.
// Calls to Fetch(), Push(), Store(), New(),
// etc. will change the contents of this struct.
type Info struct {
key []byte // key
bucket []byte // bucket
links []*rpbc.RpbLink // Links
idxs []*rpbc.RpbPair // Indexes
meta []*rpbc.RpbPair // Meta
ctype []byte // Content-Type
vclock []byte // Vclock
}
// readHeader copies the content-type, links, secondary
// indexes and user metadata from 'ctnt' into the Info of
// 'o', reusing the existing backing arrays where possible.
func readHeader(o Object, ctnt *rpbc.RpbContent) {
	info := o.Info()
	info.ctype = append(info.ctype[:0], ctnt.ContentType...)
	info.links = append(info.links[:0], ctnt.Links...)
	info.idxs = append(info.idxs[:0], ctnt.Indexes...)
	info.meta = append(info.meta[:0], ctnt.Usermeta...)
}
// readContent populates 'o' from 'ctnt': metadata first
// (via readHeader), then the value through o.Unmarshal.
// Content flagged as deleted is not read at all and
// yields ErrDeleted.
func readContent(o Object, ctnt *rpbc.RpbContent) error {
	if deleted := ctnt.GetDeleted(); deleted {
		return ErrDeleted
	}
	readHeader(o, ctnt)
	err := o.Unmarshal(ctnt.Value)
	return err
}
// writeContent fills 'ctnt' from 'o': the value comes from
// o.Marshal, followed by the content-type, links, user
// metadata and secondary indexes held in o.Info(). If
// Marshal fails its error is returned and the metadata
// fields of 'ctnt' are left untouched.
func writeContent(o Object, ctnt *rpbc.RpbContent) error {
	value, err := o.Marshal()
	// the value is assigned even when Marshal reports an error
	ctnt.Value = value
	if err != nil {
		return err
	}

	info := o.Info()
	ctnt.ContentType = append(ctnt.ContentType[:0], info.ctype...)
	ctnt.Links = append(ctnt.Links[:0], info.links...)
	ctnt.Usermeta = append(ctnt.Usermeta[:0], info.meta...)
	ctnt.Indexes = append(ctnt.Indexes[:0], info.idxs...)
	return nil
}
// set unconditionally stores 'value' under 'key' in the
// pair list pointed to by 'l'. An existing pair with an
// equal key is overwritten in place; otherwise a new pair
// is appended. 'l' must not be nil.
func set(l *[]*rpbc.RpbPair, key, value []byte) {
	for _, pair := range *l {
		if !bytes.Equal(key, pair.Key) {
			continue
		}
		pair.Key, pair.Value = key, value
		return
	}
	// no pair with this key yet
	*l = append(*l, &rpbc.RpbPair{Key: key, Value: value})
}
// get returns the value of the first pair in 'l' whose
// key equals 'key', or nil if 'l' is nil or no such
// pair exists.
func get(l *[]*rpbc.RpbPair, key []byte) []byte {
	if l == nil {
		return nil
	}
	for _, pair := range *l {
		if bytes.Equal(key, pair.Key) {
			return pair.Value
		}
	}
	return nil
}
// add conditionally inserts a key-value pair into the
// list pointed to by 'l'. If a pair with an equal key
// already exists nothing is written, and the result
// reports whether the stored value equals 'value'.
// Otherwise the pair is appended and true is returned.
// 'l' must not be nil.
func add(l *[]*rpbc.RpbPair, key, value []byte) bool {
	for _, pair := range *l {
		if bytes.Equal(key, pair.Key) {
			// key is taken: only "succeed" if the value is identical
			return bytes.Equal(value, pair.Value)
		}
	}
	*l = append(*l, &rpbc.RpbPair{Key: key, Value: value})
	return true
}
// del removes the first pair whose key equals 'key' from
// the list pointed to by 'l'. The removed slot is filled
// with the last element, so the order of the remaining
// pairs is not preserved. A nil or empty list is a no-op.
func del(l *[]*rpbc.RpbPair, key []byte) {
	if l == nil {
		return
	}
	pairs := *l
	last := len(pairs) - 1
	for i, pair := range pairs {
		if !bytes.Equal(key, pair.Key) {
			continue
		}
		pairs[i] = pairs[last]
		pairs[last] = nil // drop the stale reference in the vacated tail slot
		*l = pairs[:last]
		return
	}
}
// all converts the pair list pointed to by 'l' into a
// slice of {key, value} string pairs, in list order.
// It returns nil for an empty list. 'l' must not be nil.
func all(l *[]*rpbc.RpbPair) [][2]string {
	pairs := *l
	if len(pairs) == 0 {
		return nil
	}
	out := make([][2]string, 0, len(pairs))
	for _, pair := range pairs {
		out = append(out, [2]string{string(pair.Key), string(pair.Value)})
	}
	return out
}
// Key returns the canonical riak key
// of the object as a string.
func (in *Info) Key() string {
	return string(in.key)
}
// Bucket returns the canonical riak bucket
// of the object as a string.
func (in *Info) Bucket() string {
	return string(in.bucket)
}
// ContentType returns the content-type
// of the object as a string.
func (in *Info) ContentType() string {
	return string(in.ctype)
}
// SetContentType replaces the content-type
// of the object with 's'.
func (in *Info) SetContentType(s string) {
	in.ctype = []byte(s)
}
// Vclock returns the vector clock
// value of the object as a string.
func (in *Info) Vclock() string {
	return string(in.vclock)
}
// fmtbin formats 'key' as a binary secondary index
// name: the key with "_bin" appended, lower-cased.
func fmtbin(key string) []byte {
	return bytes.ToLower(append([]byte(key), "_bin"...))
}
// fmtint formats 'key' as an integer secondary index
// name: the key with "_int" appended, lower-cased.
func fmtint(key string) []byte {
	const suffix = "_int"
	buf := make([]byte, 0, len(key)+len(suffix))
	buf = append(buf, key...)
	buf = append(buf, suffix...)
	return bytes.ToLower(buf)
}
// AddIndex conditionally adds a string secondary index.
// It returns false if an index already exists under 'key'
// with a different value, and true if the index already
// holds this exact key-value pair or if the pair was
// written without conflict.
// (All XxxIndex operations append "_bin" to the key
// internally in order to comply with the Riak secondary
// index specification, so the user does not have to
// include it.)
func (in *Info) AddIndex(key string, value string) bool {
	idxKey := fmtbin(key)
	return add(&in.idxs, idxKey, []byte(value))
}
// AddIndexInt conditionally adds an integer secondary index
// value, using the same rules as AddIndex: it returns false
// if an index already exists under 'key' with a different
// value, and true otherwise. "_int" is appended to the key
// internally.
func (in *Info) AddIndexInt(key string, value int64) bool {
	// The encoded value is retained inside the Info, so it must be an
	// ordinary heap-owned []byte; strconv.AppendInt produces one directly
	// instead of aliasing string memory through an unsafe conversion.
	return add(&in.idxs, fmtint(key), strconv.AppendInt(nil, value, 10))
}
// SetIndex unconditionally sets a string secondary
// index, overwriting any value already stored under
// 'key'. "_bin" is appended to the key internally.
func (in *Info) SetIndex(key string, value string) {
	idxKey := fmtbin(key)
	set(&in.idxs, idxKey, []byte(value))
}
// SetIndexInt unconditionally sets an integer secondary
// index value, overwriting any value already stored under
// 'key'. "_int" is appended to the key internally.
func (in *Info) SetIndexInt(key string, value int64) {
	// The encoded value is retained inside the Info, so it must be an
	// ordinary heap-owned []byte; strconv.AppendInt produces one directly
	// instead of aliasing string memory through an unsafe conversion.
	set(&in.idxs, fmtint(key), strconv.AppendInt(nil, value, 10))
}
// GetIndex returns the value of the string secondary
// index stored under 'key' ("_bin" is appended to the
// key internally). The result is the empty string if
// no such index exists.
func (in *Info) GetIndex(key string) (val string) {
	raw := get(&in.idxs, fmtbin(key))
	return string(raw)
}
// GetIndexInt returns the value of the integer secondary
// index stored under 'key' ("_int" is appended to the key
// internally). It returns nil if no such index exists, or
// if the stored value cannot be parsed as a base-10 int64.
func (in *Info) GetIndexInt(key string) *int64 {
	raw := get(&in.idxs, fmtint(key))
	if raw == nil {
		return nil
	}
	val, err := strconv.ParseInt(string(raw), 10, 64)
	if err != nil {
		// A malformed value must not be reported as a valid 0.
		return nil
	}
	return &val
}
// RemoveIndex removes the string secondary index
// stored under 'key' from the object, if present.
// "_bin" is appended to the key internally.
func (in *Info) RemoveIndex(key string) {
	idxKey := fmtbin(key)
	del(&in.idxs, idxKey)
}
// RemoveIndexInt removes the integer secondary index
// stored under 'key' from the object, if present.
// "_int" is appended to the key internally.
func (in *Info) RemoveIndexInt(key string) {
	idxKey := fmtint(key)
	del(&in.idxs, idxKey)
}
// Indexes returns every secondary index of this
// object as a list of key-value pairs (key first,
// then value), or nil if there are none. Note that
// string-string indexes will have keys postfixed
// with "_bin", and string-int indexes will have keys
// postfixed with "_int", per the Riak secondary
// index specification.
func (in *Info) Indexes() [][2]string {
	pairs := all(&in.idxs)
	return pairs
}
// AddMeta conditionally adds a user metadata key-value
// pair. It returns true if the pair was added or was
// already present with the same value, and false if
// 'key' already exists with a different value.
func (in *Info) AddMeta(key string, value string) bool {
	k, v := []byte(key), []byte(value)
	return add(&in.meta, k, v)
}
// SetMeta unconditionally sets a user metadata
// key-value pair, overwriting any existing value
// stored under 'key'.
func (in *Info) SetMeta(key string, value string) {
	k, v := []byte(key), []byte(value)
	set(&in.meta, k, v)
}
// GetMeta returns the user metadata value stored
// under 'key', or the empty string if there is none.
func (in *Info) GetMeta(key string) (val string) {
	raw := get(&in.meta, []byte(key))
	return string(raw)
}
// RemoveMeta deletes the user metadata value
// stored under 'key', if present.
func (in *Info) RemoveMeta(key string) {
	k := []byte(key)
	del(&in.meta, k)
}
// Metas returns all of the user metadata
// key-value pairs (key first, then value),
// or nil if the object has no metadata.
func (in *Info) Metas() [][2]string {
	// Read from the metadata list, not the secondary index list.
	return all(&in.meta)
}
// AddLink adds a link conditionally. It returns true
// if a link named 'name' already points at this exact
// bucket-key pair, or if no link existed under 'name'
// (in which case it is added). It returns false, and
// changes nothing, if 'name' already points elsewhere.
func (in *Info) AddLink(name string, bucket string, key string) bool {
	nm := []byte(name)
	// don't duplicate
	for _, link := range in.links {
		if bytes.Equal(nm, link.GetTag()) {
			// The tag is taken: succeed only if it already refers to
			// the requested bucket/key, mirroring the add() semantics
			// used for indexes and metadata.
			return string(link.GetBucket()) == bucket &&
				string(link.GetKey()) == key
		}
	}
	in.links = append(in.links, &rpbc.RpbLink{
		Bucket: []byte(bucket),
		Key:    []byte(key),
		Tag:    nm,
	})
	return true
}
// SetLink unconditionally points the link named 'name'
// at the given bucket and key. An existing link with
// that name is updated in place; otherwise a new link
// is appended.
func (in *Info) SetLink(name string, bucket string, key string) {
	tag := []byte(name)
	for _, existing := range in.links {
		if !bytes.Equal(tag, existing.GetTag()) {
			continue
		}
		existing.Bucket = []byte(bucket)
		existing.Key = []byte(key)
		return
	}
	fresh := &rpbc.RpbLink{
		Bucket: []byte(bucket),
		Key:    []byte(key),
		Tag:    tag,
	}
	in.links = append(in.links, fresh)
}
// RemoveLink removes every link named 'name' from the
// object. It is a no-op if no such link exists. The
// relative order of the remaining links is preserved.
func (in *Info) RemoveLink(name string) {
	nm := []byte(name)
	// Filter in place. Swap-removing while ranging over the slice would
	// work from a stale length after the first removal and would skip
	// the element swapped into the current slot.
	kept := in.links[:0]
	for _, link := range in.links {
		if !bytes.Equal(nm, link.GetTag()) {
			kept = append(kept, link)
		}
	}
	// Clear the vacated tail so removed links can be collected.
	for i := len(kept); i < len(in.links); i++ {
		in.links[i] = nil
	}
	in.links = kept
}
// GetLink looks up the link named 'name' and returns
// the bucket and key it points at. Both results are
// empty strings if the object has no such link.
func (in *Info) GetLink(name string) (bucket string, key string) {
	tag := []byte(name)
	for _, link := range in.links {
		if !bytes.Equal(tag, link.GetTag()) {
			continue
		}
		return string(link.GetBucket()), string(link.GetKey())
	}
	return "", ""
}