Skip to content

Commit d33fd63

Browse files
committed
add basic consul support
1 parent 34faf4d commit d33fd63

File tree

3 files changed

+240
-1
lines changed

3 files changed

+240
-1
lines changed

client/client_test.go

+103-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func TestSetGetPerformance(t *testing.T) {
6363
}
6464
var retVal JsonObj
6565

66-
const testCount = 100
66+
const testCount = 1000
6767

6868
log.Infof("Performing %d write tests", testCount)
6969

@@ -113,6 +113,108 @@ func TestSetGetPerformance(t *testing.T) {
113113
fmt.Printf("Set/Get/Del test successful\n")
114114
}
115115

116+
func TestConsulClientSetGet(t *testing.T) {
117+
// Get the consul plugin
118+
consulClient := objdb.GetPlugin("consul")
119+
120+
// Initialize the consul client
121+
if err := consulClient.Init([]string{}); err != nil {
122+
log.Fatalf("Error initializing consul plugin. Err: %v", err)
123+
}
124+
125+
setVal := JsonObj{
126+
Value: "test1",
127+
}
128+
129+
if err := consulClient.SetObj("/contiv.io/test", setVal); err != nil {
130+
fmt.Printf("Fatal setting key. Err: %v\n", err)
131+
t.Fatalf("Fatal setting key")
132+
}
133+
134+
var retVal JsonObj
135+
136+
if err := consulClient.GetObj("/contiv.io/test", &retVal); err != nil {
137+
fmt.Printf("Fatal getting key. Err: %v\n", err)
138+
t.Fatalf("Fatal getting key")
139+
}
140+
141+
if retVal.Value != "test1" {
142+
fmt.Printf("Got invalid response: %+v\n", retVal)
143+
t.Fatalf("Got invalid response")
144+
}
145+
146+
if err := consulClient.DelObj("/contiv.io/test"); err != nil {
147+
t.Fatalf("Fatal deleting test object. Err: %v", err)
148+
}
149+
150+
fmt.Printf("Consul Set/Get/Del test successful\n")
151+
}
152+
153+
func TestConsulSetGetPerformance(t *testing.T) {
154+
// Set
155+
setVal := JsonObj{
156+
Value: "test1",
157+
}
158+
var retVal JsonObj
159+
160+
const testCount = 1000
161+
162+
// Get the consul plugin
163+
consulClient := objdb.GetPlugin("consul")
164+
165+
// Initialize the consul client
166+
if err := consulClient.Init([]string{}); err != nil {
167+
log.Fatalf("Error initializing consul plugin. Err: %v", err)
168+
}
169+
170+
log.Infof("Performing %d write tests", testCount)
171+
172+
startTime := time.Now()
173+
174+
for i := 0; i < testCount; i++ {
175+
if err := consulClient.SetObj("/contiv.io/test"+strconv.Itoa(i), setVal); err != nil {
176+
fmt.Printf("Fatal setting key. Err: %v\n", err)
177+
t.Fatalf("Fatal setting key")
178+
}
179+
}
180+
181+
timeTook := time.Since(startTime).Nanoseconds() / 1000000
182+
log.Infof("Write Test took %d milli seconds per write. %d ms total", timeTook/testCount, timeTook)
183+
184+
log.Infof("Performing %d read tests", testCount)
185+
186+
// Get test
187+
startTime = time.Now()
188+
189+
for i := 0; i < testCount; i++ {
190+
if err := consulClient.GetObj("/contiv.io/test"+strconv.Itoa(i), &retVal); err != nil {
191+
fmt.Printf("Fatal getting key. Err: %v\n", err)
192+
t.Fatalf("Fatal getting key")
193+
}
194+
195+
if retVal.Value != "test1" {
196+
fmt.Printf("Got invalid response: %+v\n", retVal)
197+
t.Fatalf("Got invalid response")
198+
}
199+
}
200+
201+
timeTook = time.Since(startTime).Nanoseconds() / 1000000
202+
log.Infof("Read Test took %d milli seconds per read. %d ms total", timeTook/testCount, timeTook)
203+
204+
startTime = time.Now()
205+
206+
for i := 0; i < testCount; i++ {
207+
if err := consulClient.DelObj("/contiv.io/test" + strconv.Itoa(i)); err != nil {
208+
t.Fatalf("Fatal deleting test object. Err: %v", err)
209+
}
210+
}
211+
212+
timeTook = time.Since(startTime).Nanoseconds() / 1000000
213+
log.Infof("Delete Test took %d milli seconds per delete. %d ms total", timeTook/testCount, timeTook)
214+
215+
fmt.Printf("Set/Get/Del test successful\n")
216+
}
217+
116218
func TestLockAcquireRelease(t *testing.T) {
117219
// Create a lock
118220
lock1, err := client.NewLock("master", "hostname1", 10)

plugins/consulClient/consulClient.go

+135
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package consulClient
2+
3+
import (
4+
"encoding/json"
5+
"strings"
6+
"sync"
7+
8+
"github.com/hashicorp/consul/api"
9+
"github.com/netplugin.orig/core"
10+
11+
log "github.com/Sirupsen/logrus"
12+
"github.com/contiv/objdb"
13+
)
14+
15+
type ConsulPlugin struct {
16+
client *api.Client // consul client
17+
consulConfig api.Config
18+
19+
mutex *sync.Mutex
20+
}
21+
22+
var consulPlugin = &ConsulPlugin{mutex: new(sync.Mutex)}
23+
24+
// InitPlugin Register the plugin
25+
func InitPlugin() {
26+
objdb.RegisterPlugin("consul", consulPlugin)
27+
}
28+
29+
// Init initializes the consul client
30+
func (self *ConsulPlugin) Init(machines []string) error {
31+
// default consul config
32+
self.consulConfig = api.Config{Address: "127.0.0.1:8500"}
33+
34+
// Init consul client
35+
client, err := api.NewClient(&self.consulConfig)
36+
if err != nil {
37+
log.Fatalf("Error initializing consul client")
38+
return err
39+
}
40+
41+
self.client = client
42+
43+
return nil
44+
}
45+
46+
func processKey(inKey string) string {
47+
//consul doesn't accepts keys starting with a '/', so trim the leading slash
48+
return strings.TrimPrefix(inKey, "/")
49+
}
50+
51+
func (self *ConsulPlugin) GetObj(key string, retVal interface{}) error {
52+
key = processKey("/contiv.io/obj/" + processKey(key))
53+
54+
resp, _, err := self.client.KV().Get(key, nil)
55+
if err != nil {
56+
return err
57+
}
58+
// Consul returns success and a nil kv when a key is not found,
59+
// translate it to 'Key not found' error
60+
if resp == nil {
61+
return core.Errorf("Key not found")
62+
}
63+
64+
// Parse JSON response
65+
if err := json.Unmarshal([]byte(resp.Value), retVal); err != nil {
66+
log.Errorf("Error parsing object %s, Err %v", resp.Value, err)
67+
return err
68+
}
69+
70+
return nil
71+
}
72+
73+
// ListDir returns a list of keys in a directory
74+
func (self *ConsulPlugin) ListDir(key string) ([]string, error) {
75+
key = processKey("/contiv.io/obj/" + processKey(key))
76+
77+
kvs, _, err := self.client.KV().List(key, nil)
78+
if err != nil {
79+
return nil, err
80+
}
81+
// Consul returns success and a nil kv when a key is not found,
82+
// translate it to 'Key not found' error
83+
if kvs == nil {
84+
return nil, core.Errorf("Key not found")
85+
}
86+
87+
var keys []string
88+
for _, kv := range kvs {
89+
keys = append(keys, kv.Key)
90+
}
91+
92+
return keys, nil
93+
}
94+
95+
// SetObj writes an object
96+
func (self *ConsulPlugin) SetObj(key string, value interface{}) error {
97+
key = processKey("/contiv.io/obj/" + processKey(key))
98+
99+
// JSON format the object
100+
jsonVal, err := json.Marshal(value)
101+
if err != nil {
102+
log.Errorf("Json conversion error. Err %v", err)
103+
return err
104+
}
105+
106+
_, err = self.client.KV().Put(&api.KVPair{Key: key, Value: jsonVal}, nil)
107+
108+
return err
109+
}
110+
111+
// DelObj deletes an object
112+
func (self *ConsulPlugin) DelObj(key string) error {
113+
key = processKey("/contiv.io/obj/" + processKey(key))
114+
_, err := self.client.KV().Delete(key, nil)
115+
return err
116+
}
117+
118+
func (self *ConsulPlugin) GetLocalAddr() (string, error) {
119+
return "", nil
120+
}
121+
func (self *ConsulPlugin) NewLock(name string, myId string, ttl uint64) (objdb.LockInterface, error) {
122+
return nil, nil
123+
}
124+
func (self *ConsulPlugin) RegisterService(serviceInfo objdb.ServiceInfo) error {
125+
return nil
126+
}
127+
func (self *ConsulPlugin) GetService(name string) ([]objdb.ServiceInfo, error) {
128+
return nil, nil
129+
}
130+
func (self *ConsulPlugin) WatchService(name string, eventCh chan objdb.WatchServiceEvent, stopCh chan bool) error {
131+
return nil
132+
}
133+
func (self *ConsulPlugin) DeregisterService(serviceInfo objdb.ServiceInfo) error {
134+
return nil
135+
}

plugins/plugins.go

+2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package plugins
22

33
import (
4+
"github.com/contiv/objdb/plugins/consulClient"
45
"github.com/contiv/objdb/plugins/etcdClient"
56
)
67

78
func Init() {
89
// Initialize all conf store plugins
910
etcdClient.InitPlugin()
11+
consulClient.InitPlugin()
1012
}

0 commit comments

Comments
 (0)