Skip to content

Commit 1f455dc

Browse files
authored
Merge pull request #773 from jettero/fix-py3-hec-dq
Fix py3 hec dq
2 parents 9802ad1 + 2ac4013 commit 1f455dc

File tree

7 files changed

+111
-29
lines changed

7 files changed

+111
-29
lines changed

.pipeline

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11

2-
def imgname = 'hubblestack/jenkins:centos-v1.0.8'
2+
def imgname = 'hubblestack/jenkins:centos-v1.0.9'
33

44
pipeline {
55
agent { docker { image "${imgname}" } }
@@ -13,6 +13,7 @@ pipeline {
1313
environment {
1414
PY_COLORS = 1
1515
HS_PROFILE = 1
16+
PY_V = '3.6.10'
1617
}
1718

1819
stages {

hubblestack/hec/dq.py

+12-15
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import shutil
88
import json
99
from collections import deque
10+
from hubblestack.utils.misc import numbered_file_split_key
11+
from hubblestack.utils.encoding import encode_something_to_bytes, decode_something_to_string
1012

1113
__all__ = [
1214
'QueueTypeError', 'QueueCapacityError', 'MemQueue', 'DiskQueue',
@@ -37,6 +39,7 @@ def check_type(self, item):
3739
if not isinstance(item, self.ok_types):
3840
raise QueueTypeError('type({0}) is not ({1})'.format(type(item), self.ok_types))
3941

42+
4043
class NoQueue(object):
4144
cn = 0
4245
def put(self, *a, **kw):
@@ -69,6 +72,7 @@ def __bool__(self):
6972
__nonzero__ = __bool__ # stupid python2
7073

7174
def compress(self, dat):
75+
dat = encode_something_to_bytes(dat)
7276
if not self.compression:
7377
return dat
7478
def _bz2(x):
@@ -84,7 +88,8 @@ def unlink_(self, fname):
8488
os.unlink(name)
8589

8690
def decompress(self, dat):
87-
if str(dat).startswith('BZ'):
91+
dat = encode_something_to_bytes(dat)
92+
if dat.startswith(b'BZ'):
8893
try:
8994
return bz2.BZ2Decompressor().decompress(dat)
9095
except IOError:
@@ -131,8 +136,6 @@ def put(self, item, **meta):
131136
f = os.path.join(d, remainder)
132137
with open(f, 'wb') as fh:
133138
log.debug('writing item to disk cache')
134-
if isinstance(bstr, str):
135-
bstr = str.encode(bstr)
136139
fh.write(bstr)
137140
if meta:
138141
with open(f + '.meta', 'w') as fh:
@@ -160,23 +163,23 @@ def peek(self):
160163
"""
161164
for fname in self.files:
162165
with open(fname, 'rb') as fh:
163-
return self.decompress(fh.read()), self.read_meta(fname)
166+
return decode_something_to_string(self.decompress(fh.read())), self.read_meta(fname)
164167

165168
def get(self):
166169
""" get the next item from the queue
167170
returns: data_octets, meta_data_dict
168171
"""
169172
for fname in self.files:
170173
with open(fname, 'rb') as fh:
171-
ret = self.decompress(fh.read())
172-
ret = ret, self.read_meta(fname)
174+
dat = self.decompress(fh.read())
175+
mdat = self.read_meta(fname)
173176
sz = os.stat(fname).st_size
174177
self.unlink_(fname)
175178
self.cn -= 1
176179
self.sz -= sz
177180
if self.double_check_cnsz:
178181
self._count(double_check_only=True, tag='get')
179-
return ret
182+
return decode_something_to_string(dat), mdat
180183

181184
def getz(self, sz=SPLUNK_MAX_MSG):
182185
""" fetch items from the queue and concatenate them together using the
@@ -219,7 +222,7 @@ def getz(self, sz=SPLUNK_MAX_MSG):
219222
#
220223
# occasionally this will return something pessimistic
221224
meta_data[k] = max(meta_data[k])
222-
return ret, meta_data
225+
return decode_something_to_string(ret), meta_data
223226

224227
def pop(self):
225228
""" remove the next item from the queue (do not return it); useful with .peek() """
@@ -235,14 +238,8 @@ def pop(self):
235238
@property
236239
def files(self):
237240
""" generate all filenames in the diskqueue (returns iterable) """
238-
def _k(x):
239-
try:
240-
return [int(i) for i in x.split('.')]
241-
except:
242-
pass
243-
return x
244241
for path, dirs, files in sorted(os.walk(self.directory)):
245-
for fname in [os.path.join(path, f) for f in sorted(files, key=_k)]:
242+
for fname in [os.path.join(path, f) for f in sorted(files, key=numbered_file_split_key)]:
246243
if fname.endswith('.meta'):
247244
continue
248245
yield fname

hubblestack/hec/obj.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
from . dq import DiskQueue, NoQueue, QueueCapacityError
2020
from hubblestack.utils.stdrec import update_payload
21+
from hubblestack.utils.encoding import encode_something_to_bytes
2122

2223
__version__ = '1.0'
2324

@@ -259,7 +260,7 @@ def __init__(self, token, index, http_event_server, host='', http_event_port='80
259260
md5 = hashlib.md5()
260261
uril = sorted([ x.uri for x in self.server_uri ])
261262
for u in uril:
262-
md5.update(u)
263+
md5.update(encode_something_to_bytes(u))
263264
actual_disk_queue = os.path.join(disk_queue, md5.hexdigest())
264265
log.debug("disk_queue for %s: %s", uril, actual_disk_queue)
265266
self.queue = DiskQueue(actual_disk_queue, size=disk_queue_size, compression=disk_queue_compression)
@@ -299,12 +300,12 @@ def _queue_event(self, payload, meta_data=None):
299300
log.error("disk queue is full, dropping payload")
300301

301302

302-
def queueEvent(self, dat, eventtime=''):
303+
def queueEvent(self, dat, eventtime='', no_queue=False):
303304
if not isinstance(dat, Payload):
304305
dat = Payload(dat, eventtime, no_queue=no_queue)
305306
if dat.no_queue: # here you silly hec, queue this no_queue payload...
306307
return
307-
count_input(payload)
308+
count_input(dat)
308309
self._queue_event(dat)
309310

310311
def flushQueue(self):

hubblestack/utils/encoding.py

+12
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,15 @@ def encode_base64(starting_string, format_chained=True, chained=None, chained_st
4040
ret = base64.b64encode(starting_string)
4141

4242
return bool(ret), ret
43+
44+
def encode_something_to_bytes(x):
    """ take strings or bytes or whatever and convert to bytes

        - bytes and bytearray objects are returned unchanged
        - a memoryview is copied into a new bytes object
        - anything with an .encode() method (i.e. text) is encoded as UTF-8
        - every other object is encoded from its str() form, so "whatever"
          really is accepted instead of raising AttributeError
    """
    if isinstance(x, (bytes, bytearray)):
        return x
    if isinstance(x, memoryview):
        # memoryview has neither .encode() nor .startswith(); hand back real
        # bytes so callers can treat the result like any other byte string
        return x.tobytes()
    encode = getattr(x, 'encode', None)
    if encode is None:
        # not text (int, None, dict, ...): fall back to its string form
        return str(x).encode('utf-8')
    return encode('utf-8')
49+
50+
def decode_something_to_string(x):
    """ decode byte strings to text; hand anything else back untouched

        bytes and bytearray objects are decoded as UTF-8; every other
        object (str included) is returned exactly as it was given.
    """
    is_binary = isinstance(x, (bytes, bytearray))
    return x.decode('utf-8') if is_binary else x

hubblestack/utils/misc.py

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# coding: utf-8
2+
3+
def numbered_file_split_key(x):
    """ for sorting purposes, split filenames like '238048.11', '238048.17',
        '238048.0' into lists of integers. E.g.:

            for fname in sorted(filenames, key=numbered_file_split_key):
                do_things_ordered_by_integer_sort()

        Names that are not dot-separated integers fall back to [int(x)] when
        x itself converts to an integer, and to an empty list otherwise.
    """
    # Only the failures a bad name/value can actually cause are swallowed;
    # a bare except would also eat KeyboardInterrupt and SystemExit.
    not_numeric = (AttributeError, TypeError, ValueError, OverflowError)
    try:
        return [int(i) for i in x.split('.')]
    except not_numeric:
        pass
    try:
        return [int(x)]
    except not_numeric:
        pass
    return []

tests/unittests/test_hec_dq.py

+30-10
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# coding: utf-8
2+
13
import pytest
24
import os
35

@@ -12,20 +14,33 @@ def samp():
1214

1315
@pytest.fixture
1416
def dq():
15-
return DiskQueue(TEST_DQ_DIR, size=100, fresh=True)
17+
return DiskQueue(TEST_DQ_DIR, fresh=True)
1618

17-
def test_disk_queue(dq):
19+
@pytest.fixture
def dqc():
    """ a fresh DiskQueue with compression=9, kept in its own '.bz2' directory """
    compressed_dir = TEST_DQ_DIR + ".bz2"
    return DiskQueue(compressed_dir, fresh=True, compression=9)
22+
23+
def _test_disk_queue(dq):
1824
borked = False
1925

2026
dq.put('one', testinator=3)
2127
dq.put('two', testinator=4)
2228
dq.put('three', testinator=5)
2329

24-
assert len(dq) == 13
25-
assert dq.peek() == (b'one', {'testinator': 3})
26-
assert dq.get() == (b'one', {'testinator': 3})
27-
assert dq.peek() == (b'two', {'testinator': 4})
28-
assert len(dq) == 9
30+
if not dq.compression:
31+
# NOTE: with the huffman headers (or whatever), the size of the dq is
32+
# probably quite a lot larger than the expected 13. the test is
33+
# essentially meaningless unless the uncompressed message was large
34+
# enough… then we could test to see if the dq len was smaller than
35+
# expected or something… let's just skip this for the compressed dq
36+
assert len(dq) == 13
37+
38+
assert dq.peek() == ('one', {'testinator': 3})
39+
assert dq.get() == ('one', {'testinator': 3})
40+
assert dq.peek() == ('two', {'testinator': 4})
41+
42+
if not dq.compression:
43+
assert len(dq) == 9
2944

3045
assert dq.getz() == ('two three', {'testinator': 5})
3146
assert len(dq) == 0
@@ -37,18 +52,23 @@ def test_disk_queue(dq):
3752
assert dq.getz(8) == ('one two', {})
3853
assert dq.getz(8) == ('three', {})
3954

55+
def test_disk_queue(dq):
    """ run the shared disk queue checks against the plain `dq` fixture """
    _test_disk_queue(dq)
57+
58+
def test_disk_queue_with_compression(dqc):
    """ run the shared disk queue checks against the compressed `dqc` fixture """
    _test_disk_queue(dqc)
60+
4061
def _test_pop(samp,q):
4162
for i in samp:
4263
q.put(i)
4364
for i in samp:
44-
assert q.peek() == (str.encode(i), {})
65+
assert q.peek() == (i, {})
4566
q.pop()
4667

4768
def test_dq_pop(samp,dq):
4869
_test_pop(samp,dq)
4970

50-
def test_disk_queue_put_estimator():
51-
dq = DiskQueue(TEST_DQ_DIR, fresh=True)
71+
def test_disk_queue_put_estimator(dq):
5272
for item in ['hi-there-{}'.format(x) for x in range(20)]:
5373
pre = dq.cn, dq.sz
5474
dq.put(item)

tests/unittests/test_hec_obj.py

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# coding: utf-8
2+
3+
import os
4+
import json
5+
import mock
6+
from hubblestack.hec import HEC
7+
8+
TEST_DQ_DIR = os.environ.get('TEST_DQ_DIR', '/tmp/dq.{0}'.format(os.getuid()))
9+
10+
@mock.patch.object(HEC, '_send')
def test_hec__send_trivially(mock_send):
    """ sendEvent() should pass HEC._send an object whose .dat is the event as JSON """
    collector = HEC('token', 'index', 'server')
    event = {'test': 'test-tacular'}
    collector.sendEvent(event)
    # first positional argument of the most recent _send() call
    sent = mock_send.call_args.args[0]
    decoded = json.loads(sent.dat)
    assert decoded['test'] == 'test-tacular'
15+
16+
@mock.patch.object(HEC, '_send')  # just in case, not actually used
def test_queue_things_with_compression(mock_send, __opts__, __salt__):
    """ queue 100 events through a compressed disk queue, flush, and compare
        what reached HEC._send with the JSON of the events that were queued
    """
    collector = HEC(
        'token', 'index', 'server',
        disk_queue=TEST_DQ_DIR,
        disk_queue_size=1000,
        disk_queue_compression=9,
    )

    delivered = []

    def record(x):
        # capture whatever the HEC object hands to _send
        delivered.append(x)

    mock_send.side_effect = record

    expected = []
    for i in range(100):
        event = {f'event{i}': f'test{i}'}
        collector.queueEvent(event)
        # serialize only after queueing, same order as the events went in
        expected.append(json.dumps(event))

    collector.flushQueue()
    assert ' '.join(delivered) == ' '.join(expected)

0 commit comments

Comments
 (0)