7
7
import shutil
8
8
import json
9
9
from collections import deque
10
+ from hubblestack .utils .misc import numbered_file_split_key
11
+ from hubblestack .utils .encoding import encode_something_to_bytes , decode_something_to_string
10
12
11
13
__all__ = [
12
14
'QueueTypeError' , 'QueueCapacityError' , 'MemQueue' , 'DiskQueue' ,
@@ -37,6 +39,7 @@ def check_type(self, item):
37
39
if not isinstance (item , self .ok_types ):
38
40
raise QueueTypeError ('type({0}) is not ({1})' .format (type (item ), self .ok_types ))
39
41
42
+
40
43
class NoQueue (object ):
41
44
cn = 0
42
45
def put (self , * a , ** kw ):
@@ -49,22 +52,6 @@ def __bool__(self):
49
52
return False
50
53
__nonzero__ = __bool__ # stupid python2
51
54
52
def numbered_file_split_key(x):
    """ for sorting purposes, split filenames like '238048.11', '238048.17',
        '238048.0' into lists of integers, so they order numerically rather
        than lexically. E.g.:

        for fname in sorted(filenames, key=numbered_file_split_key):
            do_things_ordered_by_integer_sort()

        returns: a list of ints; [int(x)] when x has no usable .split('.')
            but is itself convertible to an int; an empty list when neither
            conversion works
    """
    # Only failed conversions are expected here. Naming them (rather than a
    # bare except) keeps KeyboardInterrupt/SystemExit and genuine bugs from
    # being silently turned into an empty sort key.
    conversion_errors = (AttributeError, TypeError, ValueError, OverflowError)

    # preferred form: dotted components, each one an integer
    try:
        return [int(part) for part in x.split('.')]
    except conversion_errors:
        pass

    # fallback: x is not a dotted string, but may still be a single number
    try:
        return [int(x)]
    except conversion_errors:
        pass

    return []
68
55
69
56
class DiskQueue (OKTypesMixin ):
70
57
sep = b' '
@@ -85,6 +72,7 @@ def __bool__(self):
85
72
__nonzero__ = __bool__ # stupid python2
86
73
87
74
def compress (self , dat ):
75
+ dat = encode_something_to_bytes (dat )
88
76
if not self .compression :
89
77
return dat
90
78
def _bz2 (x ):
@@ -100,7 +88,8 @@ def unlink_(self, fname):
100
88
os .unlink (name )
101
89
102
90
def decompress (self , dat ):
103
- if str (dat ).startswith ('BZ' ):
91
+ dat = encode_something_to_bytes (dat )
92
+ if dat .startswith (b'BZ' ):
104
93
try :
105
94
return bz2 .BZ2Decompressor ().decompress (dat )
106
95
except IOError :
@@ -147,8 +136,6 @@ def put(self, item, **meta):
147
136
f = os .path .join (d , remainder )
148
137
with open (f , 'wb' ) as fh :
149
138
log .debug ('writing item to disk cache' )
150
- if isinstance (bstr , str ):
151
- bstr = str .encode (bstr )
152
139
fh .write (bstr )
153
140
if meta :
154
141
with open (f + '.meta' , 'w' ) as fh :
@@ -176,23 +163,23 @@ def peek(self):
176
163
"""
177
164
for fname in self .files :
178
165
with open (fname , 'rb' ) as fh :
179
- return self .decompress (fh .read ()), self .read_meta (fname )
166
+ return decode_something_to_string ( self .decompress (fh .read () )), self .read_meta (fname )
180
167
181
168
def get(self):
    """ get (and remove) the next item from the queue

        Takes the first name yielded by self.files, reads and decompresses
        that file, calls self.read_meta() for it, unlinks it, and then
        reduces self.cn by one and self.sz by the file's on-disk size.

        returns: (data, meta_data_dict), where data is the decompressed
            payload passed through decode_something_to_string();
            None when self.files yields nothing
    """
    # The loop body always returns on its first pass; the for-loop is only
    # a convenient way to take the first file, if there is one.
    for fname in self.files:
        with open(fname, 'rb') as fh:
            dat = self.decompress(fh.read())
        mdat = self.read_meta(fname)
        # the size has to be sampled before the file is unlinked
        sz = os.stat(fname).st_size
        self.unlink_(fname)
        self.cn -= 1
        self.sz -= sz
        if self.double_check_cnsz:
            self._count(double_check_only=True, tag='get')
        return decode_something_to_string(dat), mdat
    # empty queue: nothing to hand back
    return None
196
183
197
184
def getz (self , sz = SPLUNK_MAX_MSG ):
198
185
""" fetch items from the queue and concatenate them together using the
@@ -235,7 +222,7 @@ def getz(self, sz=SPLUNK_MAX_MSG):
235
222
#
236
223
# occasionally this will return something pessimistic
237
224
meta_data [k ] = max (meta_data [k ])
238
- return ret , meta_data
225
+ return decode_something_to_string ( ret ) , meta_data
239
226
240
227
def pop (self ):
241
228
""" remove the next item from the queue (do not return it); useful with .peek() """
0 commit comments