Skip to content

Commit 034aa41

Browse files
authored
Merge pull request #293 from andlaus/refactor_encoding3
Refactor encoding, part 3 (parameters)
2 parents 0f12e21 + e8b4612 commit 034aa41

18 files changed

+430
-286
lines changed

odxtools/basicstructure.py

+61-47
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,13 @@ def get_static_bit_length(self) -> Optional[int]:
7474
return ((length + 7) // 8) * 8
7575

7676
def coded_const_prefix(self, request_prefix: bytes = b'') -> bytes:
77-
prefix = b''
7877
encode_state = EncodeState(
79-
bytearray(prefix), parameter_values={}, triggering_request=request_prefix)
78+
coded_message=bytearray(), parameter_values={}, triggering_request=request_prefix)
79+
8080
for param in self.parameters:
81-
if isinstance(param, (CodedConstParameter, NrcConstParameter, MatchingRequestParameter,
82-
PhysicalConstantParameter)):
83-
encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state))
81+
if (isinstance(param, MatchingRequestParameter) and param.request_byte_position < len(request_prefix)) or \
82+
isinstance(param, (CodedConstParameter, NrcConstParameter, PhysicalConstantParameter)):
83+
param.encode_into_pdu(physical_value=None, encode_state=encode_state)
8484
else:
8585
break
8686
return encode_state.coded_message
@@ -122,52 +122,63 @@ def convert_physical_to_internal(self,
122122
triggering_coded_request: Optional[bytes],
123123
is_end_of_pdu: bool = True) -> bytes:
124124

125+
encode_state = EncodeState(
126+
bytearray(),
127+
parameter_values=cast(Dict[str, ParameterValue], param_value),
128+
triggering_request=triggering_coded_request,
129+
is_end_of_pdu=False)
130+
125131
if not isinstance(param_value, dict):
126-
raise EncodeError(
132+
odxraise(
127133
f"Expected a dictionary for the values of structure {self.short_name}, "
128-
f"got {type(param_value)}")
134+
f"got {type(param_value).__name__}", EncodeError)
135+
elif encode_state.cursor_bit_position != 0:
136+
odxraise(
137+
f"Structures must be byte aligned, but "
138+
f"{self.short_name} requested to be at bit position "
139+
f"{encode_state.cursor_bit_position}", EncodeError)
140+
encode_state.bit_position = 0
129141

130142
# in strict mode, ensure that no values for unknown parameters are specified.
131143
if strict_mode:
132-
param_names = [param.short_name for param in self.parameters]
133-
for param_key in param_value:
134-
if param_key not in param_names:
135-
odxraise(f"Value for unknown parameter '{param_key}' specified")
136-
137-
encode_state = EncodeState(
138-
bytearray(),
139-
dict(param_value),
140-
triggering_request=triggering_coded_request,
141-
is_end_of_pdu=False)
144+
param_names = {param.short_name for param in self.parameters}
145+
for param_value_name in param_value:
146+
if param_value_name not in param_names:
147+
odxraise(f"Value for unknown parameter '{param_value_name}' specified "
148+
f"for structure {self.short_name}")
142149

143150
for param in self.parameters:
144-
if param == self.parameters[-1]:
145-
# The last parameter is at the end of the PDU if the
146-
# structure itself is at the end of the PDU. TODO:
147-
# This assumes that the last parameter specified in
148-
# the ODX is located last in the PDU...
151+
if id(param) == id(self.parameters[-1]):
152+
# The last parameter of the structure is at the end of
153+
# the PDU if the structure itself is at the end of the
154+
# PDU. TODO: This assumes that the last parameter
155+
# specified in the ODX is located last in the PDU...
149156
encode_state.is_end_of_pdu = is_end_of_pdu
150157

151-
if isinstance(
152-
param,
153-
(LengthKeyParameter, TableKeyParameter)) and param.short_name in param_value:
154-
# This is a hack to always encode a dummy value for
155-
# length- and table keys. since these can be specified
158+
if isinstance(param, (LengthKeyParameter, TableKeyParameter)):
159+
# At this point, we encode a placeholder value for length-
160+
# and table keys, since these can be specified
156161
# implicitly (i.e., by means of parameters that use
157-
# these keys), to avoid getting an "overlapping
162+
# these keys). To avoid getting an "overlapping
158163
# parameter" warning, we must encode a value of zero
159164
# into the PDU here and add the real value of the
160-
# parameter in a post processing step.
161-
tmp = encode_state.parameter_values.pop(param.short_name)
162-
encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state))
163-
encode_state.parameter_values[param.short_name] = tmp
165+
# parameter in a post-processing step.
166+
param.encode_placeholder_into_pdu(
167+
physical_value=param_value.get(param.short_name), encode_state=encode_state)
168+
164169
continue
165170

