@@ -2368,6 +2368,26 @@ class CustomListener(logging.handlers.QueueListener):
2368
2368
class CustomQueue (queue .Queue ):
2369
2369
pass
2370
2370
2371
+ class CustomQueueProtocol :
2372
+ def __init__ (self , maxsize = 0 ):
2373
+ self .queue = queue .Queue (maxsize )
2374
+
2375
+ def __getattr__ (self , attribute ):
2376
+ queue = object .__getattribute__ (self , 'queue' )
2377
+ return getattr (queue , attribute )
2378
+
2379
+ class CustomQueueFakeProtocol (CustomQueueProtocol ):
2380
+ # An object implementing the Queue API (incorrect signatures).
2381
+ # The object will be considered a valid queue class since we
2382
+ # do not check the signatures (only callability of methods)
2383
+ # but will NOT be usable in production since a TypeError will
2384
+ # be raised due to a missing argument.
2385
+ def empty (self , x ):
2386
+ pass
2387
+
2388
+ class CustomQueueWrongProtocol (CustomQueueProtocol ):
2389
+ empty = None
2390
+
2371
2391
def queueMaker ():
2372
2392
return queue .Queue ()
2373
2393
@@ -3901,18 +3921,16 @@ def do_queuehandler_configuration(self, qspec, lspec):
3901
3921
@threading_helper .requires_working_threading ()
3902
3922
@support .requires_subprocess ()
3903
3923
def test_config_queue_handler (self ):
3904
- q = CustomQueue ()
3905
- dq = {
3906
- '()' : __name__ + '.CustomQueue' ,
3907
- 'maxsize' : 10
3908
- }
3924
+ qs = [CustomQueue (), CustomQueueProtocol ()]
3925
+ dqs = [{'()' : f'{ __name__ } .{ cls } ' , 'maxsize' : 10 }
3926
+ for cls in ['CustomQueue' , 'CustomQueueProtocol' ]]
3909
3927
dl = {
3910
3928
'()' : __name__ + '.listenerMaker' ,
3911
3929
'arg1' : None ,
3912
3930
'arg2' : None ,
3913
3931
'respect_handler_level' : True
3914
3932
}
3915
- qvalues = (None , __name__ + '.queueMaker' , __name__ + '.CustomQueue' , dq , q )
3933
+ qvalues = (None , __name__ + '.queueMaker' , __name__ + '.CustomQueue' , * dqs , * qs )
3916
3934
lvalues = (None , __name__ + '.CustomListener' , dl , CustomListener )
3917
3935
for qspec , lspec in itertools .product (qvalues , lvalues ):
3918
3936
self .do_queuehandler_configuration (qspec , lspec )
@@ -3932,15 +3950,21 @@ def test_config_queue_handler(self):
3932
3950
@support .requires_subprocess ()
3933
3951
@patch ("multiprocessing.Manager" )
3934
3952
def test_config_queue_handler_does_not_create_multiprocessing_manager (self , manager ):
3935
- # gh-120868
3953
+ # gh-120868, gh-121723
3936
3954
3937
3955
from multiprocessing import Queue as MQ
3938
3956
3939
3957
q1 = {"()" : "queue.Queue" , "maxsize" : - 1 }
3940
3958
q2 = MQ ()
3941
3959
q3 = queue .Queue ()
3942
-
3943
- for qspec in (q1 , q2 , q3 ):
3960
+ # CustomQueueFakeProtocol passes the checks but will not be usable
3961
+ # since the signatures are incompatible. Checking the Queue API
3962
+ # without testing the type of the actual queue is a trade-off
3963
+ # between usability and the work we need to do in order to safely
3964
+ # check that the queue object correctly implements the API.
3965
+ q4 = CustomQueueFakeProtocol ()
3966
+
3967
+ for qspec in (q1 , q2 , q3 , q4 ):
3944
3968
self .apply_config (
3945
3969
{
3946
3970
"version" : 1 ,
@@ -3956,21 +3980,62 @@ def test_config_queue_handler_does_not_create_multiprocessing_manager(self, mana
3956
3980
3957
3981
@patch ("multiprocessing.Manager" )
3958
3982
def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager (self , manager ):
3959
- # gh-120868
3983
+ # gh-120868, gh-121723
3960
3984
3961
- with self .assertRaises (ValueError ):
3962
- self .apply_config (
3963
- {
3964
- "version" : 1 ,
3965
- "handlers" : {
3966
- "queue_listener" : {
3967
- "class" : "logging.handlers.QueueHandler" ,
3968
- "queue" : object (),
3985
+ for qspec in [object (), CustomQueueWrongProtocol ()]:
3986
+ with self .assertRaises (ValueError ):
3987
+ self .apply_config (
3988
+ {
3989
+ "version" : 1 ,
3990
+ "handlers" : {
3991
+ "queue_listener" : {
3992
+ "class" : "logging.handlers.QueueHandler" ,
3993
+ "queue" : qspec ,
3994
+ },
3969
3995
},
3970
- },
3996
+ }
3997
+ )
3998
+ manager .assert_not_called ()
3999
+
4000
+ @skip_if_tsan_fork
4001
+ @support .requires_subprocess ()
4002
+ @unittest .skipUnless (support .Py_DEBUG , "requires a debug build for testing"
4003
+ "assertions in multiprocessing" )
4004
+ def test_config_queue_handler_multiprocessing_context (self ):
4005
+ # regression test for gh-121723
4006
+ if support .MS_WINDOWS :
4007
+ start_methods = ['spawn' ]
4008
+ else :
4009
+ start_methods = ['spawn' , 'fork' , 'forkserver' ]
4010
+ for start_method in start_methods :
4011
+ with self .subTest (start_method = start_method ):
4012
+ ctx = multiprocessing .get_context (start_method )
4013
+ with ctx .Manager () as manager :
4014
+ q = manager .Queue ()
4015
+ records = []
4016
+ # use 1 process and 1 task per child to put 1 record
4017
+ with ctx .Pool (1 , initializer = self ._mpinit_issue121723 ,
4018
+ initargs = (q , "text" ), maxtasksperchild = 1 ):
4019
+ records .append (q .get (timeout = 60 ))
4020
+ self .assertTrue (q .empty ())
4021
+ self .assertEqual (len (records ), 1 )
4022
+
4023
+ @staticmethod
4024
+ def _mpinit_issue121723 (qspec , message_to_log ):
4025
+ # static method for pickling support
4026
+ logging .config .dictConfig ({
4027
+ 'version' : 1 ,
4028
+ 'disable_existing_loggers' : True ,
4029
+ 'handlers' : {
4030
+ 'log_to_parent' : {
4031
+ 'class' : 'logging.handlers.QueueHandler' ,
4032
+ 'queue' : qspec
3971
4033
}
3972
- )
3973
- manager .assert_not_called ()
4034
+ },
4035
+ 'root' : {'handlers' : ['log_to_parent' ], 'level' : 'DEBUG' }
4036
+ })
4037
+ # log a message (this creates a record put in the queue)
4038
+ logging .getLogger ().info (message_to_log )
3974
4039
3975
4040
@skip_if_tsan_fork
3976
4041
@support .requires_subprocess ()
0 commit comments