6868class _ThreadWakeup :
6969 def __init__ (self ):
7070 self ._closed = False
71+ self ._lock = threading .Lock ()
7172 self ._reader , self ._writer = mp .Pipe (duplex = False )
7273
7374 def close (self ):
74- # Please note that we do not take the shutdown lock when
75+ # Please note that we do not take the self._lock when
7576 # calling clear() (to avoid deadlocking) so this method can
7677 # only be called safely from the same thread as all calls to
77- # clear() even if you hold the shutdown lock. Otherwise we
78+ # clear() even if you hold the lock. Otherwise we
7879 # might try to read from the closed pipe.
79- if not self ._closed :
80- self ._closed = True
81- self ._writer .close ()
82- self ._reader .close ()
80+ with self ._lock :
81+ if not self ._closed :
82+ self ._closed = True
83+ self ._writer .close ()
84+ self ._reader .close ()
8385
8486 def wakeup (self ):
85- if not self ._closed :
86- self ._writer .send_bytes (b"" )
87+ with self ._lock :
88+ if not self ._closed :
89+ self ._writer .send_bytes (b"" )
8790
8891 def clear (self ):
89- if not self ._closed :
90- while self ._reader .poll ():
91- self ._reader .recv_bytes ()
92+ assert not self ._closed
93+ while self ._reader .poll ():
94+ self ._reader .recv_bytes ()
9295
9396
9497def _python_exit ():
@@ -167,10 +170,8 @@ def __init__(self, work_id, fn, args, kwargs):
167170
168171class _SafeQueue (Queue ):
169172 """Safe Queue set exception to the future object linked to a job"""
170- def __init__ (self , max_size = 0 , * , ctx , pending_work_items , shutdown_lock ,
171- thread_wakeup ):
173+ def __init__ (self , max_size = 0 , * , ctx , pending_work_items , thread_wakeup ):
172174 self .pending_work_items = pending_work_items
173- self .shutdown_lock = shutdown_lock
174175 self .thread_wakeup = thread_wakeup
175176 super ().__init__ (max_size , ctx = ctx )
176177
@@ -179,8 +180,7 @@ def _on_queue_feeder_error(self, e, obj):
179180 tb = format_exception (type (e ), e , e .__traceback__ )
180181 e .__cause__ = _RemoteTraceback ('\n """\n {}"""' .format ('' .join (tb )))
181182 work_item = self .pending_work_items .pop (obj .work_id , None )
182- with self .shutdown_lock :
183- self .thread_wakeup .wakeup ()
183+ self .thread_wakeup .wakeup ()
184184 # work_item can be None if another process terminated. In this
185185 # case, the executor_manager_thread fails all work_items
186186 # with BrokenProcessPool
@@ -296,12 +296,10 @@ def __init__(self, executor):
296296 # if there is no pending work item.
297297 def weakref_cb (_ ,
298298 thread_wakeup = self .thread_wakeup ,
299- shutdown_lock = self .shutdown_lock ,
300299 mp_util_debug = mp .util .debug ):
301300 mp_util_debug ('Executor collected: triggering callback for'
302301 ' QueueManager wakeup' )
303- with shutdown_lock :
304- thread_wakeup .wakeup ()
302+ thread_wakeup .wakeup ()
305303
306304 self .executor_reference = weakref .ref (executor , weakref_cb )
307305
@@ -429,11 +427,6 @@ def wait_result_broken_or_wakeup(self):
429427 elif wakeup_reader in ready :
430428 is_broken = False
431429
432- # No need to hold the _shutdown_lock here because:
433- # 1. we're the only thread to use the wakeup reader
434- # 2. we're also the only thread to call thread_wakeup.close()
435- # 3. we want to avoid a possible deadlock when both reader and writer
436- # would block (gh-105829)
437430 self .thread_wakeup .clear ()
438431
439432 return result_item , is_broken , cause
@@ -735,7 +728,6 @@ def __init__(self, max_workers=None, mp_context=None,
735728 self ._call_queue = _SafeQueue (
736729 max_size = queue_size , ctx = self ._mp_context ,
737730 pending_work_items = self ._pending_work_items ,
738- shutdown_lock = self ._shutdown_lock ,
739731 thread_wakeup = self ._executor_manager_thread_wakeup )
740732 # Killed worker processes can produce spurious "broken pipe"
741733 # tracebacks in the queue's own worker thread. But we detect killed
0 commit comments