40
40
import time
41
41
import threading
42
42
import utils
43
+ import multiprocessing
43
44
44
45
from os .path import join , dirname , abspath , basename , isdir , exists
45
46
from datetime import datetime
@@ -58,9 +59,13 @@ class ProgressIndicator(object):
58
59
def __init__ (self , cases , flaky_tests_mode ):
59
60
self .cases = cases
60
61
self .flaky_tests_mode = flaky_tests_mode
61
- self .queue = Queue (len (cases ))
62
+ self .parallel_queue = Queue (len (cases ))
63
+ self .sequential_queue = Queue (len (cases ))
62
64
for case in cases :
63
- self .queue .put_nowait (case )
65
+ if case .parallel :
66
+ self .parallel_queue .put_nowait (case )
67
+ else :
68
+ self .sequential_queue .put_nowait (case )
64
69
self .succeeded = 0
65
70
self .remaining = len (cases )
66
71
self .total = len (cases )
@@ -87,11 +92,11 @@ def Run(self, tasks):
87
92
# That way -j1 avoids threading altogether which is a nice fallback
88
93
# in case of threading problems.
89
94
for i in xrange (tasks - 1 ):
90
- thread = threading .Thread (target = self .RunSingle , args = [])
95
+ thread = threading .Thread (target = self .RunSingle , args = [True , i + 1 ])
91
96
threads .append (thread )
92
97
thread .start ()
93
98
try :
94
- self .RunSingle ()
99
+ self .RunSingle (False , 0 )
95
100
# Wait for the remaining threads
96
101
for thread in threads :
97
102
# Use a timeout so that signals (ctrl-c) will be processed.
@@ -105,13 +110,19 @@ def Run(self, tasks):
105
110
self .Done ()
106
111
return not self .failed
107
112
108
- def RunSingle (self ):
113
+ def RunSingle (self , parallel , thread_id ):
109
114
while not self .terminate :
110
115
try :
111
- test = self .queue .get_nowait ()
116
+ test = self .parallel_queue .get_nowait ()
112
117
except Empty :
113
- return
118
+ if parallel :
119
+ return
120
+ try :
121
+ test = self .sequential_queue .get_nowait ()
122
+ except Empty :
123
+ return
114
124
case = test .case
125
+ case .thread_id = thread_id
115
126
self .lock .acquire ()
116
127
self .AboutToRun (case )
117
128
self .lock .release ()
@@ -381,6 +392,8 @@ def __init__(self, context, path, arch, mode):
381
392
self .duration = None
382
393
self .arch = arch
383
394
self .mode = mode
395
+ self .parallel = False
396
+ self .thread_id = 0
384
397
385
398
def IsNegative (self ):
386
399
return False
@@ -399,11 +412,12 @@ def IsFailureOutput(self, output):
399
412
def GetSource (self ):
400
413
return "(no source available)"
401
414
402
- def RunCommand (self , command ):
415
+ def RunCommand (self , command , env ):
403
416
full_command = self .context .processor (command )
404
417
output = Execute (full_command ,
405
418
self .context ,
406
- self .context .GetTimeout (self .mode ))
419
+ self .context .GetTimeout (self .mode ),
420
+ env )
407
421
self .Cleanup ()
408
422
return TestOutput (self ,
409
423
full_command ,
@@ -420,7 +434,9 @@ def Run(self):
420
434
self .BeforeRun ()
421
435
422
436
try :
423
- result = self .RunCommand (self .GetCommand ())
437
+ result = self .RunCommand (self .GetCommand (), {
438
+ "TEST_THREAD_ID" : "%d" % self .thread_id
439
+ })
424
440
finally :
425
441
# Tests can leave the tty in non-blocking mode. If the test runner
426
442
# tries to print to stdout/stderr after that and the tty buffer is
@@ -559,15 +575,22 @@ def CheckedUnlink(name):
559
575
PrintError ("os.unlink() " + str (e ))
560
576
561
577
562
- def Execute (args , context , timeout = None ):
578
+ def Execute (args , context , timeout = None , env = {} ):
563
579
(fd_out , outname ) = tempfile .mkstemp ()
564
580
(fd_err , errname ) = tempfile .mkstemp ()
581
+
582
+ # Extend environment
583
+ env_copy = os .environ .copy ()
584
+ for key , value in env .iteritems ():
585
+ env_copy [key ] = value
586
+
565
587
(process , exit_code , timed_out ) = RunProcess (
566
588
context ,
567
589
timeout ,
568
590
args = args ,
569
591
stdout = fd_out ,
570
592
stderr = fd_err ,
593
+ env = env_copy
571
594
)
572
595
os .close (fd_out )
573
596
os .close (fd_err )
@@ -1068,6 +1091,7 @@ class ClassifiedTest(object):
1068
1091
def __init__ (self , case , outcomes ):
1069
1092
self .case = case
1070
1093
self .outcomes = outcomes
1094
+ self .parallel = self .case .parallel
1071
1095
1072
1096
1073
1097
class Configuration (object ):
@@ -1224,6 +1248,8 @@ def BuildOptions():
1224
1248
default = False , action = "store_true" )
1225
1249
result .add_option ("-j" , help = "The number of parallel tasks to run" ,
1226
1250
default = 1 , type = "int" )
1251
+ result .add_option ("-J" , help = "Run tasks in parallel on all cores" ,
1252
+ default = False , action = "store_true" )
1227
1253
result .add_option ("--time" , help = "Print timing information after running" ,
1228
1254
default = False , action = "store_true" )
1229
1255
result .add_option ("--suppress-dialogs" , help = "Suppress Windows dialogs for crashing tests" ,
@@ -1245,6 +1271,8 @@ def ProcessOptions(options):
1245
1271
VERBOSE = options .verbose
1246
1272
options .arch = options .arch .split (',' )
1247
1273
options .mode = options .mode .split (',' )
1274
+ if options .J :
1275
+ options .j = multiprocessing .cpu_count ()
1248
1276
def CheckTestMode (name , option ):
1249
1277
if not option in ["run" , "skip" , "dontcare" ]:
1250
1278
print "Unknown %s mode %s" % (name , option )
0 commit comments