Skip to content

Commit 15e438b

Browse files
author
Joji Mekkatt
committed
Add service proxy for k8s
1 parent 0c152e0 commit 15e438b

File tree

7 files changed

+746
-4
lines changed

7 files changed

+746
-4
lines changed

core/core.go

+10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ limitations under the License.
2121
// hardware/kernel/device specific programming implementation, if any.
2222
package core
2323

24+
import (
25+
"github.com/contiv/ofnet"
26+
)
27+
2428
// Address is a string represenation of a network address (mac, ip, dns-name, url etc)
2529
type Address struct {
2630
addr string
@@ -101,6 +105,12 @@ type NetworkDriver interface {
101105
DeleteMaster(node ServiceInfo) error
102106
AddBgp(id string) error
103107
DeleteBgp(id string) error
108+
// Add a service spec to proxy
109+
AddSvcSpec(svcName string, spec *ofnet.ServiceSpec) error
110+
// Remove a service spec from proxy
111+
DelSvcSpec(svcName string, spec *ofnet.ServiceSpec)
112+
// Service Proxy Back End update
113+
SvcProviderUpdate(svcName string, providers []string)
104114
}
105115

106116
// WatchState is used to provide a difference between core.State structs by

drivers/ovsSwitch.go

+16
Original file line numberDiff line numberDiff line change
@@ -598,3 +598,19 @@ func (sw *OvsSwitch) DeleteBgp() error {
598598
}
599599
return nil
600600
}
601+
602+
// AddSvcSpec invokes ofnetAgent api
603+
func (sw *OvsSwitch) AddSvcSpec(svcName string, spec *ofnet.ServiceSpec) error {
604+
log.Infof("OvsSwitch AddSvcSpec %s", svcName)
605+
return sw.ofnetAgent.AddSvcSpec(svcName, spec)
606+
}
607+
608+
// DelSvcSpec invokes ofnetAgent api
609+
func (sw *OvsSwitch) DelSvcSpec(svcName string, spec *ofnet.ServiceSpec) {
610+
sw.ofnetAgent.DelSvcSpec(svcName, spec)
611+
}
612+
613+
// SvcProviderUpdate invokes ofnetAgent api
614+
func (sw *OvsSwitch) SvcProviderUpdate(svcName string, providers []string) {
615+
sw.ofnetAgent.SvcProviderUpdate(svcName, providers)
616+
}

drivers/ovsdriver.go

+35
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ package drivers
1717

1818
import (
1919
"encoding/json"
20+
"errors"
2021
"fmt"
2122
"strconv"
2223
"strings"
2324

2425
log "github.com/Sirupsen/logrus"
26+
"github.com/contiv/ofnet"
2527
"github.com/vishvananda/netlink"
2628

2729
"github.com/contiv/netplugin/core"
@@ -461,3 +463,36 @@ func (d *OvsDriver) DeleteBgp(id string) error {
461463
return sw.DeleteBgp()
462464

463465
}
466+
467+
// AddSvcSpec invokes switch api
468+
func (d *OvsDriver) AddSvcSpec(svcName string, spec *ofnet.ServiceSpec) error {
469+
log.Infof("AddSvcSpec: %s", svcName)
470+
errs := ""
471+
for _, sw := range d.switchDb {
472+
log.Infof("sw AddSvcSpec: %s", svcName)
473+
err := sw.AddSvcSpec(svcName, spec)
474+
if err != nil {
475+
errs += err.Error()
476+
}
477+
}
478+
479+
if errs != "" {
480+
return errors.New(errs)
481+
}
482+
483+
return nil
484+
}
485+
486+
// DelSvcSpec invokes switch api
487+
func (d *OvsDriver) DelSvcSpec(svcName string, spec *ofnet.ServiceSpec) {
488+
for _, sw := range d.switchDb {
489+
sw.DelSvcSpec(svcName, spec)
490+
}
491+
}
492+
493+
// SvcProviderUpdate invokes switch api
494+
func (d *OvsDriver) SvcProviderUpdate(svcName string, providers []string) {
495+
for _, sw := range d.switchDb {
496+
sw.SvcProviderUpdate(svcName, providers)
497+
}
498+
}

mgmtfn/k8splugin/cniserver.go

+53
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,58 @@ func makeHTTPHandler(handlerFunc restAPIFunc) http.HandlerFunc {
103103
}
104104
}
105105