166-
encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state))
171+
if param.is_required and param.short_name not in param_value:
172+
odxraise(f"No value for required parameter {param.short_name} specified",
173+
EncodeError)
167174

168-
if self.byte_size is not None and len(encode_state.coded_message) < self.byte_size:
169-
# Padding bytes needed
170-
encode_state.coded_message = encode_state.coded_message.ljust(self.byte_size, b"\0")
175+
param.encode_into_pdu(
176+
physical_value=param_value.get(param.short_name), encode_state=encode_state)
177+
178+
if self.byte_size is not None:
179+
if len(encode_state.coded_message) < self.byte_size:
180+
# Padding bytes needed
181+
encode_state.coded_message = encode_state.coded_message.ljust(self.byte_size, b"\0")
171182

172183
# encode the length- and table keys. This cannot be done above
173184
# because we allow these to be defined implicitly (i.e. they
@@ -177,22 +188,25 @@ def convert_physical_to_internal(self,
177188
# the current parameter is neither a length- nor a table key
178189
continue
179190

180-
# Encode the key parameter into the message
181-
encode_state.coded_message = bytearray(param.encode_into_pdu(encode_state))
191+
# Encode the value of the key parameter into the message
192+
param.encode_value_into_pdu(encode_state=encode_state)
182193

183194
# Assert that length is as expected
184-
self._validate_coded_message(encode_state.coded_message)
195+
self._validate_coded_message_size(encode_state.cursor_byte_position -
196+
encode_state.origin_byte_position)
185197

186-
return bytearray(encode_state.coded_message)
198+
return encode_state.coded_message
187199

188-
def _validate_coded_message(self, coded_message: bytes) -> None:
200+
def _validate_coded_message_size(self, coded_byte_len: int) -> None:
189201

190202
if self.byte_size is not None:
191203
# We definitely broke something if we didn't respect the explicit byte_size
192-
odxassert(
193-
len(coded_message) == self.byte_size,
194-
"Verification of coded message {coded_message.hex()} failed: Incorrect size.")
195-
# No need to check further
204+
if self.byte_size != coded_byte_len:
205+
warnings.warn(
206+
"Verification of coded message failed: Incorrect size.",
207+
OdxWarning,
208+
stacklevel=1)
209+
196210
return
197211

198212
bit_length = self.get_static_bit_length()
@@ -201,12 +215,12 @@ def _validate_coded_message(self, coded_message: bytes) -> None:
201215
# Nothing to check
202216
return
203217

204-
if len(coded_message) * 8 != bit_length:
218+
if coded_byte_len * 8 != bit_length:
205219
# We may have broke something
206220
# but it could be that bit_length was mis calculated and not the actual bytes are wrong
207221
# Could happen with overlapping parameters and parameters with gaps
208222
warnings.warn(
209-
f"Verification of coded message '{coded_message.hex()}' possibly failed: Size may be incorrect.",
223+
"Verification of coded message possibly failed: Size may be incorrect.",
210224
OdxWarning,
211225
stacklevel=1)
212226

odxtools/dynamiclengthfield.py

+22-14
Original file line numberDiff line numberDiff line change
@@ -60,30 +60,38 @@ def convert_physical_to_bytes(
6060
f"Expected a list of values for dynamic length field {self.short_name}, "
6161
f"got {type(physical_value)}", EncodeError)
6262

63+
tmp_state = EncodeState(
64+
coded_message=bytearray(),
65+
parameter_values={},
66+
triggering_request=encode_state.triggering_request,
67+
origin_byte_position=0,
68+
cursor_byte_position=0,
69+
cursor_bit_position=0)
70+
6371
det_num_items = self.determine_number_of_items
64-
field_len = det_num_items.dop.convert_physical_to_bytes(
65-
len(physical_value), encode_state, det_num_items.bit_position or 0)
72+
field_len_bytes = det_num_items.dop.convert_physical_to_bytes(
73+
len(physical_value), tmp_state, det_num_items.bit_position or 0)
6674

6775
# hack to emplace the length specifier at the correct location
68-
tmp = encode_state.coded_message
69-
encode_state.coded_message = bytearray()
70-
encode_state.cursor_byte_position = det_num_items.byte_position
71-
encode_state.emplace_atomic_value(field_len, self.short_name + ".num_items")
72-
result = encode_state.coded_message
73-
encode_state.coded_message = tmp
76+
tmp_state.cursor_byte_position = det_num_items.byte_position
77+
tmp_state.cursor_bit_position = det_num_items.bit_position or 0
78+
tmp_state.emplace_atomic_value(field_len_bytes, self.short_name + ".num_items")
7479

7580
# if required, add padding between the length specifier and
7681
# the first item
77-
if len(result) < self.offset:
78-
result.extend([0] * (self.offset - len(result)))
79-
elif len(result) > self.offset:
82+
if len(tmp_state.coded_message) < self.offset:
83+
tmp_state.coded_message += b'\00' * (self.offset - len(tmp_state.coded_message))
84+
tmp_state.cursor_byte_position = self.offset
85+
elif len(field_len_bytes) > self.offset:
8086
odxraise(f"The length specifier of field {self.short_name} overlaps "
8187
f"with the first item!")
88+
tmp_state.cursor_byte_position = self.offset
8289

83-
for value in physical_value:
84-
result += self.structure.convert_physical_to_bytes(value, encode_state)
90+
for i, value in enumerate(physical_value):
91+
tmp_bytes = self.structure.convert_physical_to_bytes(value, tmp_state)
92+
tmp_state.emplace_atomic_value(tmp_bytes, f"{self.short_name}{i}")
8593

86-
return result
94+
return tmp_state.coded_message
8795

8896
def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
8997

odxtools/parameters/codedconstparameter.py

+14-21
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
from ..decodestate import DecodeState
1111
from ..diagcodedtype import DiagCodedType
1212
from ..encodestate import EncodeState
13-
from ..exceptions import DecodeError, odxrequire
13+
from ..exceptions import DecodeError, EncodeError, odxraise, odxrequire
1414
from ..odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId
15-
from ..odxtypes import AtomicOdxType, DataType
15+
from ..odxtypes import AtomicOdxType, DataType, ParameterValue
1616
from ..utils import dataclass_fields_asdict
1717
from .parameter import Parameter, ParameterType
1818

@@ -81,30 +81,23 @@ def is_settable(self) -> bool:
8181
return False
8282

8383
@override
84-
def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
85-
if (self.short_name in encode_state.parameter_values and
86-
encode_state.parameter_values[self.short_name] != self.coded_value):
87-
raise TypeError(f"The parameter '{self.short_name}' is constant {self._coded_value_str}"
88-
" and thus can not be changed.")
89-
90-
tmp_state = EncodeState(
91-
bytearray(),
92-
encode_state.parameter_values,
93-
triggering_request=encode_state.triggering_request,
94-
is_end_of_pdu=False,
95-
cursor_byte_position=0,
96-
cursor_bit_position=0,
97-
origin_byte_position=0)
98-
encode_state.cursor_bit_position = self.bit_position or 0
99-
self.diag_coded_type.encode_into_pdu(self.coded_value, encode_state=tmp_state)
100-
101-
return tmp_state.coded_message
84+
def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue],
85+
encode_state: EncodeState) -> None:
86+
if physical_value is not None and physical_value != self.coded_value:
87+
odxraise(
88+
f"Value for constant parameter `{self.short_name}` name can "
89+
f"only be specified as {self.coded_value!r} (is: {physical_value!r})", EncodeError)
90+
91+
internal_value = self.coded_value
92+
93+
self.diag_coded_type.encode_into_pdu(
94+
internal_value=internal_value, encode_state=encode_state)
10295

