Skip to content

Commit 29cb4f1

Browse files
author
Stephen Asbury
committed
Updated doc
Added docker file Fixed tests, all now run
1 parent fc97722 commit 29cb4f1

10 files changed

+156
-143
lines changed

Dockerfile

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
FROM golang:1.12.4 AS builder
2+
3+
WORKDIR /src/nats-replicator
4+
5+
LABEL maintainer "Stephen Asbury <[email protected]>"
6+
7+
COPY . .
8+
9+
RUN go mod download
10+
RUN CGO_ENABLED=0 go build -v -a -tags netgo -installsuffix netgo -o /nats-replicator
11+
12+
FROM alpine:3.9
13+
14+
RUN mkdir -p /nats/bin && mkdir /nats/conf
15+
16+
COPY --from=builder /nats-replicator /nats/bin/nats-replicator
17+
18+
RUN ln -ns /nats/bin/nats-replicator /bin/nats-replicator
19+
20+
ENTRYPOINT ["/bin/nats-replicator"]

README.md

+2-5
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,8 @@ This project implements a multi-connector bridge between NATS and NATS streaming
1111

1212
## WARNING DO NOT USE THIS SERVER YET
1313

14-
* This is the initial checkin
15-
* This code was just started
16-
* Tests do not pass
17-
* Doc is not right
18-
* Docker is not provided
14+
* Stan incoming performance needs to be tweaked to allow multiple outstanding acks
15+
* Doc is a first draft
1916

2017
## Features
2118

docs/buildandrun.md

-2
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ Use `make test` to run the tests, and `make install` to install. The nats and na
2828

2929
## Docker
3030

31-
!!! FIX THIS !!!
32-
3331
You can build the docker image using:
3432

