@@ -615,6 +615,10 @@ def __init__(self, max_workers=None, mp_context=None,
615615 mp_context = mp .get_context ()
616616 self ._mp_context = mp_context
617617
618+ # https://github.com/python/cpython/issues/90622
619+ self ._safe_to_dynamically_spawn_children = (
620+ self ._mp_context .get_start_method (allow_none = False ) != "fork" )
621+
618622 if initializer is not None and not callable (initializer ):
619623 raise TypeError ("initializer must be a callable" )
620624 self ._initializer = initializer
@@ -665,6 +669,8 @@ def __init__(self, max_workers=None, mp_context=None,
665669 def _start_executor_manager_thread (self ):
666670 if self ._executor_manager_thread is None :
667671 # Start the processes so that their sentinels are known.
672+ if not self ._safe_to_dynamically_spawn_children : # ie, using fork.
673+ self ._launch_processes ()
668674 self ._executor_manager_thread = _ExecutorManagerThread (self )
669675 self ._executor_manager_thread .start ()
670676 _threads_wakeups [self ._executor_manager_thread ] = \
@@ -677,14 +683,31 @@ def _adjust_process_count(self):
677683
678684 process_count = len (self ._processes )
679685 if process_count < self ._max_workers :
680- p = self ._mp_context .Process (
681- target = _process_worker ,
682- args = (self ._call_queue ,
683- self ._result_queue ,
684- self ._initializer ,
685- self ._initargs ))
686- p .start ()
687- self ._processes [p .pid ] = p
686+ # Assertion disabled as this codepath is also used to replace a
687+ # worker that unexpectedly dies, even when using the 'fork' start
688+ # method. That means there is still a potential deadlock bug. If a
689+ # 'fork' mp_context worker dies, we'll be forking a new one when
690+ # we know a thread is running (self._executor_manager_thread).
691+ #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
692+ self ._spawn_process ()
693+
694+ def _launch_processes (self ):
695+ # https://github.com/python/cpython/issues/90622
696+ assert not self ._executor_manager_thread , (
697+ 'Processes cannot be fork()ed after the thread has started, '
698+ 'deadlock in the child processes could result.' )
699+ for _ in range (len (self ._processes ), self ._max_workers ):
700+ self ._spawn_process ()
701+
702+ def _spawn_process (self ):
703+ p = self ._mp_context .Process (
704+ target = _process_worker ,
705+ args = (self ._call_queue ,
706+ self ._result_queue ,
707+ self ._initializer ,
708+ self ._initargs ))
709+ p .start ()
710+ self ._processes [p .pid ] = p
688711
689712 def submit (self , fn , / , * args , ** kwargs ):
690713 with self ._shutdown_lock :
@@ -705,7 +728,8 @@ def submit(self, fn, /, *args, **kwargs):
705728 # Wake up queue management thread
706729 self ._executor_manager_thread_wakeup .wakeup ()
707730
708- self ._adjust_process_count ()
731+ if self ._safe_to_dynamically_spawn_children :
732+ self ._adjust_process_count ()
709733 self ._start_executor_manager_thread ()
710734 return f
711735 submit .__doc__ = _base .Executor .submit .__doc__
0 commit comments