5
5
from typing import Any
6
6
7
7
import cacholote
8
+ import cads_adaptors
8
9
import cads_broker .database
9
10
import distributed .worker
10
11
import structlog
@@ -170,8 +171,6 @@ def submit_workflow(
170
171
form : dict [str , Any ] = {},
171
172
metadata : dict [str , Any ] = {},
172
173
):
173
- import cads_adaptors
174
-
175
174
job_id = distributed .worker .thread_state .key # type: ignore
176
175
# send event with worker address and pid of the job
177
176
worker = get_worker ()
@@ -185,7 +184,9 @@ def submit_workflow(
185
184
message = socket .gethostname (),
186
185
session = session ,
187
186
)
188
- system_request = cads_broker .database .get_request (request_uid = job_id , session = session )
187
+ system_request = cads_broker .database .get_request (
188
+ request_uid = job_id , session = session
189
+ )
189
190
request = system_request .request_body .get ("request" , {})
190
191
form = system_request .adaptor_properties .form
191
192
config .update (system_request .adaptor_properties .config )
@@ -209,7 +210,9 @@ def submit_workflow(
209
210
result = cacholote .cacheable (adaptor .retrieve )(request = request )
210
211
except Exception as err :
211
212
logger .exception (job_id = job_id , event_type = "EXCEPTION" )
212
- context .add_user_visible_error (f"The job failed with: { err .__class__ .__name__ } " )
213
+ context .add_user_visible_error (
214
+ f"The job failed with: { err .__class__ .__name__ } "
215
+ )
213
216
context .error (f"{ err .__class__ .__name__ } : { str (err )} " )
214
217
raise
215
218
finally :
0 commit comments