10396
@override
10497
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> AtomicOdxType:
10598
coded_val = self.diag_coded_type.decode_from_pdu(decode_state)
10699

107-
# Check if the coded value in the message is correct.
100+
# Check if the coded value contained by the message is correct.
108101
if self.coded_value != coded_val:
109102
warnings.warn(
110103
f"Coded constant parameter does not match! "

odxtools/parameters/dynamicparameter.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# SPDX-License-Identifier: MIT
22
from dataclasses import dataclass
3-
from typing import List
3+
from typing import List, Optional
44
from xml.etree import ElementTree
55

66
from typing_extensions import override
@@ -41,9 +41,10 @@ def is_settable(self) -> bool:
4141
raise NotImplementedError(".is_settable for a DynamicParameter")
4242

4343
@override
44-
def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
45-
raise NotImplementedError("Encoding a DynamicParameter is not implemented yet.")
44+
def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue],
45+
encode_state: EncodeState) -> None:
46+
raise NotImplementedError("Encoding DynamicParameter is not implemented yet.")
4647

4748
@override
4849
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
49-
raise NotImplementedError("Decoding a DynamicParameter is not implemented yet.")
50+
raise NotImplementedError("Decoding DynamicParameter is not implemented yet.")

odxtools/parameters/lengthkeyparameter.py

