Skip to content

Commit 387045f

Browse files
committed
Merge pull request #4 from mre/refactoring
Rewrite. Add line protocol support for InfluxDB 0.9.1
2 parents 98d1520 + a887bf0 commit 387045f

29 files changed

+19984
-273
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ __pycache__/
77

88
# Distribution / packaging
99
.Python
10+
virtualenv
1011
env/
1112
bin/
1213
build/
@@ -52,3 +53,5 @@ coverage.xml
5253
# Sphinx documentation
5354
docs/_build/
5455

56+
# Custom ignores
57+
tags

Dockerfile

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
FROM python:2.7
2+
ADD . /code
3+
WORKDIR /code
4+
RUN pip install -r requirements.txt
5+
CMD python app.py

README.md

+87-29
Original file line numberDiff line numberDiff line change
@@ -2,46 +2,104 @@ Kafka-InfluxDB
22
==============
33

44
A Kafka consumer for InfluxDB written in Python.
5-
All messages sent to Kafka on a certain topic will be relayed to Influxdb.
6-
Supports Influxdb 0.8.x and 0.9.x.
5+
All messages sent to Kafka on a certain topic will be relayed to Influxdb.
6+
Supports InfluxDB 0.9.x. For InfluxDB 0.8 support look check out the 0.3.0 tag.
77

88
## Usage
99

10-
To run a simple test, you can provide config flags on the commandline like so:
10+
### Manual installation
1111

12-
python kafka_influxdb.py --influxdb_host 127.0.0.1 --influxdb_port 8086
12+
If you want to run a local instance, you can do so with the following commands:
1313

14-
You can also specify a config file which is more convenient for running as a daemon:
14+
pip install -r requirements.txt
15+
# You might need to adjust the config.yaml
16+
python kafka_influxdb.py -c config.yaml
1517

16-
python kafka_influxdb.py --configfile my_config.yaml
18+
### Starting inside a container
19+
20+
The most convenient way to get started is running this inside a Docker container like so:
21+
22+
docker build -t mre/kafka_influxdb .
23+
docker run -it --rm mre/kafka_influxdb
24+
25+
To run a simple test setup, you can run a collectd -> kafka -> kafka_influxdb -> influxdb toolchain with the following command:
26+
27+
docker-compose up
28+
29+
## Supported input and output formats
30+
31+
### Input formats
32+
33+
* Collectd Graphite, e.g. "mydatacenter.myhost.load.load.shortterm 0.45 1436357630"
34+
35+
### Output formats
36+
37+
* InfluxDB 0.9 line protocol output (e.g. `load_load_shortterm,datacenter=mydatacenter,host=myhost value="0.45" 1436357630`)
38+
* InfluxDB 0.8 JSON output (deprecated)
39+
40+
### Extending
41+
42+
You can write a custom encoder to support any other format (even fancy things like Protobuf).
43+
Look at the examples in the `encoder` folder to get started.
1744

1845
## Configuration
1946

20-
kafka_influxdb.py [-h] [--kafka_host KAFKA_HOST]
21-
[--kafka_port KAFKA_PORT]
22-
[--kafka_topic KAFKA_TOPIC]
23-
[--kafka_group KAFKA_GROUP]
24-
[--influxdb_host INFLUXDB_HOST]
25-
[--influxdb_port INFLUXDB_PORT]
26-
[--influxdb_user INFLUXDB_USER]
27-
[--influxdb_password INFLUXDB_PASSWORD]
28-
[--influxdb_dbname INFLUXDB_DBNAME]
29-
[--influxdb_data_name INFLUXDB_DATA_NAME]
30-
[--influxdb_columns INFLUXDB_COLUMNS]
31-
[--influxdb_version DB_VERSION]
32-
[--influxdb_retention_policy RETENTION_POLICY]
33-
[--buffer_size BUFFER_SIZE]
34-
[--verbose BOOLEAN]
35-
[--statistics BOOLEAN]
36-
[--configfile CONFIG_FILE]
47+
usage: kafka_influxdb.py [-h] [--kafka_host KAFKA_HOST]
48+
[--kafka_port KAFKA_PORT] [--kafka_topic KAFKA_TOPIC]
49+
[--kafka_group KAFKA_GROUP]
50+
[--influxdb_host INFLUXDB_HOST]
51+
[--influxdb_port INFLUXDB_PORT]
52+
[--influxdb_user INFLUXDB_USER]
53+
[--influxdb_password INFLUXDB_PASSWORD]
54+
[--influxdb_dbname INFLUXDB_DBNAME]
55+
[--influxdb_retention_policy INFLUXDB_RETENTION_POLICY]
56+
[--influxdb_time_precision INFLUXDB_TIME_PRECISION]
57+
[--encoder ENCODER] [--buffer_size BUFFER_SIZE]
58+
[-c CONFIGFILE] [-v]
3759

