1
1
from collections .abc import Callable , Sequence
2
- from typing import Literal
2
+ from typing import Literal , Union
3
3
4
4
from core .file import File
5
5
from core .variables import ArrayFileSegment , ArrayNumberSegment , ArrayStringSegment
9
9
from models .workflow import WorkflowNodeExecutionStatus
10
10
11
11
from .entities import ListOperatorNodeData
12
+ from .exc import InvalidConditionError , InvalidFilterValueError , InvalidKeyError , ListOperatorError
12
13
13
14
14
15
class ListOperatorNode (BaseNode [ListOperatorNodeData ]):
@@ -26,7 +27,17 @@ def _run(self):
26
27
return NodeRunResult (
27
28
status = WorkflowNodeExecutionStatus .FAILED , error = error_message , inputs = inputs , outputs = outputs
28
29
)
29
- if variable .value and not isinstance (variable , ArrayFileSegment | ArrayNumberSegment | ArrayStringSegment ):
30
+ if not variable .value :
31
+ inputs = {"variable" : []}
32
+ process_data = {"variable" : []}
33
+ outputs = {"result" : [], "first_record" : None , "last_record" : None }
34
+ return NodeRunResult (
35
+ status = WorkflowNodeExecutionStatus .SUCCEEDED ,
36
+ inputs = inputs ,
37
+ process_data = process_data ,
38
+ outputs = outputs ,
39
+ )
40
+ if not isinstance (variable , ArrayFileSegment | ArrayNumberSegment | ArrayStringSegment ):
30
41
error_message = (
31
42
f"Variable { self .node_data .variable } is not an ArrayFileSegment, ArrayNumberSegment "
32
43
"or ArrayStringSegment"
@@ -36,78 +47,106 @@ def _run(self):
36
47
)
37
48
38
49
if isinstance (variable , ArrayFileSegment ):
50
+ inputs = {"variable" : [item .to_dict () for item in variable .value ]}
39
51
process_data ["variable" ] = [item .to_dict () for item in variable .value ]
40
52
else :
53
+ inputs = {"variable" : variable .value }
41
54
process_data ["variable" ] = variable .value
42
55
43
- # Filter
44
- if self .node_data .filter_by .enabled :
45
- for condition in self .node_data .filter_by .conditions :
46
- if isinstance (variable , ArrayStringSegment ):
47
- if not isinstance (condition .value , str ):
48
- raise ValueError (f"Invalid filter value: { condition .value } " )
49
- value = self .graph_runtime_state .variable_pool .convert_template (condition .value ).text
50
- filter_func = _get_string_filter_func (condition = condition .comparison_operator , value = value )
51
- result = list (filter (filter_func , variable .value ))
52
- variable = variable .model_copy (update = {"value" : result })
53
- elif isinstance (variable , ArrayNumberSegment ):
54
- if not isinstance (condition .value , str ):
55
- raise ValueError (f"Invalid filter value: { condition .value } " )
56
- value = self .graph_runtime_state .variable_pool .convert_template (condition .value ).text
57
- filter_func = _get_number_filter_func (condition = condition .comparison_operator , value = float (value ))
58
- result = list (filter (filter_func , variable .value ))
59
- variable = variable .model_copy (update = {"value" : result })
60
- elif isinstance (variable , ArrayFileSegment ):
61
- if isinstance (condition .value , str ):
62
- value = self .graph_runtime_state .variable_pool .convert_template (condition .value ).text
63
- else :
64
- value = condition .value
65
- filter_func = _get_file_filter_func (
66
- key = condition .key ,
67
- condition = condition .comparison_operator ,
68
- value = value ,
69
- )
70
- result = list (filter (filter_func , variable .value ))
71
- variable = variable .model_copy (update = {"value" : result })
72
-
73
- # Order
74
- if self .node_data .order_by .enabled :
56
+ try :
57
+ # Filter
58
+ if self .node_data .filter_by .enabled :
59
+ variable = self ._apply_filter (variable )
60
+
61
+ # Order
62
+ if self .node_data .order_by .enabled :
63
+ variable = self ._apply_order (variable )
64
+
65
+ # Slice
66
+ if self .node_data .limit .enabled :
67
+ variable = self ._apply_slice (variable )
68
+
69
+ outputs = {
70
+ "result" : variable .value ,
71
+ "first_record" : variable .value [0 ] if variable .value else None ,
72
+ "last_record" : variable .value [- 1 ] if variable .value else None ,
73
+ }
74
+ return NodeRunResult (
75
+ status = WorkflowNodeExecutionStatus .SUCCEEDED ,
76
+ inputs = inputs ,
77
+ process_data = process_data ,
78
+ outputs = outputs ,
79
+ )
80
+ except ListOperatorError as e :
81
+ return NodeRunResult (
82
+ status = WorkflowNodeExecutionStatus .FAILED ,
83
+ error = str (e ),
84
+ inputs = inputs ,
85
+ process_data = process_data ,
86
+ outputs = outputs ,
87
+ )
88
+
89
+ def _apply_filter (
90
+ self , variable : Union [ArrayFileSegment , ArrayNumberSegment , ArrayStringSegment ]
91
+ ) -> Union [ArrayFileSegment , ArrayNumberSegment , ArrayStringSegment ]:
92
+ for condition in self .node_data .filter_by .conditions :
75
93
if isinstance (variable , ArrayStringSegment ):
76
- result = _order_string (order = self .node_data .order_by .value , array = variable .value )
94
+ if not isinstance (condition .value , str ):
95
+ raise InvalidFilterValueError (f"Invalid filter value: { condition .value } " )
96
+ value = self .graph_runtime_state .variable_pool .convert_template (condition .value ).text
97
+ filter_func = _get_string_filter_func (condition = condition .comparison_operator , value = value )
98
+ result = list (filter (filter_func , variable .value ))
77
99
variable = variable .model_copy (update = {"value" : result })
78
100
elif isinstance (variable , ArrayNumberSegment ):
79
- result = _order_number (order = self .node_data .order_by .value , array = variable .value )
101
+ if not isinstance (condition .value , str ):
102
+ raise InvalidFilterValueError (f"Invalid filter value: { condition .value } " )
103
+ value = self .graph_runtime_state .variable_pool .convert_template (condition .value ).text
104
+ filter_func = _get_number_filter_func (condition = condition .comparison_operator , value = float (value ))
105
+ result = list (filter (filter_func , variable .value ))
80
106
variable = variable .model_copy (update = {"value" : result })
81
107
elif isinstance (variable , ArrayFileSegment ):
82
- result = _order_file (
83
- order = self .node_data .order_by .value , order_by = self .node_data .order_by .key , array = variable .value
108
+ if isinstance (condition .value , str ):
109
+ value = self .graph_runtime_state .variable_pool .convert_template (condition .value ).text
110
+ else :
111
+ value = condition .value
112
+ filter_func = _get_file_filter_func (
113
+ key = condition .key ,
114
+ condition = condition .comparison_operator ,
115
+ value = value ,
84
116
)
117
+ result = list (filter (filter_func , variable .value ))
85
118
variable = variable .model_copy (update = {"value" : result })
119
+ return variable
86
120
87
- # Slice
88
- if self .node_data .limit .enabled :
89
- result = variable .value [: self .node_data .limit .size ]
121
+ def _apply_order (
122
+ self , variable : Union [ArrayFileSegment , ArrayNumberSegment , ArrayStringSegment ]
123
+ ) -> Union [ArrayFileSegment , ArrayNumberSegment , ArrayStringSegment ]:
124
+ if isinstance (variable , ArrayStringSegment ):
125
+ result = _order_string (order = self .node_data .order_by .value , array = variable .value )
126
+ variable = variable .model_copy (update = {"value" : result })
127
+ elif isinstance (variable , ArrayNumberSegment ):
128
+ result = _order_number (order = self .node_data .order_by .value , array = variable .value )
129
+ variable = variable .model_copy (update = {"value" : result })
130
+ elif isinstance (variable , ArrayFileSegment ):
131
+ result = _order_file (
132
+ order = self .node_data .order_by .value , order_by = self .node_data .order_by .key , array = variable .value
133
+ )
90
134
variable = variable .model_copy (update = {"value" : result })
135
+ return variable
91
136
92
- outputs = {
93
- "result" : variable .value ,
94
- "first_record" : variable .value [0 ] if variable .value else None ,
95
- "last_record" : variable .value [- 1 ] if variable .value else None ,
96
- }
97
- return NodeRunResult (
98
- status = WorkflowNodeExecutionStatus .SUCCEEDED ,
99
- inputs = inputs ,
100
- process_data = process_data ,
101
- outputs = outputs ,
102
- )
137
+ def _apply_slice (
138
+ self , variable : Union [ArrayFileSegment , ArrayNumberSegment , ArrayStringSegment ]
139
+ ) -> Union [ArrayFileSegment , ArrayNumberSegment , ArrayStringSegment ]:
140
+ result = variable .value [: self .node_data .limit .size ]
141
+ return variable .model_copy (update = {"value" : result })
103
142
104
143
105
144
def _get_file_extract_number_func (* , key : str ) -> Callable [[File ], int ]:
106
145
match key :
107
146
case "size" :
108
147
return lambda x : x .size
109
148
case _:
110
- raise ValueError (f"Invalid key: { key } " )
149
+ raise InvalidKeyError (f"Invalid key: { key } " )
111
150
112
151
113
152
def _get_file_extract_string_func (* , key : str ) -> Callable [[File ], str ]:
@@ -125,7 +164,7 @@ def _get_file_extract_string_func(*, key: str) -> Callable[[File], str]:
125
164
case "url" :
126
165
return lambda x : x .remote_url or ""
127
166
case _:
128
- raise ValueError (f"Invalid key: { key } " )
167
+ raise InvalidKeyError (f"Invalid key: { key } " )
129
168
130
169
131
170
def _get_string_filter_func (* , condition : str , value : str ) -> Callable [[str ], bool ]:
@@ -151,7 +190,7 @@ def _get_string_filter_func(*, condition: str, value: str) -> Callable[[str], bo
151
190
case "not empty" :
152
191
return lambda x : x != ""
153
192
case _:
154
- raise ValueError (f"Invalid condition: { condition } " )
193
+ raise InvalidConditionError (f"Invalid condition: { condition } " )
155
194
156
195
157
196
def _get_sequence_filter_func (* , condition : str , value : Sequence [str ]) -> Callable [[str ], bool ]:
@@ -161,7 +200,7 @@ def _get_sequence_filter_func(*, condition: str, value: Sequence[str]) -> Callab
161
200
case "not in" :
162
201
return lambda x : not _in (value )(x )
163
202
case _:
164
- raise ValueError (f"Invalid condition: { condition } " )
203
+ raise InvalidConditionError (f"Invalid condition: { condition } " )
165
204
166
205
167
206
def _get_number_filter_func (* , condition : str , value : int | float ) -> Callable [[int | float ], bool ]:
@@ -179,7 +218,7 @@ def _get_number_filter_func(*, condition: str, value: int | float) -> Callable[[
179
218
case "≥" :
180
219
return _ge (value )
181
220
case _:
182
- raise ValueError (f"Invalid condition: { condition } " )
221
+ raise InvalidConditionError (f"Invalid condition: { condition } " )
183
222
184
223
185
224
def _get_file_filter_func (* , key : str , condition : str , value : str | Sequence [str ]) -> Callable [[File ], bool ]:
@@ -193,7 +232,7 @@ def _get_file_filter_func(*, key: str, condition: str, value: str | Sequence[str
193
232
extract_func = _get_file_extract_number_func (key = key )
194
233
return lambda x : _get_number_filter_func (condition = condition , value = float (value ))(extract_func (x ))
195
234
else :
196
- raise ValueError (f"Invalid key: { key } " )
235
+ raise InvalidKeyError (f"Invalid key: { key } " )
197
236
198
237
199
238
def _contains (value : str ):
0 commit comments