Skip to content

Commit 6400e61

Browse files
committed
make some minor repairs to the hec.dq
1 parent 8cd07f4 commit 6400e61

File tree

4 files changed

+69
-34
lines changed

4 files changed

+69
-34
lines changed

hubblestack/hec/dq.py

+11-24
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import shutil
88
import json
99
from collections import deque
10+
from hubblestack.utils.misc import numbered_file_split_key
11+
from hubblestack.utils.encoding import encode_something_to_bytes, decode_something_to_string
1012

1113
__all__ = [
1214
'QueueTypeError', 'QueueCapacityError', 'MemQueue', 'DiskQueue',
@@ -37,6 +39,7 @@ def check_type(self, item):
3739
if not isinstance(item, self.ok_types):
3840
raise QueueTypeError('type({0}) is not ({1})'.format(type(item), self.ok_types))
3941

42+
4043
class NoQueue(object):
4144
cn = 0
4245
def put(self, *a, **kw):
@@ -49,22 +52,6 @@ def __bool__(self):
4952
return False
5053
__nonzero__ = __bool__ # stupid python2
5154

52-
def numbered_file_split_key(x):
53-
""" for sorting purposes, split filenames like '238048.11', '238048.17',
54-
'238048.0' into lists of integers. E.g.:
55-
56-
for fname in sorted(filenames, key=numbered_file_split_key):
57-
do_things_ordered_by_integer_sort()
58-
"""
59-
try:
60-
return [int(i) for i in x.split('.')]
61-
except:
62-
pass
63-
try:
64-
return [int(x)]
65-
except:
66-
pass
67-
return list()
6855

6956
class DiskQueue(OKTypesMixin):
7057
sep = b' '
@@ -85,6 +72,7 @@ def __bool__(self):
8572
__nonzero__ = __bool__ # stupid python2
8673

8774
def compress(self, dat):
75+
dat = encode_something_to_bytes(dat)
8876
if not self.compression:
8977
return dat
9078
def _bz2(x):
@@ -100,7 +88,8 @@ def unlink_(self, fname):
10088
os.unlink(name)
10189

10290
def decompress(self, dat):
103-
if str(dat).startswith('BZ'):
91+
dat = encode_something_to_bytes(dat)
92+
if dat.startswith(b'BZ'):
10493
try:
10594
return bz2.BZ2Decompressor().decompress(dat)
10695
except IOError:
@@ -147,8 +136,6 @@ def put(self, item, **meta):
147136
f = os.path.join(d, remainder)
148137
with open(f, 'wb') as fh:
149138
log.debug('writing item to disk cache')
150-
if isinstance(bstr, str):
151-
bstr = str.encode(bstr)
152139
fh.write(bstr)
153140
if meta:
154141
with open(f + '.meta', 'w') as fh:
@@ -176,23 +163,23 @@ def peek(self):
176163
"""
177164
for fname in self.files:
178165
with open(fname, 'rb') as fh:
179-
return self.decompress(fh.read()), self.read_meta(fname)
166+
return decode_something_to_string(self.decompress(fh.read())), self.read_meta(fname)
180167

181168
def get(self):
182169
""" get the next item from the queue
183170
returns: data_octets, meta_data_dict
184171
"""
185172
for fname in self.files:
186173
with open(fname, 'rb') as fh:
187-
ret = self.decompress(fh.read())
188-
ret = ret, self.read_meta(fname)
174+
dat = self.decompress(fh.read())
175+
mdat = self.read_meta(fname)
189176
sz = os.stat(fname).st_size
190177
self.unlink_(fname)
191178
self.cn -= 1
192179
self.sz -= sz
193180
if self.double_check_cnsz:
194181
self._count(double_check_only=True, tag='get')
195-
return ret
182+
return decode_something_to_string(dat), mdat
196183

197184
def getz(self, sz=SPLUNK_MAX_MSG):
198185
""" fetch items from the queue and concatenate them together using the
@@ -235,7 +222,7 @@ def getz(self, sz=SPLUNK_MAX_MSG):
235222
#
236223
# occasionally this will return something pessimistic
237224
meta_data[k] = max(meta_data[k])
238-
return ret, meta_data
225+
return decode_something_to_string(ret), meta_data
239226

240227
def pop(self):
241228
""" remove the next item from the queue (do not return it); useful with .peek() """

hubblestack/utils/encoding.py

+12
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,15 @@ def encode_base64(starting_string, format_chained=True, chained=None, chained_st
4040
ret = base64.b64encode(starting_string)
4141

4242
return bool(ret), ret
43+
44+
def encode_something_to_bytes(x):
45+
""" take strings or bytes or whatever and convert to bytes """
46+
if isinstance(x, (bytes,bytearray)):
47+
return x
48+
return x.encode('utf-8')
49+
50+
def decode_something_to_string(x):
51+
""" take strings or bytes or whatever and convert to string """
52+
if isinstance(x, (bytes,bytearray)):
53+
return x.decode('utf-8')
54+
return x

hubblestack/utils/misc.py

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# coding: utf-8
2+
3+
def numbered_file_split_key(x):
4+
""" for sorting purposes, split filenames like '238048.11', '238048.17',
5+
'238048.0' into lists of integers. E.g.:
6+
7+
for fname in sorted(filenames, key=numbered_file_split_key):
8+
do_things_ordered_by_integer_sort()
9+
"""
10+
try:
11+
return [int(i) for i in x.split('.')]
12+
except:
13+
pass
14+
try:
15+
return [int(x)]
16+
except:
17+
pass
18+
return list()

tests/unittests/test_hec_dq.py

+28-10
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,33 @@ def samp():
1212

1313
@pytest.fixture
1414
def dq():
15-
return DiskQueue(TEST_DQ_DIR, size=100, fresh=True)
15+
return DiskQueue(TEST_DQ_DIR, fresh=True)
1616

17-
def test_disk_queue(dq):
17+
@pytest.fixture
18+
def dqc():
19+
return DiskQueue(TEST_DQ_DIR + ".bz2", fresh=True, compression=9)
20+
21+
def _test_disk_queue(dq):
1822
borked = False
1923

2024
dq.put('one', testinator=3)
2125
dq.put('two', testinator=4)
2226
dq.put('three', testinator=5)
2327

24-
assert len(dq) == 13
25-
assert dq.peek() == (b'one', {'testinator': 3})
26-
assert dq.get() == (b'one', {'testinator': 3})
27-
assert dq.peek() == (b'two', {'testinator': 4})
28-
assert len(dq) == 9
28+
if not dq.compression:
29+
# NOTE: with the huffman headers (or whatever), the size of the dq is
30+
# probably quite a lot larger than the expected 13. the test is
31+
# essentially meaningless unless the uncompressed message was large
32+
# enough… then we could test to see if the dq len was smaller than
33+
# expected or something… let's just skip this for the compressed dq
34+
assert len(dq) == 13
35+
36+
assert dq.peek() == ('one', {'testinator': 3})
37+
assert dq.get() == ('one', {'testinator': 3})
38+
assert dq.peek() == ('two', {'testinator': 4})
39+
40+
if not dq.compression:
41+
assert len(dq) == 9
2942

3043
assert dq.getz() == ('two three', {'testinator': 5})
3144
assert len(dq) == 0
@@ -37,18 +50,23 @@ def test_disk_queue(dq):
3750
assert dq.getz(8) == ('one two', {})
3851
assert dq.getz(8) == ('three', {})
3952

53+
def test_disk_queue(dq):
54+
_test_disk_queue(dq)
55+
56+
def test_disk_queue_with_compression(dqc):
57+
_test_disk_queue(dqc)
58+
4059
def _test_pop(samp,q):
4160
for i in samp:
4261
q.put(i)
4362
for i in samp:
44-
assert q.peek() == (str.encode(i), {})
63+
assert q.peek() == (i, {})
4564
q.pop()
4665

4766
def test_dq_pop(samp,dq):
4867
_test_pop(samp,dq)
4968

50-
def test_disk_queue_put_estimator():
51-
dq = DiskQueue(TEST_DQ_DIR, fresh=True)
69+
def test_disk_queue_put_estimator(dq):
5270
for item in ['hi-there-{}'.format(x) for x in range(20)]:
5371
pre = dq.cn, dq.sz
5472
dq.put(item)

0 commit comments

Comments
 (0)