38-
Command line settings have precedence over config file provided settings. See the sample at `config.yaml` to get an idea on the format.
60+
A Kafka consumer for InfluxDB
61+
62+
optional arguments:
63+
-h, --help show this help message and exit
64+
--kafka_host KAFKA_HOST
65+
Hostname or IP of Kafka message broker (default:
66+
localhost)
67+
--kafka_port KAFKA_PORT
68+
Port of Kafka message broker (default: 9092)
69+
--kafka_topic KAFKA_TOPIC
70+
Topic for metrics (default: test)
71+
--kafka_group KAFKA_GROUP
72+
Kafka consumer group (default: my_group)
73+
--influxdb_host INFLUXDB_HOST
74+
InfluxDB hostname or IP (default: localhost)
75+
--influxdb_port INFLUXDB_PORT
76+
InfluxDB API port (default: 8086)
77+
--influxdb_user INFLUXDB_USER
78+
InfluxDB username (default: root)
79+
--influxdb_password INFLUXDB_PASSWORD
80+
InfluxDB password (default: root)
81+
--influxdb_dbname INFLUXDB_DBNAME
82+
InfluXDB database to write metrics into (default:
83+
metrics)
84+
--influxdb_retention_policy INFLUXDB_RETENTION_POLICY
85+
Retention policy for incoming metrics (default:
86+
default)
87+
--influxdb_time_precision INFLUXDB_TIME_PRECISION
88+
Precision of incoming metrics. Can be one of 's', 'm',
89+
'ms', 'u' (default: s)
90+
--encoder ENCODER Input encoder which converts an incoming message to
91+
dictionary (default: collectd_graphite_encoder)
92+
--buffer_size BUFFER_SIZE
93+
Maximum number of messages that will be collected
94+
before flushing to the backend (default: 1000)
95+
-c CONFIGFILE, --configfile CONFIGFILE
96+
Configfile path (default: None)
97+
-v, --verbose Show info and debug messages while running (default:
98+
False)
3999

40-
## Dependencies
100+
Command line settings have precedence over config file provided settings. See the sample at `config.yaml` to get an idea on the format.
41101