+68-14
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
# SPDX-License-Identifier: MIT
22
from dataclasses import dataclass
3-
from typing import TYPE_CHECKING, Any, Dict, List
3+
from typing import TYPE_CHECKING, Any, Dict, List, Optional
44
from xml.etree import ElementTree
55

6-
from typing_extensions import override
6+
from typing_extensions import final, override
77

88
from ..decodestate import DecodeState
99
from ..encodestate import EncodeState
10-
from ..exceptions import odxraise, odxrequire
10+
from ..exceptions import EncodeError, odxraise, odxrequire
1111
from ..odxlink import OdxDocFragment, OdxLinkDatabase, OdxLinkId
1212
from ..odxtypes import ParameterValue
1313
from ..utils import dataclass_fields_asdict
@@ -77,19 +77,73 @@ def is_settable(self) -> bool:
7777
return True
7878

7979
@override
80-
def get_coded_value_as_bytes(self, encode_state: EncodeState) -> bytes:
81-
physical_value = encode_state.length_keys.get(self.short_name)
82-
if physical_value is None:
83-
physical_value = encode_state.parameter_values.get(self.short_name, 0)
80+
@final
81+
def _encode_positioned_into_pdu(self, physical_value: Optional[ParameterValue],
82+
encode_state: EncodeState) -> None:
83+
# if you get this exception, you ought to use
84+
# `.encode_placeholder_into_pdu()` followed by (after the
85+
# value of the length key has been determined)
86+
# `.encode_value_into_pdu()`.
87+
raise RuntimeError("_encode_positioned_into_pdu() cannot be called for length keys.")
88+
89+
def encode_placeholder_into_pdu(self, physical_value: Optional[ParameterValue],
90+
encode_state: EncodeState) -> None:
91+
92+
if physical_value is not None:
93+
if not self.dop.is_valid_physical_value(physical_value):
94+
odxraise(f"Invalid explicitly specified physical value '{physical_value!r}' "
95+
f"for length key '{self.short_name}'.")
96+
97+
lkv = encode_state.length_keys.get(self.short_name)
98+
if lkv is not None and lkv != physical_value:
99+
odxraise(f"Got conflicting values for length key {self.short_name}: "
100+
f"{lkv} and {physical_value!r}")
101+
102+
if not isinstance(physical_value, int):
103+
odxraise(
104+
f"Value of length key {self.short_name} is of type {type(physical_value).__name__} "
105+
f"instead of int")
106+
107+
encode_state.length_keys[self.short_name] = physical_value
108+
109+
orig_cursor = encode_state.cursor_byte_position
110+
pos = encode_state.cursor_byte_position
111+
if self.byte_position is not None:
112+
pos = encode_state.origin_byte_position + self.byte_position
113+
encode_state.key_pos[self.short_name] = pos
114+
encode_state.cursor_byte_position = pos
84115

85116
bit_pos = self.bit_position or 0
86-
dop = odxrequire(super().dop,
87-
f"A DOP is required for length key parameter {self.short_name}")
88-
return dop.convert_physical_to_bytes(physical_value, encode_state, bit_position=bit_pos)
89-
90-
@override
91-
def encode_into_pdu(self, encode_state: EncodeState) -> bytes:
92-
return super().encode_into_pdu(encode_state)
117+
bit_size = self.dop.get_static_bit_length()
118+
if bit_size is None:
119+
odxraise("The DOP of length key {self.short_name} must exhibit a fixed size.",
120+
EncodeError)
121+
return
122+
123+
raw_data = b'\x00' * ((bit_pos + bit_size + 7) // 8)
124+
encode_state.emplace_atomic_value(raw_data, self.short_name)
125+
126+
encode_state.cursor_byte_position = max(encode_state.cursor_byte_position, orig_cursor)
127+
encode_state.cursor_bit_position = 0
128+
129+
def encode_value_into_pdu(self, encode_state: EncodeState) -> None:
130+
131+
if self.short_name not in encode_state.length_keys:
132+
odxraise(
133+
f"Length key {self.short_name} has not been defined before "
134+
f"it is required.", EncodeError)
135+
return
136+
else:
137+
physical_value = encode_state.length_keys[self.short_name]
138+
139+
encode_state.cursor_byte_position = encode_state.key_pos[self.short_name]
140+
encode_state.cursor_bit_position = self.bit_position or 0
141+
142+
raw_data = self.dop.convert_physical_to_bytes(
143+
physical_value=odxrequire(physical_value),
144+
encode_state=encode_state,
145+
bit_position=encode_state.cursor_bit_position)
146+
encode_state.emplace_atomic_value(raw_data, self.short_name)
93147

94148
@override
95149
def _decode_positioned_from_pdu(self, decode_state: DecodeState) -> ParameterValue:

0 commit comments

Comments
 (0)