@@ -162,9 +162,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
162162
163163 self ._worker_handler = threading .Thread (
164164 target = Pool ._handle_workers ,
165- args = (self ._cache , self ._processes , self ._pool , self .Process ,
166- self ._inqueue , self ._outqueue , self ._initializer ,
167- self ._initargs , self ._maxtasksperchild , self ._taskqueue )
165+ args = (self , )
168166 )
169167 self ._worker_handler .daemon = True
170168 self ._worker_handler ._state = RUN
@@ -196,56 +194,42 @@ def __init__(self, processes=None, initializer=None, initargs=(),
196194 exitpriority = 15
197195 )
198196
199- @staticmethod
200- def _join_exited_workers (pool ):
197+ def _join_exited_workers (self ):
201198 """Cleanup after any worker processes which have exited due to reaching
202199 their specified lifetime. Returns True if any workers were cleaned up.
203200 """
204201 cleaned = False
205- for i in reversed (range (len (pool ))):
206- worker = pool [i ]
202+ for i in reversed (range (len (self . _pool ))):
203+ worker = self . _pool [i ]
207204 if worker .exitcode is not None :
208205 # worker exited
209206 debug ('cleaning up worker %d' % i )
210207 worker .join ()
211208 cleaned = True
212- del pool [i ]
209+ del self . _pool [i ]
213210 return cleaned
214211
215212 def _repopulate_pool (self ):
216- return self ._repopulate_pool_static (self ._processes , self ._pool ,
217- self .Process , self ._inqueue ,
218- self ._outqueue , self ._initializer ,
219- self ._initargs ,
220- self ._maxtasksperchild )
221-
222- @staticmethod
223- def _repopulate_pool_static (processes , pool , Process , inqueue , outqueue ,
224- initializer , initargs , maxtasksperchild ):
225213 """Bring the number of pool processes up to the specified number,
226214 for use after reaping workers which have exited.
227215 """
228- for i in range (processes - len (pool )):
229- w = Process (target = worker ,
230- args = (inqueue , outqueue ,
231- initializer ,
232- initargs , maxtasksperchild )
233- )
234- pool .append (w )
216+ for i in range (self . _processes - len (self . _pool )):
217+ w = self . Process (target = worker ,
218+ args = (self . _inqueue , self . _outqueue ,
219+ self . _initializer ,
220+ self . _initargs , self . _maxtasksperchild )
221+ )
222+ self . _pool .append (w )
235223 w .name = w .name .replace ('Process' , 'PoolWorker' )
236224 w .daemon = True
237225 w .start ()
238226 debug ('added worker' )
239227
240- @staticmethod
241- def _maintain_pool (processes , pool , Process , inqueue , outqueue ,
242- initializer , initargs , maxtasksperchild ):
228+ def _maintain_pool (self ):
243229 """Clean up any exited workers and start replacements for them.
244230 """
245- if Pool ._join_exited_workers (pool ):
246- Pool ._repopulate_pool_static (processes , pool , Process , inqueue ,
247- outqueue , initializer , initargs ,
248- maxtasksperchild )
231+ if self ._join_exited_workers ():
232+ self ._repopulate_pool ()
249233
250234 def _setup_queues (self ):
251235 from .queues import SimpleQueue
@@ -335,18 +319,16 @@ def map_async(self, func, iterable, chunksize=None, callback=None):
335319 return result
336320
337321 @staticmethod
338- def _handle_workers (cache , processes , pool , Process , inqueue , outqueue ,
339- initializer , initargs , maxtasksperchild , taskqueue ):
322+ def _handle_workers (pool ):
340323 thread = threading .current_thread ()
341324
342325 # Keep maintaining workers until the cache gets drained, unless the pool
343326 # is terminated.
344- while thread ._state == RUN or (cache and thread ._state != TERMINATE ):
345- Pool ._maintain_pool (processes , pool , Process , inqueue , outqueue ,
346- initializer , initargs , maxtasksperchild )
327+ while thread ._state == RUN or (pool ._cache and thread ._state != TERMINATE ):
328+ pool ._maintain_pool ()
347329 time .sleep (0.1 )
348330 # send sentinel to stop workers
349- taskqueue .put (None )
331+ pool . _taskqueue .put (None )
350332 debug ('worker handler exiting' )
351333
352334 @staticmethod
0 commit comments