Skip to content

Commit 989f07e

Browse files
authored
Merge pull request #310 from ICRAR/binary_fix
Fix writing of BytesIO data
2 parents 76436f8 + a991c1a commit 989f07e

File tree

3 files changed

+41
-34
lines changed

3 files changed

+41
-34
lines changed

daliuge-engine/dlg/data/drops/memory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def parse_pydata(pd_dict: dict) -> bytes:
4242
:returns a byte encoded value
4343
"""
4444
pydata = pd_dict["value"]
45-
logger.debug(f"pydata value provided: {pydata}, {type(pydata)}")
45+
logger.debug(f"pydata value provided: {pydata}, {pd_dict['type'].lower()}")
4646

4747
if pd_dict["type"].lower() == "json":
4848
try:

daliuge-engine/dlg/drop_loaders.py

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ def load_numpy(drop: "DataDROP", allow_pickle=True):
102102

103103

104104
import dill
105+
106+
105107
def load_dill(drop: "DataDROP"):
106108
"""
107109
Load dill
@@ -116,28 +118,30 @@ def load_dill(drop: "DataDROP"):
116118
drop.close(desc)
117119
return dill.loads(buf.getbuffer())
118120

119-
def load_binary(drop: "DataDROP"):
120-
"""
121-
Load binary
122-
"""
123-
buf = io.BytesIO()
124-
desc = drop.open()
125-
read = True
126-
while read:
127-
data = drop.read(desc)
128-
if data:
129-
buf.write(data)
130-
drop.close(desc)
131-
return buf.getvalue().decode()
132-
133-
return 0
121+
122+
def load_binary(drop: "DataDROP"):
123+
"""
124+
Load binary
125+
"""
126+
buf = io.BytesIO()
127+
desc = drop.open()
128+
read = True
129+
while read:
130+
data = drop.read(desc)
131+
if data:
132+
buf.write(data)
133+
drop.close(desc)
134+
return buf.getvalue().decode()
135+
136+
return 0
137+
134138

135139
def save_binary(drop: "DataDROP", data: bytes):
136140
"""
137-
Save binary
141+
Save binary
138142
"""
139143
bytes_data = io.BytesIO(data)
140144
dropio = drop.getIO()
141145
dropio.open(OpenMode.OPEN_WRITE)
142-
dropio.write(bytes_data)
143-
dropio.close()
146+
dropio.write(bytes_data.getbuffer())
147+
dropio.close()

daliuge-engine/dlg/named_port_utils.py

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
class DropParser(Enum):
1313
RAW = "raw"
1414
PICKLE = "pickle"
15-
EVAL = "eval"
15+
EVAL = "eval"
1616
NPY = "npy"
1717
DILL = "dill"
1818
# JSON = "json"
@@ -68,9 +68,7 @@ def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "):
6868
positional arguments and one for kw arguments that can be used to construct
6969
the final command line.
7070
"""
71-
applicationArgs = clean_applicationArgs(
72-
applicationArgs
73-
)
71+
applicationArgs = clean_applicationArgs(applicationArgs)
7472
pargs = []
7573
kwargs = {}
7674
for name, vdict in applicationArgs.items():
@@ -144,7 +142,7 @@ def identify_named_ports(
144142
if value is None:
145143
value = "" # make sure we are passing NULL drop events
146144
if key in positionalArgs:
147-
encoding = DropParser(positionalPortArgs[key]['encoding'])
145+
encoding = DropParser(positionalPortArgs[key]["encoding"])
148146
parser = get_port_reader_function(encoding)
149147
if parser:
150148
logger.debug("Reading from port using %s", parser.__repr__())
@@ -156,7 +154,7 @@ def identify_named_ports(
156154
if addPositionalToKeyword:
157155
keywordPortArgs.update({key: positionalPortArgs[key]})
158156
elif key in keywordArgs:
159-
encoding = DropParser(keywordArgs[key]['encoding'])
157+
encoding = DropParser(keywordArgs[key]["encoding"])
160158
parser = get_port_reader_function(encoding)
161159
if parser:
162160
logger.debug("Reading from port using %s", parser.__repr__())
@@ -299,12 +297,14 @@ def replace_named_ports(
299297
keywordArgs = _get_args(appArgs, positional=False)
300298

301299
# Extract values from dictionaries - "encoding" etc. are irrelevant
302-
appArgs = {arg: subdict['value'] for arg, subdict in appArgs.items()}
303-
positionalArgs = {arg: subdict['value'] for arg, subdict in positionalArgs.items()}
304-
keywordArgs = {arg: subdict['value'] for arg, subdict in keywordArgs.items()}
305-
keywordPortArgs = {arg: subdict['value'] for arg, subdict in keywordPortArgs.items()}
306-
307-
# Construct the final keywordArguments and positionalPortArguments
300+
appArgs = {arg: subdict["value"] for arg, subdict in appArgs.items()}
301+
positionalArgs = {arg: subdict["value"] for arg, subdict in positionalArgs.items()}
302+
keywordArgs = {arg: subdict["value"] for arg, subdict in keywordArgs.items()}
303+
keywordPortArgs = {
304+
arg: subdict["value"] for arg, subdict in keywordPortArgs.items()
305+
}
306+
307+
# Construct the final keywordArguments and positionalPortArguments
308308
for k, v in keywordPortArgs.items():
309309
if v not in [None, ""]:
310310
keywordArgs.update({k: v})
@@ -376,8 +376,10 @@ def _get_args(appArgs, positional=False):
376376
Separate out the arguments dependening on if we want positional or keyword style
377377
"""
378378
args = {
379-
arg: {"value": appArgs[arg]["value"],
380-
"encoding": appArgs[arg].get("encoding", "dill")}
379+
arg: {
380+
"value": appArgs[arg]["value"],
381+
"encoding": appArgs[arg].get("encoding", "dill"),
382+
}
381383
for arg in appArgs
382384
if (appArgs[arg]["positional"] == positional)
383385
}
@@ -386,6 +388,7 @@ def _get_args(appArgs, positional=False):
386388
logger.debug("%s arguments: %s", argType, args)
387389
return args
388390

391+
389392
def get_port_reader_function(input_parser: DropParser):
390393
"""
391394
Return the function used to read input from a named port
@@ -413,7 +416,7 @@ def optionalEval(x):
413416
elif input_parser is DropParser.DILL:
414417
reader = drop_loaders.load_dill
415418
elif input_parser is DropParser.BINARY:
416-
reader = drop_loaders.load_binary
419+
reader = drop_loaders.load_binary
417420
else:
418421
raise ValueError(input_parser.__repr__())
419422
return reader

0 commit comments

Comments
 (0)