42-
Please note that you must install the version of the influxdb python client matching your influxdb version (for 0.9 see https://github.com/influxdb/influxdb-python/tree/0.9.0_support )
102+
## TODO
43103

44-
## Todo
45104
* flush buffer if not full but some period has elapsed (safety net for low frequency input)
46-
* offset management, if not already supported in kafka client
47-
* create error log
105+
* Provide environment variables for docker

config.yaml

+12-10
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1-
---
1+
---
22
kafka:
3-
host: "10.1.3.235"
3+
host: "kafka"
44
port: 9092
5-
topic: "perfmonspring1"
5+
topic: "metrics"
66
influxdb:
7-
host: "springfield02.local"
7+
host: "influxdb"
88
port: 8086
9-
user: 'root'
10-
password: 'root'
11-
dbname: 'mydb'
12-
version: 0.9
13-
retention_policy: "my_rp"
9+
user: "root"
10+
password: "root"
11+
dbname: "metrics"
12+
retention_policy: "default"
13+
# Possible encoders (see encoder directory for details)
14+
# * collectd_graphite_encoder
15+
encoder: "collectd_graphite_encoder"
1416
buffer_size: 1000
1517
verbose: false
16-
statistics: true
18+
statistics: true

contrib/collectd/collectd.conf

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
FQDNLookup false
2+
LoadPlugin syslog
3+
LoadPlugin logfile
4+
LoadPlugin cpu
5+
LoadPlugin load
6+
LoadPlugin memory
7+
LoadPlugin write_kafka
8+
9+
<Plugin logfile>
10+
LogLevel info
11+
File STDOUT
12+
Timestamp true
13+
PrintSeverity false
14+
</Plugin>
15+
16+
<Plugin syslog>
17+
LogLevel info
18+
</Plugin>
19+
20+
<Plugin write_kafka>
21+
Property "metadata.broker.list" "kafka:9092"
22+
<Topic "metrics">
23+
Format Graphite
24+
</Topic>
25+
</Plugin>

contrib/influxdb/config.toml

+187
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
# Once every 24 hours InfluxDB will report anonymous data to m.influxdb.com
2+
reporting-disabled = true
3+
4+
###
5+
### [meta]
6+
###
7+
### Controls the parameters for the Raft consensus group that stores metadata
8+
### about the InfluxDB cluster.
9+
###
10+
11+
[meta]
12+
dir = "/var/opt/influxdb/meta"
13+
hostname = "localhost"
14+
bind-address = ":8088"
15+
retention-autocreate = true
16+
election-timeout = "1s"
17+
heartbeat-timeout = "1s"
18+
leader-lease-timeout = "500ms"
19+
commit-timeout = "50ms"
20+
21+
###
22+
### [data]
23+
###
24+
### Controls where the actual shard data for InfluxDB lives and how it is
25+
### flushed from the WAL. "dir" may need to be changed to a suitable place
26+
### for your system, but the WAL settings are an advanced configuration. The
27+
### defaults should work for most systems.
28+
###
29+
30+
[data]
31+
dir = "/data/db"
32+
MaxWALSize = 104857600 # Maximum size the WAL can reach before a flush. Defaults to 100MB.
33+
WALFlushInterval = "10m" # Maximum time data can sit in WAL before a flush.
34+
WALPartitionFlushDelay = "2s" # The delay time between each WAL partition being flushed.
35+
36+
###
37+
### [cluster]
38+
###
39+
### Controls non-Raft cluster behavior, which generally includes how data is
40+
### shared across shards.
41+
###
42+
43+
[cluster]
44+
shard-writer-timeout = "5s"
45+
46+
###
47+
### [retention]
48+
###
49+
### Controls the enforcement of retention policies for evicting old data.
50+
###
51+
52+
[retention]
53+
enabled = true
54+
check-interval = "10m"
55+
56+
###
57+
### [admin]
58+
###
59+
### Controls the availability of the built-in, web-based admin interface.
60+
###
61+
62+
[admin]
63+
enabled = true
64+
bind-address = ":8083"
65+
66+
###
67+
### [http]
68+
###
69+
### Controls how the HTTP endpoints are configured. These are the primary
70+
### mechanism for getting data into and out of InfluxDB.
71+
###
72+
73+
[http]
74+
enabled = true
75+
bind-address = ":8086"
76+
auth-enabled = false
77+
log-enabled = true
78+
write-tracing = false
79+
pprof-enabled = false
80+
81+
###
82+
### [[graphite]]
83+
###
84+
### Controls one or many listeners for Graphite data.
85+
###
86+
87+
[[graphite]]
88+
enabled = true
89+
bind-address = ":2003"
90+
protocol = "tcp"
91+
consistency-level = "one"
92+
name-separator = "."
93+
94+
## "name-schema" configures tag names for parsing the metric name from graphite protocol;
95+
## separated by `name-separator`.
96+
## The "measurement" tag is special and the corresponding field will become
97+
## the name of the metric.
98+
## e.g. "type.host.measurement.device" will parse "server.localhost.cpu.cpu0" as
99+
## {
100+
## measurement: "cpu",
101+
## tags: {
102+
## "type": "server",
103+
## "host": "localhost,
104+
## "device": "cpu0"
105+
## }
106+
## }
107+
name-schema = "host.measurement.device"
108+
109+
## If set to true, when the input metric name has more fields than `name-schema` specified,
110+
## the extra fields will be ignored.
111+
## Otherwise an error will be logged and the metric rejected.
112+
ignore-unnamed = true
113+
114+
###
115+
### [collectd]
116+
###
117+
### Controls the listener for collectd data.
118+
###
119+
120+
[collectd]
121+
enabled = false
122+
# bind-address = ""
123+
# database = ""
124+
# typesdb = ""
125+
126+
###
127+
### [opentsdb]
128+
###
129+
### Controls the listener for OpenTSDB data.
130+
###
131+
132+
[opentsdb]
133+
enabled = false
134+
# bind-address = ""
135+
# database = ""
136+
# retention-policy = ""
137+
138+
###
139+
### [udp]
140+
###
141+
### Controls the listener for InfluxDB line protocol data via UDP.
142+
###
143+
144+
[udp]
145+
enabled = false
146+
# bind-address = ""
147+
# database = ""
148+
# batch-size = 0
149+
# batch-timeout = "0"
150+
151+
###
152+
### [monitoring]
153+
###
154+
155+
[monitoring]
156+
enabled = true
157+
write-interval = "24h"
158+
159+
###
160+
### [continuous_queries]
161+
###
162+
### Controls how continuous queries are run within InfluxDB.
163+
###
164+
165+
[continuous_queries]
166+
enabled = false
167+
recompute-previous-n = 2
168+
recompute-no-older-than = "10m"
169+
compute-runs-per-interval = 10
170+
compute-no-more-than = "2m"
171+
172+
###
173+
### [hinted-handoff]
174+
###
175+
### Controls the hinted handoff feature, which allows nodes to temporarily
176+
### store queued data when one node of a cluster is down for a short period
177+
### of time.
178+
###
179+
180+
[hinted-handoff]
181+
enabled = false
182+
dir = "/var/opt/influxdb/hh"
183+
max-size = 1073741824
184+
max-age = "168h"
185+
retry-rate-limit = 0
186+
retry-interval = "1s"
187+

0 commit comments

Comments
 (0)