15
15
16
16
17
17
import argparse
18
+ import asyncio
19
+ import dataclasses
18
20
import importlib
19
21
import logging
22
+ import math
20
23
import os
21
24
import pathlib
22
25
import re
@@ -44,10 +47,11 @@ def get_args():
44
47
return pars .parse_args ()
45
48
46
49
47
- def run_cmd (cmd ):
48
- try :
49
- subprocess .run ([str (c ) for c in cmd ], check = True )
50
- except subprocess .CalledProcessError :
50
+ async def run_cmd (cmd ):
51
+ proc = await asyncio .create_subprocess_exec (
52
+ str (cmd [0 ]), * [str (arg ) for arg in cmd [1 :]])
53
+ await proc .wait ()
54
+ if proc .returncode :
51
55
logging .error (
52
56
"Failed to run command '%s'" , " " .join ([str (c ) for c in cmd ]))
53
57
sys .exit (1 )
@@ -69,48 +73,52 @@ def has_phony_add_jobs(module_file):
69
73
return False
70
74
71
75
72
- def litani_add (litani , counter , * args , ** kwargs ):
76
+ async def litani_add (litani , ** kwargs ):
73
77
cmd = [litani , "add-job" ]
74
- for arg in args :
75
- switch = re .sub ("_" , "-" , arg )
76
- cmd .append (f"--{ switch } " )
77
78
for arg , value in kwargs .items ():
78
79
switch = re .sub ("_" , "-" , arg )
79
80
cmd .append (f"--{ switch } " )
80
81
if isinstance (value , list ):
81
82
cmd .extend (value )
82
83
else :
83
84
cmd .append (value )
84
- run_cmd (cmd )
85
- counter ["added" ] += 1
86
- print_counter (counter )
85
+ await run_cmd (cmd )
87
86
88
87
89
88
def collapse (string ):
90
89
return re .sub (r"\s+" , " " , string )
91
90
92
91
93
- def has_line (test_file , line ):
94
- with open (test_file ) as handle :
95
- for current_line in handle :
96
- if current_line .strip ().startswith (line ):
97
- return True
98
- return False
92
+
93
+ @dataclasses .dataclass
94
+ class TestFileMethodChecker :
95
+ test_file : pathlib .Path
96
+
97
+ def __call__ (self , meth_name ):
98
+ pat = f"def { meth_name } (" # ) <- for syntax highlighting
99
+ with open (self .test_file ) as handle :
100
+ for line in handle :
101
+ if line .strip ().startswith (pat ):
102
+ return True
103
+ return False
99
104
100
105
101
- def add_e2e_tests (litani , test_dir , counter , output_dir , fast ):
106
+ def enqueue_job (queue , ** kwargs ):
107
+ queue .put_nowait (kwargs )
108
+
109
+
110
+
111
+ def add_e2e_tests (test_dir , output_dir , fast , queue , litani ):
102
112
e2e_test_dir = test_dir / "e2e"
103
113
# 4 jobs per test (init, add-jobs, run-build, check-run)
104
114
# skip __init__.py and __pycache__
105
- counter ["total" ] += (len (os .listdir (e2e_test_dir / "tests" )) - 2 ) * 4
106
115
sys .path .insert (1 , str (e2e_test_dir / "tests" ))
116
+
107
117
for test_file in (e2e_test_dir / "tests" ).iterdir ():
108
118
if test_file .name in ["__init__.py" , "__pycache__" ]:
109
119
continue
110
120
111
- add_transform_jobs = has_line (test_file , "def transform_jobs(" )
112
- add_get_jobs = has_line (test_file , "def check_get_jobs(" )
113
- add_set_jobs = has_line (test_file , "def get_set_jobs_args(" )
121
+ test_has_method = TestFileMethodChecker (test_file )
114
122
115
123
if fast and is_slow_test (test_file ):
116
124
continue
@@ -119,8 +127,8 @@ def add_e2e_tests(litani, test_dir, counter, output_dir, fast):
119
127
120
128
timeout = 10 if fast else 0
121
129
122
- litani_add (
123
- litani , counter ,
130
+ enqueue_job (
131
+ queue ,
124
132
command = collapse (f"""
125
133
{ e2e_test_dir / 'run' }
126
134
--test-file { test_file }
@@ -144,8 +152,8 @@ def add_e2e_tests(litani, test_dir, counter, output_dir, fast):
144
152
else :
145
153
add_jobs_kwargs ["outputs" ] = [jobs_output ]
146
154
147
- litani_add (
148
- litani , counter ,
155
+ enqueue_job (
156
+ queue ,
149
157
command = collapse (f"""
150
158
{ e2e_test_dir / 'run' }
151
159
--test-file { test_file }
@@ -161,10 +169,10 @@ def add_e2e_tests(litani, test_dir, counter, output_dir, fast):
161
169
** add_jobs_kwargs )
162
170
163
171
run_build_dependencies = [add_jobs_output ]
164
- if add_transform_jobs :
172
+ if test_has_method ( "transform_jobs" ) :
165
173
transform_jobs_output = str (uuid .uuid4 ())
166
- litani_add (
167
- litani , counter ,
174
+ enqueue_job (
175
+ queue ,
168
176
command = collapse (f"""
169
177
{ e2e_test_dir / 'run' }
170
178
--test-file { test_file }
@@ -179,10 +187,10 @@ def add_e2e_tests(litani, test_dir, counter, output_dir, fast):
179
187
cwd = run_dir )
180
188
run_build_dependencies .append (transform_jobs_output )
181
189
182
- if add_get_jobs :
190
+ if test_has_method ( "check_get_jobs" ) :
183
191
get_jobs_output = str (uuid .uuid4 ())
184
- litani_add (
185
- litani , counter ,
192
+ enqueue_job (
193
+ queue ,
186
194
command = collapse (f"""
187
195
{ e2e_test_dir / 'run' }
188
196
--test-file { test_file }
@@ -197,10 +205,10 @@ def add_e2e_tests(litani, test_dir, counter, output_dir, fast):
197
205
cwd = run_dir )
198
206
run_build_dependencies .append (get_jobs_output )
199
207
200
- if add_set_jobs :
208
+ if test_has_method ( "get_set_jobs_args" ) :
201
209
set_jobs_output = str (uuid .uuid4 ())
202
- litani_add (
203
- litani , counter ,
210
+ enqueue_job (
211
+ queue ,
204
212
command = collapse (f"""
205
213
{ e2e_test_dir / 'run' }
206
214
--test-file { test_file }
@@ -215,8 +223,8 @@ def add_e2e_tests(litani, test_dir, counter, output_dir, fast):
215
223
cwd = run_dir )
216
224
run_build_dependencies .append (set_jobs_output )
217
225
218
- litani_add (
219
- litani , counter ,
226
+ enqueue_job (
227
+ queue ,
220
228
command = collapse (f"""
221
229
{ e2e_test_dir / 'run' }
222
230
--test-file { test_file }
@@ -231,8 +239,8 @@ def add_e2e_tests(litani, test_dir, counter, output_dir, fast):
231
239
cwd = run_dir ,
232
240
timeout = timeout )
233
241
234
- litani_add (
235
- litani , counter ,
242
+ enqueue_job (
243
+ queue ,
236
244
command = collapse (f"""
237
245
{ e2e_test_dir / 'run' }
238
246
--test-file { test_file }
@@ -247,25 +255,55 @@ def add_e2e_tests(litani, test_dir, counter, output_dir, fast):
247
255
timeout = timeout )
248
256
249
257
250
- def add_unit_tests (litani , test_dir , root_dir , counter ):
258
+ def add_unit_tests (test_dir , root_dir , queue ):
251
259
for fyle in (test_dir / "unit" ).iterdir ():
252
260
if fyle .name in ["__init__.py" , "__pycache__" ]:
253
261
continue
254
- litani_add (
255
- litani , counter ,
262
+ enqueue_job (
263
+ queue ,
256
264
command = f"python3 -m unittest test.unit.{ fyle .stem } " ,
257
265
pipeline = "Unit tests" ,
258
266
ci_stage = "test" ,
259
267
description = fyle .stem ,
260
268
cwd = root_dir )
261
- counter ["total" ] += 1
262
269
263
270
264
- def print_counter (counter ):
265
- print ("\r {added} / {total} tests added" .format (** counter ), end = "" )
271
+
272
+ @dataclasses .dataclass
273
+ class Counter :
274
+ total : int
275
+ added : int = 0
276
+ width : int = 0
266
277
267
278
268
- def main ():
279
+ def __post_init__ (self ):
280
+ self .width = int (math .log10 (self .total )) + 1
281
+
282
+
283
+ def bump (self ):
284
+ self .added += 1
285
+ msg = "\r {added:{width}}/{total:{width}} tests added" .format (
286
+ added = self .added , total = self .total , width = self .width )
287
+ print (msg , end = "" , file = sys .stderr )
288
+
289
+
290
+
291
+ async def drain_job_queue (queue , counter , litani ):
292
+ while True :
293
+ args = await queue .get ()
294
+ await litani_add (litani , ** args )
295
+ counter .bump ()
296
+ queue .task_done ()
297
+
298
+
299
+ def get_pool_size ():
300
+ ret = os .cpu_count ()
301
+ if ret is None or ret < 4 :
302
+ return 1
303
+ return ret - 2
304
+
305
+
306
+ async def main ():
269
307
args = get_args ()
270
308
logging .basicConfig (format = "\n run-tests: %(message)s" )
271
309
test_dir = pathlib .Path (__file__ ).resolve ().parent
@@ -276,24 +314,25 @@ def main():
276
314
output_dir .mkdir (exist_ok = True , parents = True )
277
315
os .chdir (output_dir )
278
316
279
- run_cmd ([
317
+ await run_cmd ([
280
318
litani , "init" ,
281
319
"--project" , "Litani Test Suite" ,
282
320
"--output-prefix" , "." ,
283
321
"--output-symlink" , "latest" ])
284
322
285
- counter = {
286
- "added" : 0 ,
287
- "total" : 0 ,
288
- }
289
-
290
- add_unit_tests (litani , test_dir , root , counter )
291
- add_e2e_tests (
292
- litani , test_dir , counter , output_dir , args .fast )
293
- print ()
323
+ job_queue = asyncio .Queue ()
324
+ add_unit_tests (test_dir , root , job_queue )
325
+ add_e2e_tests (test_dir , output_dir , args .fast , job_queue , litani )
294
326
295
- run_cmd ([litani , "run-build" , "--fail-on-pipeline-failure" ])
327
+ counter = Counter (job_queue .qsize ())
328
+ tasks = []
329
+ for _ in range (get_pool_size ()):
330
+ task = asyncio .create_task (drain_job_queue (job_queue , counter , litani ))
331
+ tasks .append (task )
332
+ await job_queue .join ()
333
+ print ("" , file = sys .stderr )
334
+ await run_cmd ([litani , "run-build" , "--fail-on-pipeline-failure" ])
296
335
297
336
298
337
if __name__ == "__main__" :
299
- main ()
338
+ asyncio . run ( main () )
0 commit comments