106+
// InitKubServiceWatch initializes the k8s service watch
107+
func InitKubServiceWatch(np *plugin.NetPlugin) {
108+
109+
watchClient := setUpAPIClient()
110+
if watchClient == nil {
111+
log.Fatalf("Could not init kubernetes API client")
112+
}
113+
114+
svcCh := make(chan SvcWatchResp, 1)
115+
epCh := make(chan EpWatchResp, 1)
116+
go func() {
117+
for {
118+
select {
119+
case svcEvent := <-svcCh:
120+
switch svcEvent.opcode {
121+
case "WARN":
122+
break
123+
case "FATAL":
124+
break
125+
case "ERROR":
126+
log.Errorf("svcWatch : %s", svcEvent.errStr)
127+
watchClient.WatchServices(svcCh)
128+
break
129+
130+
case "DELETED":
131+
np.NetworkDriver.DelSvcSpec(svcEvent.svcName, &svcEvent.svcSpec)
132+
break
133+
default:
134+
np.NetworkDriver.AddSvcSpec(svcEvent.svcName, &svcEvent.svcSpec)
135+
}
136+
case epEvent := <-epCh:
137+
switch epEvent.opcode {
138+
case "WARN":
139+
break
140+
case "FATAL":
141+
break
142+
case "ERROR":
143+
log.Errorf("epWatch : %s", epEvent.errStr)
144+
watchClient.WatchSvcEps(epCh)
145+
break
146+
147+
default:
148+
np.NetworkDriver.SvcProviderUpdate(epEvent.svcName, epEvent.providers)
149+
}
150+
}
151+
}
152+
}()
153+
154+
watchClient.WatchServices(svcCh)
155+
watchClient.WatchSvcEps(epCh)
156+
}
157+
106158
// InitCNIServer initializes the k8s cni server
107159
func InitCNIServer(netplugin *plugin.NetPlugin) error {
108160

@@ -146,6 +198,7 @@ func InitCNIServer(netplugin *plugin.NetPlugin) error {
146198
log.Infof("k8s plugin closing %s", driverPath)
147199
}()
148200

201+
//InitKubServiceWatch(netplugin)
149202
return nil
150203
}
151204

mgmtfn/k8splugin/kubeClient.go

+177-4
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@ limitations under the License.
1616
package k8splugin
1717

1818
import (
19+
"bufio"
1920
"crypto/tls"
2021
"crypto/x509"
2122
"encoding/json"
2223
"fmt"
2324
log "github.com/Sirupsen/logrus"
25+
"github.com/contiv/ofnet"
26+
"golang.org/x/net/context"
27+
"golang.org/x/net/context/ctxhttp"
2428
"io/ioutil"
2529
"net/http"
2630
)
@@ -29,11 +33,42 @@ const (
2933
nsURL = "/api/v1/namespaces/"
3034
)
3135

32-
// APIClient defines informatio needed for the k8s api client
36+
// APIClient defines information needed for the k8s api client
3337
type APIClient struct {
34-
baseURL string
35-
client *http.Client
36-
podCache podInfo
38+
baseURL string
39+
watchBase string
40+
client *http.Client
41+
podCache podInfo
42+
}
43+
44+
// SvcWatchResp is the response to a service watch
45+
type SvcWatchResp struct {
46+
opcode string
47+
errStr string
48+
svcName string
49+
svcSpec ofnet.ServiceSpec
50+
}
51+
52+
// EpWatchResp is the response to service endpoints watch
53+
type EpWatchResp struct {
54+
opcode string
55+
errStr string
56+
svcName string
57+
providers []string
58+
}
59+
60+
type watchSvcStatus struct {
61+
// The type of watch update contained in the message
62+
Type string `json:"type"`
63+
// Pod details
64+
Object Service `json:"object"`
65+
}
66+
67+
type watchSvcEpStatus struct {
68+
// The type of watch update contained in the message
69+
Type string `json:"type"`
70+
// Pod details
71+
Object Endpoints `json:"object"`
3772
}
3873