3533
```bash

docs/config.md

+60-71
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# NATS-Replicator Configuration
22

3-
The bridge uses a single configuration file passed on the command line or environment variable. Configuration is organized into a root section and several blocks.
3+
The replicator uses a single configuration file passed on the command line or environment variable. Configuration is organized into a root section and several blocks.
44

55
* [Specifying the Configuration File](#specify)
66
* [Shared](#root)
@@ -24,10 +24,10 @@ include "./includes/connectors.conf"
2424
To set the configuration on the command line, use:
2525

2626
```bash
27-
% nats-kafka -c <path to config file>
27+
% nats-replicator -c <path to config file>
2828
```
2929

30-
To set the configuration file using an environment variable, export `NATS_KAFKA_BRIDGE_CONFIG` with the path to the configuration.
30+
To set the configuration file using an environment variable, export `NATS_REPLICATOR_CONFIG` with the path to the configuration.
3131

3232
<a name="root"></a>
3333

@@ -37,17 +37,15 @@ The root section:
3737

3838
```yaml
3939
reconnectinterval: 5000,
40-
connecttimeout: 5000,
4140
```
4241
4342
can currently contain settings for:
4443
45-
* `reconnectinterval` - this value, in milliseconds, is the time used in between reconnection attempts for a connector when it fails. For example, if a connector loses access to NATS, the bridge will try to restart it every `reconnectinterval` milliseconds.
46-
* `connecttimeout` - this value, in milliseconds, is the time used when trying to connect to Kafka.
44+
* `reconnectinterval` - this value, in milliseconds, is the time used in between reconnection attempts for a connector when it fails. For example, if a connector loses access to NATS, the replicator will try to restart it every `reconnectinterval` milliseconds.
4745

4846
## TLS <a name="tls"></a>
4947

50-
NATS, streaming, Kafka and HTTP configurations all take an optional TLS setting. The TLS configuration takes three possible settings:
48+
NATS, streaming and HTTP configurations all take an optional TLS setting. The TLS configuration takes three possible settings:
5149

5250
* `root` - file path to a CA root certificate store, used for NATS connections
5351
* `cert` - file path to a server certificate, used for HTTPS monitoring and optionally for client side certificates with NATS
@@ -95,57 +93,69 @@ monitoring: {
9593

9694
Is used to configure an HTTP or HTTPS port, as well as TLS settings when HTTPS is used.
9795

98-
* `httphost` - the network interface to publish monitoring on, valid for HTTP or HTTPS. An empty value will tell the server to use all available network interfaces.
99-
* `httpport` - the port for HTTP monitoring, no TLS configuration is expected, a value of -1 will tell the server to use an ephemeral port, the port will be logged on startup.
96+
* `httphost` - the network interface to publish monitoring on, valid for HTTP or HTTPS. An empty value will tell the replicator to use all available network interfaces.
97+
* `httpport` - the port for HTTP monitoring, no TLS configuration is expected, a value of -1 will tell the replicator to use an ephemeral port, the port will be logged on startup.
10098

10199
`2019/03/20 12:06:38.027822 [INF] starting http monitor on :59744`
102100

103101
* `httpsport` - the port for HTTPS monitoring, a TLS configuration is expected, a value of -1 will tell the server to use an ephemeral port, the port will be logged on startup.
104102
* `tls` - a [TLS configuration](#tls).
105103

106-
The `httpport` and `httpsport` settings are mutually exclusive, if both are set to a non-zero value the bridge will not start.
104+
The `httpport` and `httpsport` settings are mutually exclusive, if both are set to a non-zero value the replicator will not start.
107105

108106
<a name="nats"></a>
109107

110108
## NATS
111109

112-
The bridge makes a single connection to NATS. This connection is shared by all connectors. Configuration is through the `nats` section of the config file:
110+
The replicator can make multiple connections to NATS. Each connection has a name so that connectors can refer to it. Streaming connections also use the name of a NATS connection to refer to it. Configuration is through the `nats` section of the config file, which defines an array of connection parameters:
113111

114112
```yaml
115-
nats: {
116-
Servers: ["localhost:4222"],
117-
ConnectTimeout: 5000,
118-
MaxReconnects: 5,
119-
ReconnectWait: 5000,
120-
}
113+
nats: [
114+
{
115+
Name: "connection_one",
116+
Servers: ["localhost:4222"],
117+
ConnectTimeout: 5000,
118+
MaxReconnects: 5,
119+
ReconnectWait: 5000,
120+
}
121+
]
121122
```
122123

123124
NATS can be configured with the following properties:
124125

126+
* `name` - the unique name used to refer to this configuration/connection
125127
* `servers` - an array of server URLS
126128
* `connecttimeout` - the time, in milliseconds, to wait before failing to connect to the NATS server
127129
* `reconnectwait` - the time, in milliseconds, to wait between reconnect attempts
128-
* `maxreconnects` - the maximum number of reconnects to try before exiting the bridge with an error.
130+
* `maxreconnects` - the maximum number of reconnects to try before exiting the replicator with an error.
129131
* `tls` - (optional) [TLS configuration](#tls). If the NATS server uses unverified TLS with a valid certificate, this setting isn't required.
130132
* `usercredentials` - (optional) the path to a credentials file for connecting to NATs.
131133

132134
<a name="stan"></a>
133135

134136
## NATS Streaming
135137

136-
The bridge makes a single connection to a NATS streaming server. This connection is shared by all connectors. Configuration is through the `stan` section of the config file:
138+
The replicator can make multiple connections to streaming servers. Each streaming connection must refer to a NATS connection by name. This reduces the configuration clutter by keeping NATS parameters in the NATS section. Configuration is through the `stan` section of the config file, which defines an array:
137139

138140
```yaml
139-
stan: {
140-
ClusterID: "test-cluster"
141-
ClientID: "kafkabridge"
142-
}
141+
stan: [
142+
{
143+
Name: "stan_one",
144+
NATSConnection: "connection_one",
145+
ClusterID: "test-cluster"
146+
ClientID: "replicator_one"
147+
}
148+
]
143149
```
144150

151+
Multiple streaming connections could use the same NATS connection.
152+
145153
NATS streaming can be configured with the following properties:
146154

155+
* `name` - the unique name used to refer to this configuration/connection
156+
* `natsconnection` - the unique name of the nats connection to use for this streaming connection
147157
* `clusterid` - the cluster id for the NATS streaming server.
148-
* `clientid` - the client id for the bridge, shared by all connections.
158+
* `clientid` - the client id for the connection.
149159
* `pubackwait` - the time, in milliseconds, to wait before a publish fails due to a timeout.
150160
* `discoverprefix` - the discover prefix for the streaming server.
151161
* `maxpubacksinflight` - maximum pub ACK messages that can be in flight for this connection.
@@ -155,72 +165,51 @@ NATS streaming can be configured with the following properties:
155165

156166
## Connectors
157167

158-
The final piece of the bridge configuration is the `connect` section. Connect specifies an array of connector configurations. All connector configs use the same format, relying on optional settings to determine what the do.
168+
The final piece of the replicator configuration is the `connect` section. Connect specifies an array of connector configurations. All connector configs use the same format, relying on optional settings to determine what the do.
159169

160170
```yaml
161171
connect: [
162172
{
163-
type: "KafkaToNATS",
164-
brokers: ["localhost:9092"]
165-
id: "zip",
166-
topic: "test",
167-
subject: "one",
168-
},{
169-
type: "NATSToKafka",
170-
brokers: ["localhost:9092"]
171-
id: "zap",
172-
topic: "test2",
173-
subject: "two",
173+
type: NATSToNATS,
174+
id: "alpha",
175+
IncomingSubject: "in",
176+
IncomingConnection: "connection_one",
177+
OutgoingSubject: "out",
178+
OutgoingConnection: "connection_one",
174179
},
175180
],
176181
```
177182

178183
The most important property in the connector configuration is the `type`. The type determines which kind of connector is instantiated. Available, uni-directional, types include:
179184

180-
* `KafkaToNATS` - a topic to NATS connector
181-
* `KafkaToStan` - a topic to streaming connector
182-
* `NATSToKafka` - a streaming to topic connector
183-
* `STANToKafka` - a NATS to topic connector
185+
* `NATSToNATS` - a subject to subject connector
186+
* `NATSToStan` - a subject to streaming connector
187+
* `StanToNATS` - a streaming to subject connector
188+
* `STANToSTAN` - a streaming to streaming connector
184189

185190
All connectors can have an optional id, which is used in monitoring:
186191

187192
* `id` - (optional) user defined id that will tag the connection in monitoring JSON.
188193

189-
For NATS connections, specify:
190-
191-
* `subject` - the subject to subscribe/publish to, depending on the connections direction.
192-
* `natsqueue` - the queue group to use in subscriptions, this is optional but useful for load balancing.
194+
All connectors require a configuration for the connection to use:
193195

194-
Keep in mind that NATS queue groups do not guarantee ordering, since the queue subscribers can be on different nats-servers in a cluster. So if you have to bridges running with connectors on the same NATS queue/subject pair and have a high message rate you may get messages in the Kafka topic out of order.
196+
* `incomingconnection` - the name of a NATS or streaming connection to subscribe to
197+
* `outgoingconnection` - the name of a NATS or streaming connection to publish to
195198

196-
For streaming connections, there is a single required setting and several optional ones:
197-
198-
* `channel` - the streaming channel to subscribe/publish to.
199-
* `durablename` - (optional) durable name for the streaming subscription (if appropriate.)
200-
* `startatsequence` - (optional) start position, use -1 for start with last received, 0 for deliver all available (the default.)
201-
* `startattime` - (optional) the start position as a time, in Unix seconds since the epoch, mutually exclusive with `startatsequence`.
202-
203-
All connectors must specify Kafka connection properties, with a few optional settings available as well:
199+
For NATS connections, specify:
204200

205-
* `brokers` - a string array of broker host:port settings
206-
* `topic` - the Kafka topic to listen/send to
207-
* `tls` - A tls config for the connection
208-
* `balancer` - required for a writer, should be "hash" or "leastbytes"
209-
* `groupid` - (exclusive with partition) used by the reader to set a group id
210-
* `partition` - (exclusive with groupid) used by the reader to set a partition
211-
* `minbytes` - (optional) used by Kafka readers to set the minimum bytes for a read
212-
* `maxbytes` - (optional) used by a Kafka reader to set the maximum bytes for a read
213-
* `keytype` - (optional) defines the way keys are assigned to messages coming from NATS (see below)
214-
* `keyvalue` - (optional) extra data that may be used depending on the key type
201+
* `incomingsubject` - the subject to subscribe to, depending on the connections direction.
202+
* `incomingqueuename` - the queue group to use in subscriptions, this is optional but useful for load balancing.
203+
* `outgoingsubject` - the subject to publish to, depending on the connections direction.
215204

216-
Available key types are:
205+
Keep in mind that NATS queue groups do not guarantee ordering, since the queue subscribers can be on different nats-servers in a cluster. So if you have to replicators running with connectors on the same NATS queue/subject pair and have a high message rate you may get messages to the receiver "out of order." Also, note that there is no outgoing queue.
217206

218-
* `fixed` - the value in the `keyvalue` field is assigned to all messages
219-
* `subject` - the subject the incoming NATS message was sent on is used as the key
220-
* `reply` - the reply-to sent with the incoming NATS messages is used as the key
221-
* `subjectre` - the value in the `keyvalue` field used as a regular expression and the first capture group, when matched to the subject, is used as the key
222-
* `replyre` - the value in the `keyvalue` field used as a regular expression and the first capture group, when matched to the reply-to, is used as the key
207+
These settings are directional depending so a `NATSToStan` connector would use an `incomingsubject` while a `StanToNATS` connector would use an `outgoingsubject`. Connectors ignore settings they don't need.
223208

224-
If unset, an empty key is assigned during translation from NATS to Kafka. If the regex types are used and they don't match, an empty key is used.
209+
For streaming connections, the channel setting is required (directionality dependent), the others are optional:
225210

226-
For nats streaming connections channel is treated as the subject and durable name is treated as the reply to, so that reply key type will use the durable name as the key.
211+
* `incomingchannel` - the streaming channel to subscribe to.
212+
* `outgoingchannel` - the streaming channel to publish to.
213+
* `incomingdurablename` - (optional) durable name for the streaming subscription (if appropriate.)
214+
* `incomingstartatsequence` - (optional) start position, use -1 for start with last received, 0 for deliver all available (the default.)
215+
* `incomingstartattime` - (optional) the start position as a time, in Unix seconds since the epoch, mutually exclusive with `startatsequence`.

docs/monitoring.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
# Monitoring the NATS-Kafka Bridge
1+
# Monitoring the NATS-Replicator
22

3-
The nats-kafka bridge provides optional HTTP/s monitoring. When [configured with a monitoring port](config.md#monitoring) the server will provide two HTTP endpoints:
3+
The nats-replicator provides optional HTTP/s monitoring. When [configured with a monitoring port](config.md#monitoring) the server will provide two HTTP endpoints:
44

55
* [/varz](#varz)
66
* [/healthz](#healthz)
@@ -11,9 +11,9 @@ The nats-kafka bridge provides optional HTTP/s monitoring. When [configured with
1111

1212
The `/varz` endpoint returns a JSON encoded set of statistics for the server. These statistics are wrapped in a root level object with the following properties:
1313

14-
* `start_time` - the start time of the bridge, in the bridge's timezone.
15-
* `current_time` - the current time, in the bridge's timezone.
16-
* `uptime` - a string representation of the server's up time.
14+
* `start_time` - the start time of the replicator, in the replicator's timezone.
15+
* `current_time` - the current time, in the replicator's timezone.
16+
* `uptime` - a string representation of the replicator's up time.
1717
* `http_requests` - a map of request paths to counts, the keys are `/`, `/varz` and `/healthz`.
1818
* `connectors` - an array of statistics for each connector.
1919

@@ -38,4 +38,4 @@ Each object in the connectors array, one per connector, will contain the followi
3838

3939
## /healthz
4040

41-
The `/healthz` endpoint is provided for automated up/down style checks. The server returns an HTTP/200 when running and won't respond if it is down.
41+
The `/healthz` endpoint is provided for automated up/down style checks. The server returns an HTTP/200 when running and won't respond if it is down.

server/core/server_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -241,8 +241,8 @@ func (tbs *TestEnv) StartNATSandStan(port int, clusterID string, clientID string
241241
return nil
242242
}
243243

244-
// StopBridge stops the bridge
245-
func (tbs *TestEnv) StopBridge() {
244+
// StopReplicator stops the bridge
245+
func (tbs *TestEnv) StopReplicator() {
246246
if tbs.Bridge != nil {
247247
tbs.Bridge.Stop()
248248
tbs.Bridge = nil

server/core/stan2nats.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ func (conn *Stan2NATSConnector) Start() error {
5050
return fmt.Errorf("%s connector is improperly configured, incoming and outgoing settings are required", conn.String())
5151
}
5252

53-
if !conn.bridge.CheckNATS(incoming) {
53+
if !conn.bridge.CheckStan(incoming) {
5454
return fmt.Errorf("%s connector requires nats connection named %s to be available", conn.String(), incoming)
5555
}
5656

57-
if !conn.bridge.CheckStan(outgoing) {
57+
if !conn.bridge.CheckNATS(outgoing) {
5858
return fmt.Errorf("%s connector requires stan connection named %s to be available", conn.String(), outgoing)
5959
}
6060

@@ -127,7 +127,11 @@ func (conn *Stan2NATSConnector) Start() error {
127127
conn.sub = sub
128128

129129
conn.stats.AddConnect()
130-
conn.bridge.Logger().Tracef("opened and reading %s", conn.config.IncomingSubject)
130+
if config.IncomingDurableName != "" {
131+
conn.bridge.Logger().Tracef("opened and reading %s with durable name %s", conn.config.IncomingChannel, config.IncomingDurableName)
132+
} else {
133+
conn.bridge.Logger().Tracef("opened and reading %s", conn.config.IncomingChannel)
134+
}
131135
conn.bridge.Logger().Noticef("started connection %s", conn.String())
132136

133137
return nil
@@ -145,7 +149,7 @@ func (conn *Stan2NATSConnector) Shutdown() error {
145149
conn.sub = nil
146150

147151
if sub != nil {
148-
if err := sub.Unsubscribe(); err != nil {
152+
if err := sub.Close(); err != nil {
149153
conn.bridge.Logger().Noticef("error unsubscribing for %s, %s", conn.String(), err.Error())
150154
}
151155
}

0 commit comments

Comments
 (0)