Skip to content

Commit 9bcf605

Browse files
author
Emmanuel T Odeke
authored
interceptor: add zipkin /v2 interceptor (#111)
* interceptor: add zipkin interceptor * Zipkin interceptor multiplexing + running on 9411 * Can receive spans from diverse sources and successfully intercept traces from various serviceNames and IPv* combinations. Verified by examining the data on the Zipkin UI itself by resending to a server running on a different port. * End-to-end serialization tests where we intercept JSON from a wild Zipkin reporter, transform it into trace-proto then to OpenCensus-Go SpanData and then export it with the OpenCensus-Go Zipkin exporter, inspect the JSON and compare it with the final/expected JSON data. Fixes #24 * interceptor/zipkin: add remoteEndpoint to Node.Attributes Zipkin's RemoteEndpoint field is necessary when later on building the dependency graph for Zipkin services. This change ensures that that field is added as an attribute to the Node with keys whose prefix is: "zipkin.remoteEndpoint." e.g. - "zipkin.remoteEndpoint.ipv4" - "zipkin.remoteEndpoint.serviceName" - "zipkin.remoteEndpoint.port" * Fail if Zipkin interceptor and exporter run on same address Fail spectacularly if the Zipkin interceptor and the Zipkin exporter will all be run on the same address. This is because this is a self DOS vector as intercepted spans will be processed by the interceptor, then exported out by the exporter but instead those exported spans will then be sent back to the interceptor and this goes on indefinitely and will just consume memory and resources. To perform this check: a) Compare HostPorts of interceptor vs exporter b) If say "127.0.0.1", "localhost", "" for host and the ports match c) Resolve the IPs for the hosts if any of those steps match return a logical conflict error which will be thrown later. The crash will look something like this ```shell 2018/10/23 18:54:41 Configuration logical error: ZipkinInterceptor address ("127.0.0.1:9411") aka (127.0.0.1 on port 9411) is the same as the interceptor address ("localhost:9411") aka (127.0.0.1 on port 9411) ``` We can now detect any of these clashing configurations: ```yaml interceptors: zipkin: address: "127.0.0.1:9411" exporters: zipkin: endpoint: "http://localhost:9411/api/v2/spans" ``` ```yaml interceptors: zipkin: address: ":9411" exporters: zipkin: endpoint: "http://localhost:9411/api/v2/spans" ``` ```yaml interceptors: zipkin: address: "localhost:9411" exporters: zipkin: endpoint: "http://localhost:9411/api/v2/spans" ``` ```yaml interceptors: zipkin: address: "localhost:9411" exporters: zipkin: endpoint: "http://:9411/api/v2/spans" ``` ```yaml interceptors: zipkin: address: "localhost:9411" exporters: zipkin: endpoint: "http://127.0.0.1:9411/api/v2/spans" ``` and in a case where IP resolution helped, "10.0.0.147" is my local address and I sneakishly tried to get the interceptor on "localhost" which clashes with my local address as per: ```yaml interceptors: zipkin: address: "localhost:9411" exporters: zipkin: endpoint: "http://10.0.0.147:9411/api/v2/spans" ``` we caught it ```shell 2018/10/23 19:01:01 Configuration logical error: ZipkinInterceptor address ("10.0.0.147:9411") aka (10.0.0.147 on port 9411) is the same as the interceptor address ("localhost:9411") aka (10.0.0.147 on port 9411) ``` * Go fmt with Go1.10 to match TravisCI Go1.11's gofmt seems to be more conservating with aligning literal assignments while Go1.10 seems more extravagant. This subtle difference was causing TravisCI failures since TravisCI uses Go1.10. * interceptor/zipkin: handle compressed HTTP bodies Some clients such as Zipkin-Java send HTTP bodies with JSON that's been compressed as "gzip". This change handles "Content-Encoding" values of: * "gzip" * "deflate", "zlib" and decompresses them accordingly * interceptor/zipkin: tests use anagram signature A test that checked for raffled output before used an "xorChecksum" but that's not really always unique. The better way is to use an anagram counter, then create a signature from the serialized map of counted runes. * interceptor/zipkin: address review feedback * Package rename to "zipkinterceptor" --> "zipkininterceptor" * Fix error names * Address feedback from review * move "zipkinRoute" const to the top to make it easily discoverable and not be buried deep in code * set status of zipkin interceptor if we encounter an error while parsing spans. We set the span status to: StatusInvalidArgument and the error message * *zipkin: address feedback from review * Add a comment ot exporter/exporterparse.zipkinExporter that the mutex "mu" protects all the constituent fields * Fix and ensure that node uniqueness is clear, with the proper check from a map lookup but also rename the variable to store nodes to "uniqueNodes" to make for better code readability * README.md + default config.yaml: update interceptors docs and information Added a section in the README.md on the various interceptors and how to change the Zipkin interceptor address * sync.Mutex instead of sync.RWMutex for zipkinExporter.mu
1 parent f366180 commit 9bcf605

File tree

10 files changed

+1411
-37
lines changed

10 files changed

+1411
-37
lines changed

README.md

+28-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
- [Configuration file](#agent-configuration-file)
2323
- [Exporters](#agent-config-exporters)
2424
- [Interceptors](#agent-config-interceptors)
25+
- [OpenCensus](#details-interceptors-opencensus)
26+
- [Zipkin](#details-interceptors-zipkin)
2527
- [End-to-end example](#agent-config-end-to-end-example)
2628
- [Docker image](#agent-docker-image)
2729
- [OpenCensus Collector](#opencensus-collector)
@@ -271,14 +273,36 @@ exporters:
271273
```
272274

273275
#### <a name="agent-config-interceptors"></a>Interceptors
276+
As previously mentioned, the agent provides a couple of interceptors
277+
278+
#### <a name="details-interceptors-opencensus"> OpenCensus
279+
280+
This interceptor receives spans from OpenCensus instrumented applications and translates them into the internal span types that
281+
are then sent to the collector/exporters.
282+
283+
Its address can be configured in the YAML configuration file under section "intercpetors", subsection "opencensus" and field "address".
274284

275-
To modify the address that the OpenCensus interceptor runs on, please use the
276-
YAML field name `opencensus_interceptor` and it takes fields like `address`.
277285
For example:
286+
```yaml
287+
interceptors:
288+
opencensus:
289+
address: "127.0.0.1:55678"
290+
```
291+
292+
By default this interceptor is ALWAYS started since it is the point of the "OpenCensus agent"
278293

294+
#### <a name="details-interceptors-zipkin"> Zipkin
295+
296+
This interceptor receives spans from Zipkin "/v2" API HTTP uploads and translates them into the internal span types that are then
297+
sent to the collector/exporters.
298+
299+
Its address can be configured in the YAML configuration file under section "intercpetors", subsection "zipkin" and field "address".
300+
301+
For example:
279302
```yaml
280-
opencensus_interceptor:
281-
address: "localhost:55678"
303+
interceptors:
304+
zipkin:
305+
address: "localhost:9411"
282306
```
283307

284308
### <a name="agent-config-end-to-end-example"></a>Running an end-to-end example/demo

cmd/ocagent/config.go

+94-4
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
package main
1616

1717
import (
18+
"fmt"
1819
"log"
20+
"net"
21+
"net/url"
22+
"reflect"
23+
"strings"
1924

2025
yaml "gopkg.in/yaml.v2"
2126

@@ -52,7 +57,8 @@ type config struct {
5257
}
5358

5459
type interceptors struct {
55-
OpenCensusInterceptor *interceptorConfig `yaml:"opencensus"`
60+
OpenCensus *interceptorConfig `yaml:"opencensus"`
61+
Zipkin *interceptorConfig `yaml:"zipkin"`
5662
}
5763

5864
type interceptorConfig struct {
@@ -70,10 +76,10 @@ func (c *config) ocInterceptorAddress() string {
7076
return defaultOCInterceptorAddress
7177
}
7278
inCfg := c.Interceptors
73-
if inCfg.OpenCensusInterceptor == nil || inCfg.OpenCensusInterceptor.Address == "" {
79+
if inCfg.OpenCensus == nil || inCfg.OpenCensus.Address == "" {
7480
return defaultOCInterceptorAddress
7581
}
76-
return inCfg.OpenCensusInterceptor.Address
82+
return inCfg.OpenCensus.Address
7783
}
7884

7985
func (c *config) zPagesDisabled() bool {
@@ -94,6 +100,24 @@ func (c *config) zPagesPort() (int, bool) {
94100
return port, true
95101
}
96102

103+
func (c *config) zipkinInterceptorEnabled() bool {
104+
if c == nil {
105+
return false
106+
}
107+
return c.Interceptors != nil && c.Interceptors.Zipkin != nil
108+
}
109+
110+
func (c *config) zipkinInterceptorAddress() string {
111+
if c == nil || c.Interceptors == nil {
112+
return exporterparser.DefaultZipkinEndpointHostPort
113+
}
114+
inCfg := c.Interceptors
115+
if inCfg.Zipkin == nil || inCfg.Zipkin.Address == "" {
116+
return exporterparser.DefaultZipkinEndpointHostPort
117+
}
118+
return inCfg.Zipkin.Address
119+
}
120+
97121
func parseOCAgentConfig(yamlBlob []byte) (*config, error) {
98122
var cfg config
99123
if err := yaml.Unmarshal(yamlBlob, &cfg); err != nil {
@@ -102,7 +126,72 @@ func parseOCAgentConfig(yamlBlob []byte) (*config, error) {
102126
return &cfg, nil
103127
}
104128

105-
type exporterParser func(yamlConfig []byte) (te []exporter.TraceExporter, err error)
129+
// The goal of this function is to catch logical errors such as
130+
// if the Zipkin interceptor port conflicts with that of the exporter
131+
// lest we'll have a self DOS because spans will be exported "out" from
132+
// the exporter, yet be received from the interceptor, then sent back out
133+
// and back in a never ending loop.
134+
func (c *config) checkLogicalConflicts(blob []byte) error {
135+
var cfg struct {
136+
Exporters *struct {
137+
Zipkin *exporterparser.ZipkinConfig `yaml:"zipkin"`
138+
} `yaml:"exporters"`
139+
}
140+
if err := yaml.Unmarshal(blob, &cfg); err != nil {
141+
return err
142+
}
143+
144+
if cfg.Exporters == nil || cfg.Exporters.Zipkin == nil {
145+
return nil
146+
}
147+
148+
zc := cfg.Exporters.Zipkin
149+
150+
zExporterAddr := zc.EndpointURL()
151+
zExporterURL, err := url.Parse(zExporterAddr)
152+
if err != nil {
153+
return fmt.Errorf("parsing ZipkinExporter address %q got error: %v", zExporterAddr, err)
154+
}
155+
156+
zInterceptorHostPort := c.zipkinInterceptorAddress()
157+
158+
zExporterHostPort := zExporterURL.Host
159+
if zInterceptorHostPort == zExporterHostPort {
160+
return fmt.Errorf("ZipkinExporter address (%q) is the same as the interceptor address (%q)",
161+
zExporterHostPort, zInterceptorHostPort)
162+
}
163+
zExpHost, zExpPort, _ := net.SplitHostPort(zExporterHostPort)
164+
zInterceptorHost, zInterceptorPort, _ := net.SplitHostPort(zExporterHostPort)
165+
if eqHosts(zExpHost, zInterceptorHost) && zExpPort == zInterceptorPort {
166+
return fmt.Errorf("ZipkinExporter address (%q) aka (%s on port %s)\nis the same as the interceptor address (%q) aka (%s on port %s)",
167+
zExporterHostPort, zExpHost, zExpPort, zInterceptorHostPort, zInterceptorHost, zInterceptorPort)
168+
}
169+
170+
// Otherwise, now let's resolve the IPs and ensure that they aren't the same
171+
zExpIPAddr, _ := net.ResolveIPAddr("ip", zExpHost)
172+
zInterceptorIPAddr, _ := net.ResolveIPAddr("ip", zInterceptorHost)
173+
if zExpIPAddr != nil && zInterceptorIPAddr != nil && reflect.DeepEqual(zExpIPAddr, zInterceptorIPAddr) {
174+
return fmt.Errorf("ZipkinExporter address (%q) aka (%+v)\nis the same as the\ninterceptor address (%q) aka (%+v)",
175+
zExporterHostPort, zExpIPAddr, zInterceptorHostPort, zInterceptorIPAddr)
176+
}
177+
return nil
178+
}
179+
180+
func eqHosts(host1, host2 string) bool {
181+
if host1 == host2 {
182+
return true
183+
}
184+
return eqLocalHost(host1) && eqLocalHost(host2)
185+
}
186+
187+
func eqLocalHost(host string) bool {
188+
switch strings.ToLower(host) {
189+
case "", "localhost", "127.0.0.1":
190+
return true
191+
default:
192+
return false
193+
}
194+
}
106195

107196
// exportersFromYAMLConfig parses the config yaml payload and returns the respective exporters
108197
func exportersFromYAMLConfig(config []byte) (traceExporters []exporter.TraceExporter, doneFns []func() error) {
@@ -128,6 +217,7 @@ func exportersFromYAMLConfig(config []byte) (traceExporters []exporter.TraceExpo
128217
nonNilExporters += 1
129218
}
130219
}
220+
131221
if nonNilExporters > 0 {
132222
pluralization := "exporter"
133223
if nonNilExporters > 1 {

cmd/ocagent/config.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ interceptors:
22
opencensus:
33
address: "127.0.0.1:55678"
44

5+
zipkin:
6+
address: "localhost:9411"
7+
58
exporters:
69
stackdriver:
710
project: "project-id"

cmd/ocagent/main.go

+45-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
3030
"github.com/census-instrumentation/opencensus-service/exporter"
3131
"github.com/census-instrumentation/opencensus-service/interceptor/opencensus"
32+
"github.com/census-instrumentation/opencensus-service/interceptor/zipkin"
3233
"github.com/census-instrumentation/opencensus-service/internal"
3334
"github.com/census-instrumentation/opencensus-service/spanreceiver"
3435
"go.opencensus.io/plugin/ocgrpc"
@@ -39,6 +40,8 @@ import (
3940
var configYAMLFile string
4041
var ocInterceptorPort int
4142

43+
const zipkinRoute = "/api/v2/spans"
44+
4245
func main() {
4346
if err := rootCmd.Execute(); err != nil {
4447
log.Fatal(err)
@@ -54,6 +57,14 @@ func runOCAgent() {
5457
if err != nil {
5558
log.Fatalf("Failed to parse own configuration %v error: %v", configYAMLFile, err)
5659
}
60+
61+
// Ensure that we check and catch any logical errors with the
62+
// configuration e.g. if an interceptor shares the same address
63+
// as an exporter which would cause a self DOS and waste resources.
64+
if err := agentConfig.checkLogicalConflicts(yamlBlob); err != nil {
65+
log.Fatalf("Configuration logical error: %v", err)
66+
}
67+
5768
ocInterceptorAddr := agentConfig.ocInterceptorAddress()
5869

5970
traceExporters, closeFns := exportersFromYAMLConfig(yamlBlob)
@@ -64,7 +75,6 @@ func runOCAgent() {
6475
if err != nil {
6576
log.Fatal(err)
6677
}
67-
6878
closeFns = append(closeFns, ocInterceptorDoneFn)
6979

7080
// If zPages are enabled, run them
@@ -74,6 +84,16 @@ func runOCAgent() {
7484
closeFns = append(closeFns, zCloseFn)
7585
}
7686

87+
// If the Zipkin interceptor is enabled, then run it
88+
if agentConfig.zipkinInterceptorEnabled() {
89+
zipkinInterceptorAddr := agentConfig.zipkinInterceptorAddress()
90+
zipkinInterceptorDoneFn, err := runZipkinInterceptor(zipkinInterceptorAddr, commonSpanReceiver)
91+
if err != nil {
92+
log.Fatal(err)
93+
}
94+
closeFns = append(closeFns, zipkinInterceptorDoneFn)
95+
}
96+
7797
// Always cleanup finally
7898
defer func() {
7999
for _, closeFn := range closeFns {
@@ -141,3 +161,27 @@ func runOCInterceptor(addr string, sr spanreceiver.SpanReceiver) (doneFn func()
141161
doneFn = ln.Close
142162
return doneFn, nil
143163
}
164+
165+
func runZipkinInterceptor(addr string, sr spanreceiver.SpanReceiver) (doneFn func() error, err error) {
166+
zi, err := zipkininterceptor.New(sr)
167+
if err != nil {
168+
return nil, fmt.Errorf("Failed to create the Zipkin interceptor: %v", err)
169+
}
170+
171+
ln, err := net.Listen("tcp", addr)
172+
if err != nil {
173+
return nil, fmt.Errorf("Cannot bind Zipkin interceptor to address %q: %v", addr, err)
174+
}
175+
mux := http.NewServeMux()
176+
mux.Handle(zipkinRoute, zi)
177+
go func() {
178+
fullAddr := addr + zipkinRoute
179+
log.Printf("Running the Zipkin interceptor at %q", fullAddr)
180+
if err := http.Serve(ln, mux); err != nil {
181+
log.Fatalf("Failed to serve the Zipkin interceptor: %v", err)
182+
}
183+
}()
184+
185+
doneFn = ln.Close
186+
return doneFn, nil
187+
}

0 commit comments

Comments
 (0)