3974
type podInfo struct {
@@ -46,6 +81,7 @@ type podInfo struct {
4681
func NewAPIClient(serverURL, caFile, keyFile, certFile string) *APIClient {
4782
c := APIClient{}
4883
c.baseURL = serverURL + "/api/v1/namespaces/"
84+
c.watchBase = serverURL + "/api/v1/watch/"
4985

5086
// Read client cert
5187
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
@@ -172,3 +208,140 @@ func (c *APIClient) GetPodLabel(ns, name, label string) (string, error) {
172208
log.Infof("label %s not found in podSpec for %s.%s", label, ns, name)
173209
return "", nil
174210
}
211+
212+
// WatchServices watches the services object on the api server
213+
func (c *APIClient) WatchServices(respCh chan SvcWatchResp) {
214+
ctx, _ := context.WithCancel(context.Background())
215+
216+
go func() {
217+
// Make request to Kubernetes API
218+
getURL := c.watchBase + "services"
219+
req, err := http.NewRequest("GET", getURL, nil)
220+
if err != nil {
221+
respCh <- SvcWatchResp{opcode: "FATAL", errStr: fmt.Sprintf("Req %v", err)}
222+
return
223+
}
224+
res, err := ctxhttp.Do(ctx, c.client, req)
225+
defer res.Body.Close()
226+
if err != nil {
227+
respCh <- SvcWatchResp{opcode: "FATAL", errStr: fmt.Sprintf("Do %v", err)}
228+
return
229+
}
230+
231+
var wss watchSvcStatus
232+
reader := bufio.NewReader(res.Body)
233+
234+
// bufio.Reader.ReadBytes is blocking, so we watch for
235+
// context timeout or cancellation in a goroutine
236+
// and close the response body when see see it. The
237+
// response body is also closed via defer when the
238+
// request is made, but closing twice is OK.
239+
go func() {
240+
<-ctx.Done()
241+
res.Body.Close()
242+
}()
243+
244+
for {
245+
line, err := reader.ReadBytes('\n')
246+
if ctx.Err() != nil {
247+
respCh <- SvcWatchResp{opcode: "ERROR", errStr: fmt.Sprintf("ctx %v", err)}
248+
return
249+
}
250+
if err != nil {
251+
respCh <- SvcWatchResp{opcode: "ERROR", errStr: fmt.Sprintf("read %v", err)}
252+
return
253+
}
254+
if err := json.Unmarshal(line, &wss); err != nil {
255+
respCh <- SvcWatchResp{opcode: "WARN", errStr: fmt.Sprintf("unmarshal %v", err)}
256+
continue
257+
}
258+
259+
//if wss.Object.ObjectMeta.Namespace != "default" {
260+
// continue
261+
//}
262+
resp := SvcWatchResp{opcode: wss.Type}
263+
resp.svcName = wss.Object.ObjectMeta.Name
264+
sSpec := ofnet.ServiceSpec{}
265+
sSpec.Ports = make([]ofnet.PortSpec, 0, 1)
266+
sSpec.IpAddress = wss.Object.Spec.ClusterIP
267+
for _, port := range wss.Object.Spec.Ports {
268+
ps := ofnet.PortSpec{Protocol: string(port.Protocol),
269+
SvcPort: uint16(port.Port),
270+
ProvPort: uint16(port.TargetPort),
271+
}
272+
sSpec.Ports = append(sSpec.Ports, ps)
273+
}
274+
275+
resp.svcSpec = sSpec
276+
log.Infof("resp: %+v", resp)
277+
278+
respCh <- resp
279+
}
280+
}()
281+
}
282+
283+
// WatchSvcEps watches the service endpoints object
284+
func (c *APIClient) WatchSvcEps(respCh chan EpWatchResp) {
285+
ctx, _ := context.WithCancel(context.Background())
286+
287+
go func() {
288+
// Make request to Kubernetes API
289+
getURL := c.watchBase + "endpoints"
290+
req, err := http.NewRequest("GET", getURL, nil)
291+
if err != nil {
292+
respCh <- EpWatchResp{opcode: "FATAL", errStr: fmt.Sprintf("Req %v", err)}
293+
return
294+
}
295+
res, err := ctxhttp.Do(ctx, c.client, req)
296+
defer res.Body.Close()
297+
if err != nil {
298+
respCh <- EpWatchResp{opcode: "FATAL", errStr: fmt.Sprintf("Do %v", err)}
299+
return
300+
}
301+
302+
var weps watchSvcEpStatus
303+
reader := bufio.NewReader(res.Body)
304+
305+
// bufio.Reader.ReadBytes is blocking, so we watch for
306+
// context timeout or cancellation in a goroutine
307+
// and close the response body when see see it. The
308+
// response body is also closed via defer when the
309+
// request is made, but closing twice is OK.
310+
go func() {
311+
<-ctx.Done()
312+
res.Body.Close()
313+
}()
314+
315+
for {
316+
line, err := reader.ReadBytes('\n')
317+
if ctx.Err() != nil {
318+
respCh <- EpWatchResp{opcode: "ERROR", errStr: fmt.Sprintf("ctx %v", err)}
319+
return
320+
}
321+
if err != nil {
322+
respCh <- EpWatchResp{opcode: "ERROR", errStr: fmt.Sprintf("read %v", err)}
323+
return
324+
}
325+
if err := json.Unmarshal(line, &weps); err != nil {
326+
respCh <- EpWatchResp{opcode: "WARN", errStr: fmt.Sprintf("unmarshal %v", err)}
327+
continue
328+
}
329+
//if weps.Object.ObjectMeta.Namespace != "default" {
330+
// continue
331+
//}
332+
333+
resp := EpWatchResp{opcode: weps.Type}
334+
resp.svcName = weps.Object.ObjectMeta.Name
335+
resp.providers = make([]string, 0, 1)
336+
for _, subset := range weps.Object.Subsets {
337+
// TODO: handle partially ready providers
338+
for _, addr := range subset.Addresses {
339+
resp.providers = append(resp.providers, addr.IP)
340+
}
341+
}
342+
343+
log.Infof("kube ep watch: %v", resp)
344+
respCh <- resp
345+
}
346+
}()
347+
}

0 commit comments

Comments
 (0)