11import os
2+ import queue
3+ import signal
24import sys
35import threading
46import time
57import unittest
8+ import unittest .mock
69from concurrent import futures
710from concurrent .futures .process import BrokenProcessPool
811
912from test import support
1013from test .support import hashlib_helper
14+ from test .test_importlib .metadata .fixtures import parameterize
1115
1216from .executor import ExecutorTest , mul
1317from .util import (
@@ -22,6 +26,19 @@ def __init__(self, mgr):
2226 def __del__ (self ):
2327 self .event .set ()
2428
29+ TERMINATE_WORKERS = futures .ProcessPoolExecutor .terminate_workers .__name__
30+ KILL_WORKERS = futures .ProcessPoolExecutor .kill_workers .__name__
31+ FORCE_SHUTDOWN_PARAMS = [
32+ dict (function_name = TERMINATE_WORKERS ),
33+ dict (function_name = KILL_WORKERS ),
34+ ]
35+
36+ def _put_sleep_put (queue ):
37+ """ Used as part of test_terminate_workers """
38+ queue .put ('started' )
39+ time .sleep (2 )
40+ queue .put ('finished' )
41+
2542
2643class ProcessPoolExecutorTest (ExecutorTest ):
2744
@@ -218,6 +235,98 @@ def mock_start_new_thread(func, *args, **kwargs):
218235 list (executor .map (mul , [(2 , 3 )] * 10 ))
219236 executor .shutdown ()
220237
238+ def test_terminate_workers (self ):
239+ mock_fn = unittest .mock .Mock ()
240+ with self .executor_type (max_workers = 1 ) as executor :
241+ executor ._force_shutdown = mock_fn
242+ executor .terminate_workers ()
243+
244+ mock_fn .assert_called_once_with (operation = futures .process ._TERMINATE )
245+
246+ def test_kill_workers (self ):
247+ mock_fn = unittest .mock .Mock ()
248+ with self .executor_type (max_workers = 1 ) as executor :
249+ executor ._force_shutdown = mock_fn
250+ executor .kill_workers ()
251+
252+ mock_fn .assert_called_once_with (operation = futures .process ._KILL )
253+
254+ def test_force_shutdown_workers_invalid_op (self ):
255+ with self .executor_type (max_workers = 1 ) as executor :
256+ self .assertRaises (ValueError ,
257+ executor ._force_shutdown ,
258+ operation = 'invalid operation' ),
259+
260+ @parameterize (* FORCE_SHUTDOWN_PARAMS )
261+ def test_force_shutdown_workers (self , function_name ):
262+ manager = self .get_context ().Manager ()
263+ q = manager .Queue ()
264+
265+ with self .executor_type (max_workers = 1 ) as executor :
266+ executor .submit (_put_sleep_put , q )
267+
268+ # We should get started, but not finished since we'll terminate the
269+ # workers just after
270+ self .assertEqual (q .get (timeout = 5 ), 'started' )
271+
272+ worker_process = list (executor ._processes .values ())[0 ]
273+
274+ Mock = unittest .mock .Mock
275+ worker_process .terminate = Mock (wraps = worker_process .terminate )
276+ worker_process .kill = Mock (wraps = worker_process .kill )
277+
278+ getattr (executor , function_name )()
279+ worker_process .join ()
280+
281+ if function_name == TERMINATE_WORKERS :
282+ worker_process .terminate .assert_called ()
283+ elif function_name == KILL_WORKERS :
284+ worker_process .kill .assert_called ()
285+ else :
286+ self .fail (f"Unknown operation: { function_name } " )
287+
288+ self .assertRaises (queue .Empty , q .get , timeout = 1 )
289+
290+ @parameterize (* FORCE_SHUTDOWN_PARAMS )
291+ def test_force_shutdown_workers_dead_workers (self , function_name ):
292+ with self .executor_type (max_workers = 1 ) as executor :
293+ future = executor .submit (os ._exit , 1 )
294+ self .assertRaises (BrokenProcessPool , future .result )
295+
296+ # even though the pool is broken, this shouldn't raise
297+ getattr (executor , function_name )()
298+
299+ @parameterize (* FORCE_SHUTDOWN_PARAMS )
300+ def test_force_shutdown_workers_not_started_yet (self , function_name ):
301+ ctx = self .get_context ()
302+ with unittest .mock .patch .object (ctx , 'Process' ) as mock_process :
303+ with self .executor_type (max_workers = 1 , mp_context = ctx ) as executor :
304+ # The worker has not been started yet, terminate/kill_workers
305+ # should basically no-op
306+ getattr (executor , function_name )()
307+
308+ mock_process .return_value .kill .assert_not_called ()
309+ mock_process .return_value .terminate .assert_not_called ()
310+
311+ @parameterize (* FORCE_SHUTDOWN_PARAMS )
312+ def test_force_shutdown_workers_stops_pool (self , function_name ):
313+ with self .executor_type (max_workers = 1 ) as executor :
314+ task = executor .submit (time .sleep , 0 )
315+ self .assertIsNone (task .result ())
316+
317+ worker_process = list (executor ._processes .values ())[0 ]
318+ getattr (executor , function_name )()
319+
320+ self .assertRaises (RuntimeError , executor .submit , time .sleep , 0 )
321+
322+ # A signal sent, is not a signal reacted to.
323+ # So wait a moment here for the process to die.
324+ # If we don't, every once in a while we may get an ENV CHANGE
325+ # error since the process would be alive immediately after the
326+ # test run.. and die a moment later.
327+ worker_process .join (timeout = 5 )
328+ self .assertFalse (worker_process .is_alive ())
329+
221330
222331create_executor_tests (globals (), ProcessPoolExecutorTest ,
223332 executor_mixins = (ProcessPoolForkMixin ,
0 commit comments