1
1
# SPDX-License-Identifier: MIT
2
2
import typing
3
- from typing import List , Optional , runtime_checkable
3
+ from typing import Optional , runtime_checkable
4
4
5
5
from .decodestate import DecodeState
6
6
from .encodestate import EncodeState
7
- from .exceptions import EncodeError , odxraise
8
7
from .odxtypes import ParameterValue
9
- from .parameters .codedconstparameter import CodedConstParameter
10
- from .parameters .matchingrequestparameter import MatchingRequestParameter
11
- from .parameters .parameter import Parameter
12
- from .parameters .physicalconstantparameter import PhysicalConstantParameter
13
8
14
9
15
10
@runtime_checkable
@@ -31,181 +26,3 @@ def decode_from_pdu(self, decode_state: DecodeState) -> ParameterValue:
31
26
32
27
def get_static_bit_length (self ) -> Optional [int ]:
33
28
...
34
-
35
-
36
- @runtime_checkable
37
- class CompositeCodec (Codec , typing .Protocol ):
38
- """Any object which can be en- or decoded to be transferred over
39
- the wire which is composed of multiple parameter implements this
40
- API.
41
-
42
- """
43
-
44
- @property
45
- def parameters (self ) -> List [Parameter ]:
46
- return []
47
-
48
- @property
49
- def required_parameters (self ) -> List [Parameter ]:
50
- return []
51
-
52
- @property
53
- def free_parameters (self ) -> List [Parameter ]:
54
- return []
55
-
56
-
57
- # some helper functions useful for composite codec objects
58
- def composite_codec_get_static_bit_length (codec : CompositeCodec ) -> Optional [int ]:
59
- """Compute the length of a composite codec object in bits
60
-
61
- This is basically the sum of the lengths of all parameters. If the
62
- length of any parameter can only determined at runtime, `None` is
63
- returned.
64
- """
65
-
66
- cursor = 0
67
- byte_length = 0
68
- for param in codec .parameters :
69
- param_bit_length = param .get_static_bit_length ()
70
- if param_bit_length is None :
71
- # We were not able to calculate a static bit length
72
- return None
73
- elif param .byte_position is not None :
74
- cursor = param .byte_position
75
-
76
- cursor += ((param .bit_position or 0 ) + param_bit_length + 7 ) // 8
77
- byte_length = max (byte_length , cursor )
78
-
79
- return byte_length * 8
80
-
81
-
82
- def composite_codec_get_required_parameters (codec : CompositeCodec ) -> List [Parameter ]:
83
- """Return the list of parameters which are required to be
84
- specified for encoding the composite codec object
85
-
86
- I.e., all free parameters that do not exhibit a default value.
87
- """
88
- return [p for p in codec .parameters if p .is_required ]
89
-
90
-
91
- def composite_codec_get_free_parameters (codec : CompositeCodec ) -> List [Parameter ]:
92
- """Return the list of parameters which can be freely specified by
93
- the user when encoding the composite codec object
94
-
95
- This means all required parameters plus parameters that can be
96
- omitted because they specify a default.
97
- """
98
- return [p for p in codec .parameters if p .is_settable ]
99
-
100
-
101
- def composite_codec_get_coded_const_prefix (codec : CompositeCodec ,
102
- request_prefix : bytes = b'' ) -> bytes :
103
- encode_state = EncodeState (coded_message = bytearray (), triggering_request = request_prefix )
104
-
105
- for param in codec .parameters :
106
- if (isinstance (param , MatchingRequestParameter ) and param .request_byte_position < len (request_prefix )) or \
107
- isinstance (param , (CodedConstParameter , PhysicalConstantParameter )):
108
- param .encode_into_pdu (physical_value = None , encode_state = encode_state )
109
- else :
110
- break
111
-
112
- return encode_state .coded_message
113
-
114
-
115
- def composite_codec_encode_into_pdu (codec : CompositeCodec , physical_value : Optional [ParameterValue ],
116
- encode_state : EncodeState ) -> None :
117
- from .parameters .lengthkeyparameter import LengthKeyParameter
118
- from .parameters .tablekeyparameter import TableKeyParameter
119
-
120
- if not isinstance (physical_value , dict ):
121
- odxraise (
122
- f"Expected a dictionary for the values of { codec .short_name } , "
123
- f"got { type (physical_value ).__name__ } " , EncodeError )
124
- elif encode_state .cursor_bit_position != 0 :
125
- odxraise (
126
- f"Compositional codec objecs must be byte aligned, but "
127
- f"{ codec .short_name } requested to be at bit position "
128
- f"{ encode_state .cursor_bit_position } " , EncodeError )
129
- encode_state .bit_position = 0
130
-
131
- orig_origin = encode_state .origin_byte_position
132
- encode_state .origin_byte_position = encode_state .cursor_byte_position
133
-
134
- orig_is_end_of_pdu = encode_state .is_end_of_pdu
135
- encode_state .is_end_of_pdu = False
136
-
137
- # ensure that no values for unknown parameters are specified.
138
- if not encode_state .allow_unknown_parameters :
139
- param_names = {param .short_name for param in codec .parameters }
140
- for param_value_name in physical_value :
141
- if param_value_name not in param_names :
142
- odxraise (f"Value for unknown parameter '{ param_value_name } ' specified "
143
- f"for composite codec object { codec .short_name } " )
144
-
145
- for param in codec .parameters :
146
- if id (param ) == id (codec .parameters [- 1 ]):
147
- # The last parameter of the composite codec object is at
148
- # the end of the PDU if the codec object itself is at the
149
- # end of the PDU.
150
- #
151
- # TODO: This assumes that the last parameter specified in
152
- # the ODX is located last in the PDU...
153
- encode_state .is_end_of_pdu = orig_is_end_of_pdu
154
-
155
- if isinstance (param , (LengthKeyParameter , TableKeyParameter )):
156
- # At this point, we encode a placeholder value for length-
157
- # and table keys, since these can be specified
158
- # implicitly (i.e., by means of parameters that use
159
- # these keys). To avoid getting an "overlapping
160
- # parameter" warning, we must encode a value of zero
161
- # into the PDU here and add the real value of the
162
- # parameter in a post-processing step.
163
- param .encode_placeholder_into_pdu (
164
- physical_value = physical_value .get (param .short_name ), encode_state = encode_state )
165
-
166
- continue
167
-
168
- if param .is_required and param .short_name not in physical_value :
169
- odxraise (f"No value for required parameter { param .short_name } specified" , EncodeError )
170
-
171
- param_phys_value = physical_value .get (param .short_name )
172
- param .encode_into_pdu (physical_value = param_phys_value , encode_state = encode_state )
173
-
174
- encode_state .journal .append ((param , param_phys_value ))
175
-
176
- encode_state .is_end_of_pdu = False
177
-
178
- # encode the length- and table keys. This cannot be done above
179
- # because we allow these to be defined implicitly (i.e. they
180
- # are defined by their respective users)
181
- for param in codec .parameters :
182
- if not isinstance (param , (LengthKeyParameter , TableKeyParameter )):
183
- # the current parameter is neither a length- nor a table key
184
- continue
185
-
186
- # Encode the value of the key parameter into the message
187
- param .encode_value_into_pdu (encode_state = encode_state )
188
-
189
- encode_state .origin_byte_position = orig_origin
190
-
191
-
192
- def composite_codec_decode_from_pdu (codec : CompositeCodec ,
193
- decode_state : DecodeState ) -> ParameterValue :
194
- # move the origin since positions specified by sub-parameters of
195
- # composite codec objects are relative to the beginning of the
196
- # object.
197
- orig_origin = decode_state .origin_byte_position
198
- decode_state .origin_byte_position = decode_state .cursor_byte_position
199
-
200
- result = {}
201
- for param in codec .parameters :
202
- value = param .decode_from_pdu (decode_state )
203
-
204
- decode_state .journal .append ((param , value ))
205
- result [param .short_name ] = value
206
-
207
- # decoding of the composite codec object finished. go back the
208
- # original origin.
209
- decode_state .origin_byte_position = orig_origin
210
-
211
- return result
0 commit comments