Skip to content

Commit 781af7d

Browse files
author
Erik Hollensbe
committed
Initial import from objmodel
Signed-off-by: Erik Hollensbe <[email protected]>
0 parents  commit 781af7d

File tree

8 files changed

+1456
-0
lines changed

8 files changed

+1456
-0
lines changed

client/client.go

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package client
2+
3+
import (
4+
log "github.com/Sirupsen/logrus"
5+
"github.com/contiv/objdb"
6+
"github.com/contiv/objdb/plugins"
7+
)
8+
9+
// Create a new conf store
10+
func NewClient() objdb.ObjdbApi {
11+
defaultConfStore := "etcd"
12+
13+
// Init all plugins
14+
plugins.Init()
15+
16+
// Get the plugin
17+
plugin := objdb.GetPlugin(defaultConfStore)
18+
19+
// Initialize the objdb client
20+
if err := plugin.Init([]string{}); err != nil {
21+
log.Errorf("Error initializing confstore plugin. Err: %v", err)
22+
log.Fatal("Error initializing confstore plugin")
23+
}
24+
25+
return plugin
26+
}

client/client_test.go

+368
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,368 @@
1+
package client
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"runtime"
7+
"strconv"
8+
"testing"
9+
"time"
10+
11+
"github.com/contiv/objdb"
12+
13+
log "github.com/Sirupsen/logrus"
14+
)
15+
16+
type JsonObj struct {
17+
Value string
18+
}
19+
20+
// New objdb client
21+
var client = NewClient()
22+
23+
func TestMain(m *testing.M) {
24+
runtime.GOMAXPROCS(runtime.NumCPU())
25+
os.Exit(m.Run())
26+
}
27+
28+
// Perform Set/Get operation on default conf store
29+
func TestSetGet(t *testing.T) {
30+
// Set
31+
setVal := JsonObj{
32+
Value: "test1",
33+
}
34+
35+
if err := client.SetObj("/contiv.io/test", setVal); err != nil {
36+
fmt.Printf("Fatal setting key. Err: %v\n", err)
37+
t.Fatalf("Fatal setting key")
38+
}
39+
40+
var retVal JsonObj
41+
42+
if err := client.GetObj("/contiv.io/test", &retVal); err != nil {
43+
fmt.Printf("Fatal getting key. Err: %v\n", err)
44+
t.Fatalf("Fatal getting key")
45+
}
46+
47+
if retVal.Value != "test1" {
48+
fmt.Printf("Got invalid response: %+v\n", retVal)
49+
t.Fatalf("Got invalid response")
50+
}
51+
52+
if err := client.DelObj("/contiv.io/test"); err != nil {
53+
t.Fatalf("Fatal deleting test object. Err: %v", err)
54+
}
55+
56+
fmt.Printf("Set/Get/Del test successful\n")
57+
}
58+
59+
func TestSetGetPerformance(t *testing.T) {
60+
// Set
61+
setVal := JsonObj{
62+
Value: "test1",
63+
}
64+
var retVal JsonObj
65+
66+
const testCount = 100
67+
68+
log.Infof("Performing %d write tests", testCount)
69+
70+
startTime := time.Now()
71+
72+
for i := 0; i < testCount; i++ {
73+
if err := client.SetObj("/contiv.io/test"+strconv.Itoa(i), setVal); err != nil {
74+
fmt.Printf("Fatal setting key. Err: %v\n", err)
75+
t.Fatalf("Fatal setting key")
76+
}
77+
}
78+
79+
timeTook := time.Since(startTime).Nanoseconds() / 1000000
80+
log.Infof("Write Test took %d milli seconds per write. %d ms total", timeTook/testCount, timeTook)
81+
82+
log.Infof("Performing %d read tests", testCount)
83+
84+
// Get test
85+
startTime = time.Now()
86+
87+
for i := 0; i < testCount; i++ {
88+
if err := client.GetObj("/contiv.io/test"+strconv.Itoa(i), &retVal); err != nil {
89+
fmt.Printf("Fatal getting key. Err: %v\n", err)
90+
t.Fatalf("Fatal getting key")
91+
}
92+
93+
if retVal.Value != "test1" {
94+
fmt.Printf("Got invalid response: %+v\n", retVal)
95+
t.Fatalf("Got invalid response")
96+
}
97+
}
98+
99+
timeTook = time.Since(startTime).Nanoseconds() / 1000000
100+
log.Infof("Read Test took %d milli seconds per read. %d ms total", timeTook/testCount, timeTook)
101+
102+
startTime = time.Now()
103+
104+
for i := 0; i < testCount; i++ {
105+
if err := client.DelObj("/contiv.io/test" + strconv.Itoa(i)); err != nil {
106+
t.Fatalf("Fatal deleting test object. Err: %v", err)
107+
}
108+
}
109+
110+
timeTook = time.Since(startTime).Nanoseconds() / 1000000
111+
log.Infof("Delete Test took %d milli seconds per delete. %d ms total", timeTook/testCount, timeTook)
112+
113+
fmt.Printf("Set/Get/Del test successful\n")
114+
}
115+
116+
func TestLockAcquireRelease(t *testing.T) {
117+
// Create a lock
118+
lock1, err := client.NewLock("master", "hostname1", 10)
119+
if err != nil {
120+
t.Fatal(err)
121+
}
122+
123+
lock2, err := client.NewLock("master", "hostname2", 10)
124+
if err != nil {
125+
t.Fatal(err)
126+
}
127+
128+
// Acquire the master lock
129+
if err := lock1.Acquire(0); err != nil {
130+
t.Fatalf("Fatal acquiring lock1")
131+
}
132+
133+
time.Sleep(100 * time.Millisecond)
134+
135+
// Try to acquire the same lock again. This should fail
136+
if err := lock2.Acquire(0); err != nil {
137+
t.Fatalf("Fatal acquiring lock2")
138+
}
139+
140+
cnt := 1
141+
for {
142+
select {
143+
case event := <-lock1.EventChan():
144+
fmt.Printf("Event on Lock1: %+v\n\n", event)
145+
if event.EventType == objdb.LockAcquired {
146+
fmt.Printf("Master lock acquired by Lock1\n")
147+
}
148+
case event := <-lock2.EventChan():
149+
fmt.Printf("Event on Lock2: %+v\n\n", event)
150+
if event.EventType == objdb.LockAcquired {
151+
fmt.Printf("Master lock acquired by Lock2\n")
152+
}
153+
case <-time.After(100 * time.Millisecond):
154+
if cnt == 1 {
155+
fmt.Printf("100 ms timer. releasing Lock1\n\n")
156+
// At this point, lock1 should be holding the lock
157+
if !lock1.IsAcquired() {
158+
t.Fatalf("Lock1 failed to acquire lock\n\n")
159+
}
160+
161+
// Release lock1 so that lock2 can acquire it
162+
lock1.Release()
163+
cnt++
164+
} else {
165+
fmt.Printf("200 ms timer. checking if lock2 is acquired\n\n")
166+
167+
// At this point, lock2 should be holding the lock
168+
if !lock2.IsAcquired() {
169+
t.Fatalf("Lock2 failed to acquire lock\n\n")
170+
}
171+
172+
fmt.Printf("Success. Lock2 Successfully acquired. releasing it\n")
173+
// we are done with the test
174+
lock2.Release()
175+
176+
return
177+
}
178+
}
179+
}
180+
}
181+
182+
func TestLockAcquireTimeout(t *testing.T) {
183+
fmt.Printf("\n\n\n =========================================================== \n\n\n")
184+
// Create a lock
185+
lock1, err := client.NewLock("master", "hostname1", 10)
186+
if err != nil {
187+
t.Fatal(err)
188+
}
189+
190+
lock2, err := client.NewLock("master", "hostname2", 10)
191+
if err != nil {
192+
t.Fatal(err)
193+
}
194+
195+
// Acquire the lock
196+
197+
if err := lock1.Acquire(0); err != nil {
198+
t.Fatalf("Fatal acquiring lock1")
199+
}
200+
201+
time.Sleep(100 * time.Millisecond)
202+
203+
if err := lock2.Acquire(2); err != nil {
204+
t.Fatalf("Fatal acquiring lock2")
205+
}
206+
207+
for {
208+
select {
209+
case event := <-lock1.EventChan():
210+
fmt.Printf("Event on Lock1: %+v\n\n", event)
211+
if event.EventType == objdb.LockAcquired {
212+
fmt.Printf("Master lock acquired by Lock1\n")
213+
}
214+
case event := <-lock2.EventChan():
215+
fmt.Printf("Event on Lock2: %+v\n\n", event)
216+
if event.EventType != objdb.LockAcquireTimeout {
217+
fmt.Printf("Invalid event on Lock2\n")
218+
} else {
219+
fmt.Printf("Lock2 timeout as expected")
220+
}
221+
case <-time.After(1 * time.Millisecond):
222+
fmt.Printf("1sec timer. releasing Lock1\n\n")
223+
// At this point, lock1 should be holding the lock
224+
if !lock1.IsAcquired() {
225+
t.Fatalf("Lock1 failed to acquire lock\n\n")
226+
}
227+
lock1.Release()
228+
229+
return
230+
}
231+
}
232+
}
233+
234+
func TestServiceRegister(t *testing.T) {
235+
// Service info
236+
service1Info := objdb.ServiceInfo{
237+
ServiceName: "athena",
238+
HostAddr: "10.10.10.10",
239+
Port: 4567,
240+
}
241+
service2Info := objdb.ServiceInfo{
242+
ServiceName: "athena",
243+
HostAddr: "10.10.10.10",
244+
Port: 4568,
245+
}
246+
247+
// register it
248+
if err := client.RegisterService(service1Info); err != nil {
249+
t.Fatalf("Fatal registering service. Err: %+v\n", err)
250+
}
251+
log.Infof("Registered service: %+v", service1Info)
252+
253+
if err := client.RegisterService(service2Info); err != nil {
254+
t.Fatalf("Fatal registering service. Err: %+v\n", err)
255+
}
256+
log.Infof("Registered service: %+v", service2Info)
257+
258+
resp, err := client.GetService("athena")
259+
if err != nil {
260+
t.Fatalf("Fatal getting service. Err: %+v\n", err)
261+
}
262+
263+
log.Infof("Got service list: %+v\n", resp)
264+
265+
if (len(resp) < 2) || (resp[0] != service1Info) || (resp[1] != service2Info) {
266+
t.Fatalf("Resp service list did not match input")
267+
}
268+
269+
// Wait a while to make sure background refresh is working correctly
270+
time.Sleep(5 * time.Millisecond)
271+
272+
resp, err = client.GetService("athena")
273+
if err != nil {
274+
t.Fatalf("Fatal getting service. Err: %+v\n", err)
275+
}
276+
277+
log.Infof("Got service list: %+v\n", resp)
278+
279+
if (len(resp) < 2) || (resp[0] != service1Info) || (resp[1] != service2Info) {
280+
t.Fatalf("Resp service list did not match input")
281+
}
282+
}
283+
284+
func TestServiceDeregister(t *testing.T) {
285+
// Service info
286+
service1Info := objdb.ServiceInfo{
287+
ServiceName: "athena",
288+
HostAddr: "10.10.10.10",
289+
Port: 4567,
290+
}
291+
service2Info := objdb.ServiceInfo{
292+
ServiceName: "athena",
293+
HostAddr: "10.10.10.10",
294+
Port: 4568,
295+
}
296+
297+
// register it
298+
if err := client.DeregisterService(service1Info); err != nil {
299+
t.Fatalf("Fatal deregistering service. Err: %+v\n", err)
300+
}
301+
302+
if err := client.DeregisterService(service2Info); err != nil {
303+
t.Fatalf("Fatal deregistering service. Err: %+v\n", err)
304+
}
305+
306+
time.Sleep(time.Millisecond * 1)
307+
}
308+
309+
func TestServiceWatch(t *testing.T) {
310+
service1Info := objdb.ServiceInfo{
311+
ServiceName: "athena",
312+
HostAddr: "10.10.10.10",
313+
Port: 4567,
314+
}
315+
316+
// register it
317+
318+
if err := client.RegisterService(service1Info); err != nil {
319+
t.Fatalf("Fatal registering service. Err: %+v\n", err)
320+
}
321+
log.Infof("Registered service: %+v", service1Info)
322+
323+
// Create event channel
324+
eventChan := make(chan objdb.WatchServiceEvent, 1)
325+
stopChan := make(chan bool, 1)
326+
327+
// Start watching for service
328+
329+
if err := client.WatchService("athena", eventChan, stopChan); err != nil {
330+
t.Fatalf("Fatal watching service. Err %v", err)
331+
}
332+
333+
cnt := 1
334+
for {
335+
select {
336+
case srvEvent := <-eventChan:
337+
log.Infof("\n----\nReceived event: %+v\n----", srvEvent)
338+
case <-time.After(time.Millisecond * time.Duration(10)):
339+
service2Info := objdb.ServiceInfo{
340+
ServiceName: "athena",
341+
HostAddr: "10.10.10.11",
342+
Port: 4567,
343+
}
344+
if cnt == 1 {
345+
// register it
346+
if err := client.RegisterService(service2Info); err != nil {
347+
t.Fatalf("Fatal registering service. Err: %+v\n", err)
348+
}
349+
log.Infof("Registered service: %+v", service2Info)
350+
} else if cnt == 5 {
351+
// deregister it
352+
if err := client.DeregisterService(service2Info); err != nil {
353+
t.Fatalf("Fatal deregistering service. Err: %+v\n", err)
354+
}
355+
log.Infof("Deregistered service: %+v", service2Info)
356+
} else if cnt == 7 {
357+
// Stop the watch
358+
stopChan <- true
359+
360+
// wait a little and exit
361+
time.Sleep(time.Millisecond)
362+
363+
return
364+
}
365+
cnt++
366+
}
367+
}
368+
}

0 commit comments

Comments
 (0)