Skip to content

Commit 02b763c

Browse files
committed
NSP Target Registry chained server
* New target registry server pattern allowing future feature * Current chain: watch handler -> store (sqlite + keepalive)
1 parent 7e375e6 commit 02b763c

File tree

7 files changed

+355
-67
lines changed

7 files changed

+355
-67
lines changed

cmd/nsp/main.go

+39-18
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ import (
3232
"github.com/nordix/meridio/pkg/configuration/registry"
3333
"github.com/nordix/meridio/pkg/health"
3434
"github.com/nordix/meridio/pkg/nsp"
35+
nspRegistry "github.com/nordix/meridio/pkg/nsp/registry"
36+
"github.com/nordix/meridio/pkg/nsp/watchhandler"
37+
"github.com/pkg/errors"
3538

3639
keepAliveRegistry "github.com/nordix/meridio/pkg/nsp/registry/keepalive"
3740
sqliteRegistry "github.com/nordix/meridio/pkg/nsp/registry/sqlite"
@@ -97,31 +100,18 @@ func main() {
97100
ctx = health.CreateChecker(ctx)
98101

99102
// configuration
100-
configurationEventChan := make(chan *registry.ConfigurationEvent, 10)
101-
configurationRegistry := registry.New(configurationEventChan)
102-
configurationMonitor, err := monitor.New(config.ConfigMapName, config.Namespace, configurationRegistry)
103+
configurationManagerServer, err := CreateConfigurationManagerServer(ctx, &config)
103104
if err != nil {
104-
logrus.Fatalf("Unable to start configuration monitor: %v", err)
105+
logrus.Fatalf("CreateConfigurationManagerServer err: %v", err)
105106
}
106-
go configurationMonitor.Start(context.Background())
107-
watcherNotifier := manager.NewWatcherNotifier(configurationRegistry, configurationEventChan)
108-
go watcherNotifier.Start(context.Background())
109-
configurationManagerServer := manager.NewServer(watcherNotifier)
110107

111108
// target registry
112-
sqlr, err := sqliteRegistry.New(config.Datasource)
109+
targetRegistryServer, err := CreateTargetRegistryServer(ctx, &config)
113110
if err != nil {
114-
logrus.Fatalf("Unable create sqlite registry: %v", err)
111+
logrus.Fatalf("CreateTargetRegistryServer err: %v", err)
115112
}
116-
keepAliveRegistry, err := keepAliveRegistry.New(
117-
keepAliveRegistry.WithRegistry(sqlr),
118-
keepAliveRegistry.WithTimeout(config.EntryTimeout),
119-
)
120-
if err != nil {
121-
logrus.Fatalf("Unable create keepalive registry: %v", err)
122-
}
123-
targetRegistryServer := nsp.NewServer(keepAliveRegistry)
124113

114+
// Create Server
125115
server := grpc.NewServer(grpc.Creds(
126116
credentials.GetServer(context.Background()),
127117
))
@@ -141,3 +131,34 @@ func main() {
141131

142132
<-ctx.Done()
143133
}
134+
135+
func CreateTargetRegistryServer(ctx context.Context, config *Config) (nspAPI.TargetRegistryServer, error) {
136+
sqlr, err := sqliteRegistry.New(config.Datasource)
137+
if err != nil {
138+
return nil, errors.Wrap(err, "Unable create sqlite registry")
139+
}
140+
keepAliveRegistry, err := keepAliveRegistry.New(
141+
keepAliveRegistry.WithRegistry(sqlr),
142+
keepAliveRegistry.WithTimeout(config.EntryTimeout),
143+
)
144+
if err != nil {
145+
return nil, errors.Wrap(err, "Unable create keepalive registry")
146+
}
147+
return nsp.NewServer(
148+
nspRegistry.NewServer(keepAliveRegistry),
149+
watchhandler.NewServer(keepAliveRegistry),
150+
), nil
151+
}
152+
153+
func CreateConfigurationManagerServer(ctx context.Context, config *Config) (nspAPI.ConfigurationManagerServer, error) {
154+
configurationEventChan := make(chan *registry.ConfigurationEvent, 10)
155+
configurationRegistry := registry.New(configurationEventChan)
156+
configurationMonitor, err := monitor.New(config.ConfigMapName, config.Namespace, configurationRegistry)
157+
if err != nil {
158+
return nil, errors.Wrap(err, "Unable to start configuration monitor")
159+
}
160+
go configurationMonitor.Start(context.Background())
161+
watcherNotifier := manager.NewWatcherNotifier(configurationRegistry, configurationEventChan)
162+
go watcherNotifier.Start(context.Background())
163+
return manager.NewServer(watcherNotifier), nil
164+
}

pkg/nsp/next/builder.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
Copyright (c) 2022 Nordix Foundation
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package next
18+
19+
// BuildNextTargetRegistryChain chains the target registry servers together.
20+
// Each NextTargetRegistryServer must have a non nil NextTargetRegistryServerImpl.
21+
// If the list of nextTargetRegistryServers is nil or empty, a nil value will be returned.
22+
func BuildNextTargetRegistryChain(nextTargetRegistryServers ...NextTargetRegistryServer) NextTargetRegistryServer {
23+
if len(nextTargetRegistryServers) <= 0 {
24+
return nil
25+
}
26+
for i, ntrs := range nextTargetRegistryServers {
27+
if i >= (len(nextTargetRegistryServers) - 1) {
28+
break
29+
}
30+
ntrs.setNext(nextTargetRegistryServers[i+1])
31+
}
32+
return nextTargetRegistryServers[0]
33+
}

pkg/nsp/next/builder_test.go

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
Copyright (c) 2022 Nordix Foundation
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package next_test
18+
19+
import (
20+
"reflect"
21+
"testing"
22+
23+
"github.com/nordix/meridio/pkg/nsp/next"
24+
)
25+
26+
type fakeNextTargetRegistryServerImpl struct {
27+
*next.NextTargetRegistryServerImpl
28+
name string
29+
}
30+
31+
func TestBuildNextTargetRegistryChain(t *testing.T) {
32+
fakeA := &fakeNextTargetRegistryServerImpl{
33+
NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{},
34+
name: "a",
35+
}
36+
type args struct {
37+
nextTargetRegistryServers []next.NextTargetRegistryServer
38+
}
39+
tests := []struct {
40+
name string
41+
args args
42+
want next.NextTargetRegistryServer
43+
}{
44+
{
45+
name: "empty",
46+
args: args{},
47+
want: nil,
48+
},
49+
{
50+
name: "one",
51+
args: args{
52+
[]next.NextTargetRegistryServer{
53+
fakeA,
54+
},
55+
},
56+
want: fakeA,
57+
},
58+
{
59+
name: "multiple",
60+
args: args{
61+
[]next.NextTargetRegistryServer{
62+
fakeA,
63+
&fakeNextTargetRegistryServerImpl{
64+
NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{},
65+
name: "b",
66+
},
67+
&fakeNextTargetRegistryServerImpl{
68+
NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{},
69+
name: "c",
70+
},
71+
},
72+
},
73+
want: fakeA,
74+
},
75+
}
76+
for _, tt := range tests {
77+
t.Run(tt.name, func(t *testing.T) {
78+
if got := next.BuildNextTargetRegistryChain(tt.args.nextTargetRegistryServers...); !reflect.DeepEqual(got, tt.want) {
79+
t.Errorf("BuildNextTargetRegistryChain() = %v, want %v", got, tt.want)
80+
}
81+
})
82+
}
83+
}

pkg/nsp/next/next.go

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
Copyright (c) 2021 Nordix Foundation
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package next
18+
19+
import (
20+
"context"
21+
22+
"github.com/golang/protobuf/ptypes/empty"
23+
nspAPI "github.com/nordix/meridio/api/nsp/v1"
24+
)
25+
26+
// NextTargetRegistryServer is the interface representing TargetRegistryServer with the
27+
// support of the chaining feature.
28+
type NextTargetRegistryServer interface {
29+
nspAPI.TargetRegistryServer
30+
setNext(NextTargetRegistryServer)
31+
}
32+
33+
type NextTargetRegistryServerImpl struct {
34+
next NextTargetRegistryServer
35+
}
36+
37+
// Register will call the Register function of the next chain element
38+
// If the next element is nil, then &empty.Empty{}, nil will be returned.
39+
func (ntrsi *NextTargetRegistryServerImpl) Register(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
40+
if ntrsi.next == nil {
41+
return &empty.Empty{}, nil
42+
}
43+
return ntrsi.next.Register(ctx, target)
44+
}
45+
46+
// Unregister will call the Unregister function of the next chain element
47+
// If the next element is nil, then &empty.Empty{}, nil will be returned.
48+
func (ntrsi *NextTargetRegistryServerImpl) Unregister(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
49+
if ntrsi.next == nil {
50+
return &empty.Empty{}, nil
51+
}
52+
return ntrsi.next.Unregister(ctx, target)
53+
}
54+
55+
// Watch will call the Watch function of the next chain element
56+
// If the next element is nil, then nil will be returned.
57+
func (ntrsi *NextTargetRegistryServerImpl) Watch(t *nspAPI.Target, watcher nspAPI.TargetRegistry_WatchServer) error {
58+
if ntrsi.next == nil {
59+
return nil
60+
}
61+
return ntrsi.next.Watch(t, watcher)
62+
}
63+
64+
func (ntrsi *NextTargetRegistryServerImpl) setNext(ntrs NextTargetRegistryServer) {
65+
ntrsi.next = ntrs
66+
}

pkg/nsp/registry/server.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
Copyright (c) 2021 Nordix Foundation
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package registry
18+
19+
import (
20+
"context"
21+
22+
"github.com/golang/protobuf/ptypes/empty"
23+
nspAPI "github.com/nordix/meridio/api/nsp/v1"
24+
"github.com/nordix/meridio/pkg/nsp/next"
25+
"github.com/nordix/meridio/pkg/nsp/types"
26+
)
27+
28+
type registry struct {
29+
TargetRegistry types.TargetRegistry
30+
*next.NextTargetRegistryServerImpl
31+
}
32+
33+
// NewServer provides an implementation of TargetRegistryServer with the
34+
// support of the chaining feature. This implementation handles Register
35+
// and Unregister calls by adding or removing data into a storage (e.g
36+
// memory or sqlite)
37+
func NewServer(targetRegistry types.TargetRegistry) *registry {
38+
r := &registry{
39+
TargetRegistry: targetRegistry,
40+
NextTargetRegistryServerImpl: &next.NextTargetRegistryServerImpl{},
41+
}
42+
return r
43+
}
44+
45+
func (r *registry) Register(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
46+
err := r.TargetRegistry.Set(ctx, target)
47+
if err != nil {
48+
return &empty.Empty{}, err
49+
}
50+
return r.NextTargetRegistryServerImpl.Register(ctx, target)
51+
}
52+
53+
func (r *registry) Unregister(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
54+
err := r.TargetRegistry.Remove(ctx, target)
55+
if err != nil {
56+
return &empty.Empty{}, err
57+
}
58+
return r.NextTargetRegistryServerImpl.Unregister(ctx, target)
59+
}

pkg/nsp/server.go

+3-49
Original file line numberDiff line numberDiff line change
@@ -17,57 +17,11 @@ limitations under the License.
1717
package nsp
1818

1919
import (
20-
"context"
21-
22-
"github.com/golang/protobuf/ptypes/empty"
2320
nspAPI "github.com/nordix/meridio/api/nsp/v1"
24-
"github.com/nordix/meridio/pkg/nsp/types"
25-
"github.com/sirupsen/logrus"
21+
"github.com/nordix/meridio/pkg/nsp/next"
2622
)
2723

28-
type Server struct {
29-
TargetRegistry types.TargetRegistry
30-
}
31-
3224
// NewServer -
33-
func NewServer(targetRegistry types.TargetRegistry) nspAPI.TargetRegistryServer {
34-
networkServicePlateformService := &Server{
35-
TargetRegistry: targetRegistry,
36-
}
37-
38-
return networkServicePlateformService
39-
}
40-
41-
func (s *Server) Register(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
42-
return &empty.Empty{}, s.TargetRegistry.Set(ctx, target)
43-
}
44-
45-
func (s *Server) Unregister(ctx context.Context, target *nspAPI.Target) (*empty.Empty, error) {
46-
return &empty.Empty{}, s.TargetRegistry.Remove(ctx, target)
47-
}
48-
49-
func (s *Server) Watch(t *nspAPI.Target, watcher nspAPI.TargetRegistry_WatchServer) error {
50-
targetWatcher, err := s.TargetRegistry.Watch(context.TODO(), t)
51-
if err != nil {
52-
return err
53-
}
54-
s.watcher(watcher, targetWatcher.ResultChan())
55-
targetWatcher.Stop()
56-
return nil
57-
}
58-
59-
func (s *Server) watcher(watcher nspAPI.TargetRegistry_WatchServer, ch <-chan []*nspAPI.Target) {
60-
for {
61-
select {
62-
case event := <-ch:
63-
err := watcher.Send(&nspAPI.TargetResponse{
64-
Targets: event,
65-
})
66-
if err != nil {
67-
logrus.Errorf("err sending TrenchResponse: %v", err)
68-
}
69-
case <-watcher.Context().Done():
70-
return
71-
}
72-
}
25+
func NewServer(nextTargetRegistryServers ...next.NextTargetRegistryServer) nspAPI.TargetRegistryServer {
26+
return next.BuildNextTargetRegistryChain(nextTargetRegistryServers...)
7327
}

0 commit comments

Comments
 (0)