@@ -70,14 +70,52 @@ func (p *WorkerPool) Push(data Data) {
7070 atomic .AddInt64 (& p .numInQueue , 1 )
7171 p .lock .Lock ()
7272 if p .blockTimeout > 0 && p .boostTimeout > 0 && (p .numberOfWorkers <= p .maxNumberOfWorkers || p .maxNumberOfWorkers < 0 ) {
73- p .lock .Unlock ()
73+ if p .numberOfWorkers == 0 {
74+ p .zeroBoost ()
75+ } else {
76+ p .lock .Unlock ()
77+ }
7478 p .pushBoost (data )
7579 } else {
7680 p .lock .Unlock ()
7781 p .dataChan <- data
7882 }
7983}
8084
85+ func (p * WorkerPool ) zeroBoost () {
86+ ctx , cancel := context .WithCancel (p .baseCtx )
87+ mq := GetManager ().GetManagedQueue (p .qid )
88+ boost := p .boostWorkers
89+ if (boost + p .numberOfWorkers ) > p .maxNumberOfWorkers && p .maxNumberOfWorkers >= 0 {
90+ boost = p .maxNumberOfWorkers - p .numberOfWorkers
91+ }
92+ if mq != nil {
93+ log .Warn ("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s" , p .qid , mq .Name , boost , p .boostTimeout )
94+
95+ start := time .Now ()
96+ pid := mq .RegisterWorkers (boost , start , true , start .Add (p .boostTimeout ), cancel , false )
97+ go func () {
98+ select {
99+ case <- ctx .Done ():
100+ case <- time .After (p .boostTimeout ):
101+ }
102+ mq .RemoveWorkers (pid )
103+ cancel ()
104+ }()
105+ } else {
106+ log .Warn ("WorkerPool: %d has zero workers - adding %d temporary workers for %s" , p .qid , p .boostWorkers , p .boostTimeout )
107+ go func () {
108+ select {
109+ case <- ctx .Done ():
110+ case <- time .After (p .boostTimeout ):
111+ }
112+ cancel ()
113+ }()
114+ }
115+ p .lock .Unlock ()
116+ p .addWorkers (ctx , boost )
117+ }
118+
81119func (p * WorkerPool ) pushBoost (data Data ) {
82120 select {
83121 case p .dataChan <- data :
@@ -112,7 +150,7 @@ func (p *WorkerPool) pushBoost(data Data) {
112150 log .Warn ("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v" , p .qid , mq .Name , ourTimeout , boost , p .boostTimeout , p .blockTimeout )
113151
114152 start := time .Now ()
115- pid := mq .RegisterWorkers (boost , start , false , start , cancel , false )
153+ pid := mq .RegisterWorkers (boost , start , true , start . Add ( p . boostTimeout ) , cancel , false )
116154 go func () {
117155 <- ctx .Done ()
118156 mq .RemoveWorkers (pid )
0 commit comments