Skip to content

Commit 969f404

Browse files
committed
Make objdb url configurable
1 parent b50a80b commit 969f404

9 files changed

+202
-72
lines changed

client.go

+28-9
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,42 @@
11
package objdb
22

3-
import log "github.com/Sirupsen/logrus"
3+
import (
4+
"errors"
5+
"strings"
46

5-
var defaultConfStore = "etcd"
7+
log "github.com/Sirupsen/logrus"
8+
)
9+
10+
var defaultDbURL = "etcd://127.0.0.1:2379"
611

712
// NewClient Create a new conf store
8-
func NewClient(clientName string) API {
9-
if clientName == "" {
10-
clientName = defaultConfStore
13+
func NewClient(dbURL string) (API, error) {
14+
// check if we should use default db
15+
if dbURL == "" {
16+
dbURL = defaultDbURL
17+
}
18+
19+
parts := strings.Split(dbURL, "://")
20+
if len(parts) < 2 {
21+
log.Errorf("Invalid DB URL format %s", dbURL)
22+
return nil, errors.New("Invalid DB URL")
1123
}
24+
clientName := parts[0]
25+
clientURL := parts[1]
1226

1327
// Get the plugin
1428
plugin := GetPlugin(clientName)
29+
if plugin == nil {
30+
log.Errorf("Invalid DB type %s", clientName)
31+
return nil, errors.New("Unsupported DB type")
32+
}
1533

1634
// Initialize the objdb client
17-
if err := plugin.Init([]string{}); err != nil {
18-
log.Errorf("Error initializing confstore plugin. Err: %v", err)
19-
log.Fatal("Error initializing confstore plugin")
35+
cl, err := plugin.NewClient([]string{"http://" + clientURL})
36+
if err != nil {
37+
log.Errorf("Error creating client %s to url %s. Err: %v", clientName, clientURL, err)
38+
return nil, err
2039
}
2140

22-
return plugin
41+
return cl, nil
2342
}

client_test.go

+48-3
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,61 @@ var etcdClient API
2020
var consulClient API
2121

2222
func TestMain(m *testing.M) {
23+
var err error
2324
runtime.GOMAXPROCS(runtime.NumCPU())
2425

2526
// Init clients
26-
etcdClient = NewClient("")
27-
consulClient = NewClient("consul")
27+
etcdClient, err = NewClient("")
28+
if err != nil {
29+
log.Fatalf("Error creating etcd client. Err: %v", err)
30+
}
31+
32+
consulClient, err = NewClient("consul://localhost:8500")
33+
if err != nil {
34+
log.Fatalf("Error creating consul client. Err: %v", err)
35+
}
2836

2937
os.Exit(m.Run())
3038
}
3139

40+
// Verify only valid DB urls are accepted
41+
func TestDbUrl(t *testing.T) {
42+
_, err := NewClient("invalid")
43+
if err == nil {
44+
t.Fatalf("Invalid URL accepted")
45+
}
46+
47+
_, err = NewClient("invalid://localhost:2379")
48+
if err == nil {
49+
t.Fatalf("Invalid URL accepted")
50+
}
51+
52+
_, err = NewClient("etcd:/localhost:2379")
53+
if err == nil {
54+
t.Fatalf("Invalid URL accepted")
55+
}
56+
57+
_, err = NewClient("etcd://localhost")
58+
if err == nil {
59+
t.Fatalf("Invalid URL accepted")
60+
}
61+
62+
_, err = NewClient("etcd://localhost:5000")
63+
if err == nil {
64+
t.Fatalf("Invalid URL accepted")
65+
}
66+
67+
_, err = NewClient("consul://localhost")
68+
if err == nil {
69+
t.Fatalf("Invalid URL accepted")
70+
}
71+
72+
_, err = NewClient("consul://localhost:5000")
73+
if err == nil {
74+
t.Fatalf("Invalid URL accepted")
75+
}
76+
}
77+
3278
// Perform Set/Get operation on default conf store
3379
func TestSetGet(t *testing.T) {
3480
// Set
@@ -422,7 +468,6 @@ func TestServiceWatch(t *testing.T) {
422468
stopChan := make(chan bool, 1)
423469

424470
// Start watching for service
425-
426471
if err := etcdClient.WatchService("athena", eventChan, stopChan); err != nil {
427472
t.Fatalf("Fatal watching service. Err %v", err)
428473
}

consulClient.go

+32-18
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ import (
1313

1414
// consulPlugin contains consul plugin specific state
1515
type consulPlugin struct {
16+
mutex *sync.Mutex
17+
}
18+
19+
type ConsulClient struct {
1620
client *api.Client // consul client
1721
consulConfig api.Config
18-
19-
mutex *sync.Mutex
2022
}
2123

2224
// init Register the plugin
@@ -25,20 +27,32 @@ func init() {
2527
}
2628

2729
// Init initializes the consul client
28-
func (cp *consulPlugin) Init(machines []string) error {
30+
func (cp *consulPlugin) NewClient(endpoints []string) (API, error) {
31+
cc := new(ConsulClient)
32+
33+
if len(endpoints) == 0 {
34+
endpoints = []string{"127.0.0.1:8500"}
35+
}
2936
// default consul config
30-
cp.consulConfig = api.Config{Address: "127.0.0.1:8500"}
37+
cc.consulConfig = api.Config{Address: strings.TrimPrefix(endpoints[0], "http://")}
3138

3239
// Init consul client
33-
client, err := api.NewClient(&cp.consulConfig)
40+
client, err := api.NewClient(&cc.consulConfig)
3441
if err != nil {
3542
log.Fatalf("Error initializing consul client")
36-
return err
43+
return nil, err
3744
}
3845

39-
cp.client = client
46+
cc.client = client
4047

41-
return nil
48+
// verify we can reach the consul
49+
_, _, err = client.KV().List("/", nil)
50+
if err != nil {
51+
log.Errorf("Error connecting to consul. Err: %v", err)
52+
return nil, err
53+
}
54+
55+
return cc, nil
4256
}
4357

4458
func processKey(inKey string) string {
@@ -47,7 +61,7 @@ func processKey(inKey string) string {
4761
}
4862

4963
// GetObj reads the object
50-
func (cp *consulPlugin) GetObj(key string, retVal interface{}) error {
64+
func (cp *ConsulClient) GetObj(key string, retVal interface{}) error {
5165
key = processKey("/contiv.io/obj/" + processKey(key))
5266

5367
resp, _, err := cp.client.KV().Get(key, &api.QueryOptions{RequireConsistent: true})
@@ -70,7 +84,7 @@ func (cp *consulPlugin) GetObj(key string, retVal interface{}) error {
7084
}
7185

7286
// ListDir returns a list of keys in a directory
73-
func (cp *consulPlugin) ListDir(key string) ([]string, error) {
87+
func (cp *ConsulClient) ListDir(key string) ([]string, error) {
7488
key = processKey("/contiv.io/obj/" + processKey(key))
7589

7690
kvs, _, err := cp.client.KV().List(key, nil)
@@ -92,7 +106,7 @@ func (cp *consulPlugin) ListDir(key string) ([]string, error) {
92106
}
93107

94108
// SetObj writes an object
95-
func (cp *consulPlugin) SetObj(key string, value interface{}) error {
109+
func (cp *ConsulClient) SetObj(key string, value interface{}) error {
96110
key = processKey("/contiv.io/obj/" + processKey(key))
97111

98112
// JSON format the object
@@ -108,38 +122,38 @@ func (cp *consulPlugin) SetObj(key string, value interface{}) error {
108122
}
109123

110124
// DelObj deletes an object
111-
func (cp *consulPlugin) DelObj(key string) error {
125+
func (cp *ConsulClient) DelObj(key string) error {
112126
key = processKey("/contiv.io/obj/" + processKey(key))
113127
_, err := cp.client.KV().Delete(key, nil)
114128
return err
115129
}
116130

117131
// GetLocalAddr gets local address of the host
118-
func (cp *consulPlugin) GetLocalAddr() (string, error) {
132+
func (cp *ConsulClient) GetLocalAddr() (string, error) {
119133
return "", nil
120134
}
121135

122136
// NewLock returns a new lock instance
123-
func (cp *consulPlugin) NewLock(name string, myID string, ttl uint64) (LockInterface, error) {
137+
func (cp *ConsulClient) NewLock(name string, myID string, ttl uint64) (LockInterface, error) {
124138
return nil, nil
125139
}
126140

127141
// RegisterService registers a service
128-
func (cp *consulPlugin) RegisterService(serviceInfo ServiceInfo) error {
142+
func (cp *ConsulClient) RegisterService(serviceInfo ServiceInfo) error {
129143
return nil
130144
}
131145

132146
// GetService gets all instances of a service
133-
func (cp *consulPlugin) GetService(name string) ([]ServiceInfo, error) {
147+
func (cp *ConsulClient) GetService(name string) ([]ServiceInfo, error) {
134148
return nil, nil
135149
}
136150

137151
// WatchService watches for service instance changes
138-
func (cp *consulPlugin) WatchService(name string, eventCh chan WatchServiceEvent, stopCh chan bool) error {
152+
func (cp *ConsulClient) WatchService(name string, eventCh chan WatchServiceEvent, stopCh chan bool) error {
139153
return nil
140154
}
141155

142156
// DeregisterService deregisters a service instance
143-
func (cp *consulPlugin) DeregisterService(serviceInfo ServiceInfo) error {
157+
func (cp *ConsulClient) DeregisterService(serviceInfo ServiceInfo) error {
144158
return nil
145159
}

etcdClient.go

+32-15
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@ import (
1515
)
1616

1717
type etcdPlugin struct {
18+
mutex *sync.Mutex
19+
}
20+
21+
type EtcdClient struct {
1822
client client.Client // etcd client
1923
kapi client.KeysAPI
2024

2125
serviceDb map[string]*serviceState
22-
mutex *sync.Mutex
2326
}
2427

2528
type member struct {
@@ -37,8 +40,9 @@ func init() {
3740
}
3841

3942
// Initialize the etcd client
40-
func (ep *etcdPlugin) Init(endpoints []string) error {
43+
func (ep *etcdPlugin) NewClient(endpoints []string) (API, error) {
4144
var err error
45+
var ec = new(EtcdClient)
4246

4347
ep.mutex.Lock()
4448
defer ep.mutex.Unlock()
@@ -53,27 +57,34 @@ func (ep *etcdPlugin) Init(endpoints []string) error {
5357
}
5458

5559
// Create a new client
56-
ep.client, err = client.New(etcdConfig)
60+
ec.client, err = client.New(etcdConfig)
5761
if err != nil {
58-
log.Fatal("Error creating etcd client. Err: %v", err)
59-
return err
62+
log.Fatalf("Error creating etcd client. Err: %v", err)
63+
return nil, err
6064
}
6165

6266
// create keys api
63-
ep.kapi = client.NewKeysAPI(ep.client)
67+
ec.kapi = client.NewKeysAPI(ec.client)
6468

6569
// Initialize service DB
66-
ep.serviceDb = make(map[string]*serviceState)
70+
ec.serviceDb = make(map[string]*serviceState)
6771

68-
return nil
72+
// Make sure we can read from etcd
73+
_, err = ec.kapi.Get(context.Background(), "/", &client.GetOptions{Recursive: true, Sort: true})
74+
if err != nil {
75+
log.Errorf("Failed to connect to etcd. Err: %v", err)
76+
return nil, err
77+
}
78+
79+
return ec, nil
6980
}
7081

7182
// Get an object
72-
func (ep *etcdPlugin) GetObj(key string, retVal interface{}) error {
83+
func (ep *EtcdClient) GetObj(key string, retVal interface{}) error {
7384
keyName := "/contiv.io/obj/" + key
7485

7586
// Get the object from etcd client
76-
resp, err := ep.kapi.Get(context.Background(), keyName, nil)
87+
resp, err := ep.kapi.Get(context.Background(), keyName, &client.GetOptions{Quorum: true})
7788
if err != nil {
7889
log.Errorf("Error getting key %s. Err: %v", keyName, err)
7990
return err
@@ -103,11 +114,17 @@ func recursAddNode(node *client.Node, list []string) []string {
103114
}
104115

105116
// Get a list of objects in a directory
106-
func (ep *etcdPlugin) ListDir(key string) ([]string, error) {
117+
func (ep *EtcdClient) ListDir(key string) ([]string, error) {
107118
keyName := "/contiv.io/obj/" + key
108119

120+
getOpts := client.GetOptions{
121+
Recursive: true,
122+
Sort: true,
123+
Quorum: true,
124+
}
125+
109126
// Get the object from etcd client
110-
resp, err := ep.kapi.Get(context.Background(), keyName, &client.GetOptions{Recursive: true, Sort: true})
127+
resp, err := ep.kapi.Get(context.Background(), keyName, &getOpts)
111128
if err != nil {
112129
return nil, nil
113130
}
@@ -128,7 +145,7 @@ func (ep *etcdPlugin) ListDir(key string) ([]string, error) {
128145
}
129146

130147
// Save an object, create if it doesnt exist
131-
func (ep *etcdPlugin) SetObj(key string, value interface{}) error {
148+
func (ep *EtcdClient) SetObj(key string, value interface{}) error {
132149
keyName := "/contiv.io/obj/" + key
133150

134151
// JSON format the object
@@ -148,7 +165,7 @@ func (ep *etcdPlugin) SetObj(key string, value interface{}) error {
148165
}
149166

150167
// Remove an object
151-
func (ep *etcdPlugin) DelObj(key string) error {
168+
func (ep *EtcdClient) DelObj(key string) error {
152169
keyName := "/contiv.io/obj/" + key
153170

154171
// Remove it via etcd client
@@ -184,7 +201,7 @@ func httpGetJSON(url string, data interface{}) (interface{}, error) {
184201
}
185202

186203
// Return the local address where etcd is listening
187-
func (ep *etcdPlugin) GetLocalAddr() (string, error) {
204+
func (ep *EtcdClient) GetLocalAddr() (string, error) {
188205
var epData struct {
189206
Name string `json:"name"`
190207
}

etcdLock.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ type Lock struct {
2828
mutex *sync.Mutex
2929
}
3030

31-
// Create a new lock
32-
func (ep *etcdPlugin) NewLock(name string, myID string, ttl uint64) (LockInterface, error) {
31+
// NewLock Create a new lock
32+
func (ep *EtcdClient) NewLock(name string, myID string, ttl uint64) (LockInterface, error) {
3333
watchCtx, watchCancel := context.WithCancel(context.Background())
3434
// Create a lock
3535
return &Lock{
@@ -157,7 +157,7 @@ func (lk *Lock) acquireLock() {
157157
// Try to acquire the lock
158158
resp, err := lk.kapi.Set(context.Background(), keyName, lk.myID, &client.SetOptions{PrevExist: client.PrevNoExist, TTL: lk.ttl})
159159
if err != nil {
160-
if err.(*client.Error).Code != client.ErrorCodeNodeExist {
160+
if err.(client.Error).Code != client.ErrorCodeNodeExist {
161161
log.Errorf("Error creating key %s. Err: %v", keyName, err)
162162
} else {
163163
log.Infof("Lock %s acquired by someone else", keyName)
@@ -342,7 +342,7 @@ func (lk *Lock) watchLock() {
342342
} else if err != nil {
343343
log.Errorf("Error watching the key %s, Err %v.", keyName, err)
344344
} else {
345-
log.Infof("Got Watch Resp: %+v", resp)
345+
log.Debugf("Got Watch Resp: %+v", resp)
346346

347347
// send the event to watch channel
348348
lk.watchCh <- resp

0 commit comments

Comments
 (0)