Skip to content

Commit 77df9c4

Browse files
committed
added raw output, cleanup
1 parent ad3d520 commit 77df9c4

File tree

3 files changed

+8
-16
lines changed

3 files changed

+8
-16
lines changed

daliuge-engine/dlg/apps/pyfunc.py

+2
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,8 @@ def write_results(self, result):
702702
o.write(repr(r).encode("utf-8"))
703703
elif self.output_parser is DropParser.NPY:
704704
drop_loaders.save_npy(o, r)
705+
elif self.output_parser is DropParser.RAW:
706+
o.write(r)
705707
else:
706708
ValueError(self.output_parser.__repr__())
707709

daliuge-engine/dlg/data/drops/data_base.py

+4-8
Original file line numberDiff line numberDiff line change
@@ -168,15 +168,13 @@ def read(self, descriptor, count=65536, **kwargs):
168168
def _checkStateAndDescriptor(self, descriptor):
169169
if self.status != DROPStates.COMPLETED:
170170
raise Exception(
171-
"%r is in state %s (!=COMPLETED), cannot be read"
172-
% (self, self.status)
171+
"%r is in state %s (!=COMPLETED), cannot be read" % (self, self.status)
173172
)
174173
if descriptor is None:
175174
raise ValueError("Illegal empty descriptor given")
176175
if descriptor not in self._rios:
177176
raise Exception(
178-
"Illegal descriptor %d given, remember to open() first"
179-
% (descriptor)
177+
"Illegal descriptor %d given, remember to open() first" % (descriptor)
180178
)
181179

182180
def isBeingRead(self):
@@ -200,10 +198,8 @@ def write(self, data: Union[bytes, memoryview], **kwargs):
200198
if self.status not in [DROPStates.INITIALIZED, DROPStates.WRITING]:
201199
raise Exception("No more writing expected")
202200

203-
if not isinstance(data, (bytes, memoryview)):
204-
raise Exception(
205-
"Data type not of binary type: %s", type(data).__name__
206-
)
201+
if not isinstance(data, (bytes, memoryview, str)):
202+
raise Exception("Data type not of binary type: ", type(data).__name__)
207203

208204
# We lazily initialize our writing IO instance because the data of this
209205
# DROP might not be written through this DROP

daliuge-engine/dlg/droputils.py

+2-8
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@
2323
Utility methods and classes to be used when interacting with DROPs
2424
"""
2525

26-
import base64
2726
import collections
2827
import io
2928
import time
3029
import logging
31-
import pickle
3230
import re
3331
import threading
3432
import traceback
@@ -139,9 +137,7 @@ def allDropContents(drop, bufsize=65536) -> bytes:
139137
return buf.getvalue()
140138

141139

142-
def copyDropContents(
143-
source: "DataDROP", target: "DataDROP", bufsize: int = 65536
144-
):
140+
def copyDropContents(source: "DataDROP", target: "DataDROP", bufsize: int = 65536):
145141
"""
146142
Manually copies data from one DROP into another, in bufsize steps
147143
"""
@@ -151,9 +147,7 @@ def copyDropContents(
151147
logger.debug("Read %d bytes from %s", len(buf), repr(source))
152148
st = time.time()
153149
ssize = source.size if source.size is not None else -1
154-
logger.debug(
155-
"Source size: %s; Source checksum: %s", ssize, source.checksum
156-
)
150+
logger.debug("Source size: %s; Source checksum: %s", ssize, source.checksum)
157151
tot_w = 0
158152
ofl = True
159153
# target._expectedSize = ssize

0 commit comments

Comments
 (0)