@@ -87,7 +87,9 @@ def do_test(
87
87
* ,
88
88
incoming ,
89
89
simulate_send_failure = False ,
90
+ simulate_sigint_during_stdout_write = False ,
90
91
expected_outgoing = None ,
92
+ expected_outgoing_signals = None ,
91
93
expected_completions = None ,
92
94
expected_exception = None ,
93
95
expected_stdout = "" ,
@@ -96,6 +98,8 @@ def do_test(
96
98
):
97
99
if expected_outgoing is None :
98
100
expected_outgoing = []
101
+ if expected_outgoing_signals is None :
102
+ expected_outgoing_signals = []
99
103
if expected_completions is None :
100
104
expected_completions = []
101
105
if expected_state is None :
@@ -130,7 +134,9 @@ def mock_input(prompt):
130
134
reply = message ["input" ]
131
135
if isinstance (reply , BaseException ):
132
136
raise reply
133
- return reply
137
+ if isinstance (reply , str ):
138
+ return reply
139
+ return reply ()
134
140
135
141
with ExitStack () as stack :
136
142
client_sock , server_sock = socket .socketpair ()
@@ -155,15 +161,25 @@ def mock_input(prompt):
155
161
156
162
stdout = io .StringIO ()
157
163
164
+ if simulate_sigint_during_stdout_write :
165
+ orig_stdout_write = stdout .write
166
+
167
+ def sigint_stdout_write (s ):
168
+ signal .raise_signal (signal .SIGINT )
169
+ return orig_stdout_write (s )
170
+
171
+ stdout .write = sigint_stdout_write
172
+
158
173
input_mock = stack .enter_context (
159
174
unittest .mock .patch ("pdb.input" , side_effect = mock_input )
160
175
)
161
176
stack .enter_context (redirect_stdout (stdout ))
162
177
178
+ interrupt_sock = unittest .mock .Mock (spec = socket .socket )
163
179
client = _PdbClient (
164
180
pid = 0 ,
165
181
server_socket = server_sock ,
166
- interrupt_sock = unittest . mock . Mock ( spec = socket . socket ) ,
182
+ interrupt_sock = interrupt_sock ,
167
183
)
168
184
169
185
if expected_exception is not None :
@@ -188,6 +204,12 @@ def mock_input(prompt):
188
204
actual_state = {k : getattr (client , k ) for k in expected_state }
189
205
self .assertEqual (actual_state , expected_state )
190
206
207
+ outgoing_signals = [
208
+ signal .Signals (int .from_bytes (call .args [0 ]))
209
+ for call in interrupt_sock .sendall .call_args_list
210
+ ]
211
+ self .assertEqual (outgoing_signals , expected_outgoing_signals )
212
+
191
213
def test_remote_immediately_closing_the_connection (self ):
192
214
"""Test the behavior when the remote closes the connection immediately."""
193
215
incoming = []
@@ -382,11 +404,17 @@ def test_handling_unrecognized_prompt_type(self):
382
404
expected_state = {"state" : "dumb" },
383
405
)
384
406
385
- def test_keyboard_interrupt_at_prompt (self ):
386
- """Test signaling when a prompt gets a KeyboardInterrupt ."""
407
+ def test_sigint_at_prompt (self ):
408
+ """Test signaling when a prompt gets interrupted ."""
387
409
incoming = [
388
410
("server" , {"prompt" : "(Pdb) " , "state" : "pdb" }),
389
- ("user" , {"prompt" : "(Pdb) " , "input" : KeyboardInterrupt ()}),
411
+ (
412
+ "user" ,
413
+ {
414
+ "prompt" : "(Pdb) " ,
415
+ "input" : lambda : signal .raise_signal (signal .SIGINT ),
416
+ },
417
+ ),
390
418
]
391
419
self .do_test (
392
420
incoming = incoming ,
@@ -396,6 +424,40 @@ def test_keyboard_interrupt_at_prompt(self):
396
424
expected_state = {"state" : "pdb" },
397
425
)
398
426
427
+ def test_sigint_at_continuation_prompt (self ):
428
+ """Test signaling when a continuation prompt gets interrupted."""
429
+ incoming = [
430
+ ("server" , {"prompt" : "(Pdb) " , "state" : "pdb" }),
431
+ ("user" , {"prompt" : "(Pdb) " , "input" : "if True:" }),
432
+ (
433
+ "user" ,
434
+ {
435
+ "prompt" : "... " ,
436
+ "input" : lambda : signal .raise_signal (signal .SIGINT ),
437
+ },
438
+ ),
439
+ ]
440
+ self .do_test (
441
+ incoming = incoming ,
442
+ expected_outgoing = [
443
+ {"signal" : "INT" },
444
+ ],
445
+ expected_state = {"state" : "pdb" },
446
+ )
447
+
448
+ def test_sigint_when_writing (self ):
449
+ """Test siginaling when sys.stdout.write() gets interrupted."""
450
+ incoming = [
451
+ ("server" , {"message" : "Some message or other\n " , "type" : "info" }),
452
+ ]
453
+ self .do_test (
454
+ incoming = incoming ,
455
+ simulate_sigint_during_stdout_write = True ,
456
+ expected_outgoing = [],
457
+ expected_outgoing_signals = [signal .SIGINT ],
458
+ expected_stdout = "Some message or other\n " ,
459
+ )
460
+
399
461
def test_eof_at_prompt (self ):
400
462
"""Test signaling when a prompt gets an EOFError."""
401
463
incoming = [
0 commit comments