Skip to content

Commit 71349cc

Browse files
author
Matthias Endler
committed
Test reconnect on error
1 parent 08192c9 commit 71349cc

File tree

3 files changed

+37
-12
lines changed

3 files changed

+37
-12
lines changed

kafka_influxdb/tests/helpers/__init__.py

Whitespace-only changes.
+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from functools import wraps
2+
from multiprocessing import Process
3+
4+
#class TimeoutError(Exception):
5+
# pass
6+
7+
def timeout(seconds=5):
8+
def decorator(func):
9+
def wrapper(*args, **kwargs):
10+
process = Process(None, func, None, args, kwargs)
11+
process.start()
12+
process.join(seconds)
13+
if process.is_alive():
14+
process.terminate()
15+
return wraps(func)(wrapper)
16+
return decorator

kafka_influxdb/tests/reader_test/test_kafka_reader.py

+21-12
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import unittest
22
import mock
33
import itertools
4+
import time
5+
from nose.tools import *
46
from collections import namedtuple
57
from kafka_influxdb.reader import kafka_reader
68
from kafka.client import KafkaClient
79
from kafka.common import ConnectionError
810
from kafka.common import Message
11+
from kafka_influxdb.tests.helpers.timeout import timeout
912

1013
class TestKafkaReader(unittest.TestCase):
1114

@@ -31,16 +34,22 @@ def sample_messages(self, payload, count):
3134
def test_handle_read(self):
3235
sample_messages, extracted_messages = self.sample_messages("hello", 3)
3336
self.reader.consumer.__iter__.return_value = sample_messages
34-
real_messages = list(self.reader.handle_read())
35-
self.assertEquals(real_messages, extracted_messages)
37+
received_messages = list(self.reader.handle_read())
38+
self.assertEquals(received_messages, extracted_messages)
3639

37-
# TODO: Run in separate process
38-
#def test_reconnect(self):
39-
#""" In case of a connection error, the client should reconnect and
40-
#start receiving messages again without interruption """
41-
# sample_messages1, extracted_messages1 = self.sample_messages("hi", 3)
42-
# sample_messages2, extracted_messages2 = self.sample_messages("world", 3)
43-
# sample_messages = sample_messages1 + [ConnectionError] + sample_messages2
44-
# self.reader.consumer.__iter__.return_value = sample_messages
45-
# real_messages = list(self.reader.handle_read())
46-
# self.assertEquals(real_messages, extracted_messages1 + extracted_messages2)
40+
@timeout(0.1)
41+
def test_reconnect(self):
42+
"""
43+
In case of a connection error, the client should reconnect and
44+
start receiving messages again without interruption
45+
"""
46+
sample_messages1, extracted_messages1 = self.sample_messages("hi", 3)
47+
sample_messages2, extracted_messages2 = self.sample_messages("world", 3)
48+
sample_messages = sample_messages1 + [ConnectionError] + sample_messages2
49+
self.reader.consumer.__iter__.return_value = sample_messages
50+
received_messages = list(self.receive_messages())
51+
self.assertEquals(received_messages, extracted_messages1 + extracted_messages2)
52+
53+
def receive_messages(self):
54+
for message in self.reader.read():
55+
yield message

0 commit comments

Comments
 (0)