@@ -5,6 +5,7 @@ const EventEmitter = require('events');
55const assert = require('assert');
66const path = require('path');
77const util = require('util');
8+ const { Readable, Writable } = require('stream');
89const {
910 ERR_INVALID_ARG_TYPE,
1011 ERR_WORKER_NEED_ABSOLUTE_PATH,
@@ -29,13 +30,20 @@ const isMainThread = threadId === 0;
2930
3031const kOnMessageListener = Symbol('kOnMessageListener');
3132const kHandle = Symbol('kHandle');
33+ const kName = Symbol('kName');
3234const kPort = Symbol('kPort');
3335const kPublicPort = Symbol('kPublicPort');
3436const kDispose = Symbol('kDispose');
3537const kOnExit = Symbol('kOnExit');
3638const kOnMessage = Symbol('kOnMessage');
3739const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
3840const kOnErrorMessage = Symbol('kOnErrorMessage');
41+ const kParentSideStdio = Symbol('kParentSideStdio');
42+ const kWritableCallbacks = Symbol('kWritableCallbacks');
43+ const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
44+ const kStartedReading = Symbol('kStartedReading');
45+ const kWaitingStreams = Symbol('kWaitingStreams');
46+ const kIncrementsPortRef = Symbol('kIncrementsPortRef');
3947
4048const debug = util.debuglog('worker');
4149
@@ -129,6 +137,72 @@ function setupPortReferencing(port, eventEmitter, eventName) {
129137}
130138
131139
140+ class ReadableWorkerStdio extends Readable {
141+ constructor(port, name) {
142+ super();
143+ this[kPort] = port;
144+ this[kName] = name;
145+ this[kIncrementsPortRef] = true;
146+ this[kStartedReading] = false;
147+ this.on('end', () => {
148+ if (this[kIncrementsPortRef] && --this[kPort][kWaitingStreams] === 0)
149+ this[kPort].unref();
150+ });
151+ }
152+
153+ _read() {
154+ if (!this[kStartedReading] && this[kIncrementsPortRef]) {
155+ this[kStartedReading] = true;
156+ if (this[kPort][kWaitingStreams]++ === 0)
157+ this[kPort].ref();
158+ }
159+
160+ this[kPort].postMessage({
161+ type: 'stdioWantsMoreData',
162+ stream: this[kName]
163+ });
164+ }
165+ }
166+
167+ class WritableWorkerStdio extends Writable {
168+ constructor(port, name) {
169+ super({ decodeStrings: false });
170+ this[kPort] = port;
171+ this[kName] = name;
172+ this[kWritableCallbacks] = [];
173+ }
174+
175+ _write(chunk, encoding, cb) {
176+ this[kPort].postMessage({
177+ type: 'stdioPayload',
178+ stream: this[kName],
179+ chunk,
180+ encoding
181+ });
182+ this[kWritableCallbacks].push(cb);
183+ if (this[kPort][kWaitingStreams]++ === 0)
184+ this[kPort].ref();
185+ }
186+
187+ _final(cb) {
188+ this[kPort].postMessage({
189+ type: 'stdioPayload',
190+ stream: this[kName],
191+ chunk: null
192+ });
193+ cb();
194+ }
195+
196+ [kStdioWantsMoreDataCallback]() {
197+ const cbs = this[kWritableCallbacks];
198+ this[kWritableCallbacks] = [];
199+ for (const cb of cbs)
200+ cb();
201+ if ((this[kPort][kWaitingStreams] -= cbs.length) === 0)
202+ this[kPort].unref();
203+ }
204+ }
205+
132206class Worker extends EventEmitter {
133207 constructor(filename, options = {}) {
134208 super();
@@ -154,8 +228,25 @@ class Worker extends EventEmitter {
154228 this[kPort].on('message', (data) => this[kOnMessage](data));
155229 this[kPort].start();
156230 this[kPort].unref();
231+ this[kPort][kWaitingStreams] = 0;
157232 debug(`[${threadId}] created Worker with ID ${this.threadId}`);
158233
234+ let stdin = null;
235+ if (options.stdin)
236+ stdin = new WritableWorkerStdio(this[kPort], 'stdin');
237+ const stdout = new ReadableWorkerStdio(this[kPort], 'stdout');
238+ if (!options.stdout) {
239+ stdout[kIncrementsPortRef] = false;
240+ pipeWithoutWarning(stdout, process.stdout);
241+ }
242+ const stderr = new ReadableWorkerStdio(this[kPort], 'stderr');
243+ if (!options.stderr) {
244+ stderr[kIncrementsPortRef] = false;
245+ pipeWithoutWarning(stderr, process.stderr);
246+ }
247+
248+ this[kParentSideStdio] = { stdin, stdout, stderr };
249+
159250 const { port1, port2 } = new MessageChannel();
160251 this[kPublicPort] = port1;
161252 this[kPublicPort].on('message', (message) => this.emit('message', message));
@@ -165,7 +256,8 @@ class Worker extends EventEmitter {
165256 filename,
166257 doEval: !!options.eval,
167258 workerData: options.workerData,
168- publicPort: port2
259+ publicPort: port2,
260+ hasStdin: !!options.stdin
169261 }, [port2]);
170262 // Actually start the new thread now that everything is in place.
171263 this[kHandle].startThread();
@@ -197,6 +289,16 @@ class Worker extends EventEmitter {
197289 return this[kOnCouldNotSerializeErr]();
198290 case 'errorMessage':
199291 return this[kOnErrorMessage](message.error);
292+ case 'stdioPayload':
293+ {
294+ const { stream, chunk, encoding } = message;
295+ return this[kParentSideStdio][stream].push(chunk, encoding);
296+ }
297+ case 'stdioWantsMoreData':
298+ {
299+ const { stream } = message;
300+ return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback]();
301+ }
200302 }
201303
202304 assert.fail(`Unknown worker message type ${message.type}`);
@@ -207,6 +309,18 @@ class Worker extends EventEmitter {
207309 this[kHandle] = null;
208310 this[kPort] = null;
209311 this[kPublicPort] = null;
312+
313+ const { stdout, stderr } = this[kParentSideStdio];
314+ this[kParentSideStdio] = null;
315+
316+ if (!stdout._readableState.ended) {
317+ debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`);
318+ stdout.push(null);
319+ }
320+ if (!stderr._readableState.ended) {
321+ debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`);
322+ stderr.push(null);
323+ }
210324 }
211325
212326 postMessage(...args) {
@@ -243,6 +357,27 @@ class Worker extends EventEmitter {
243357
244358 return this[kHandle].threadId;
245359 }
360+
361+ get stdin() {
362+ return this[kParentSideStdio].stdin;
363+ }
364+
365+ get stdout() {
366+ return this[kParentSideStdio].stdout;
367+ }
368+
369+ get stderr() {
370+ return this[kParentSideStdio].stderr;
371+ }
372+ }
373+
374+ const workerStdio = {};
375+ if (!isMainThread) {
376+ const port = getEnvMessagePort();
377+ port[kWaitingStreams] = 0;
378+ workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
379+ workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
380+ workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');
246381}
247382
248383let originalFatalException;
@@ -256,10 +391,14 @@ function setupChild(evalScript) {
256391
257392 port.on('message', (message) => {
258393 if (message.type === 'loadScript') {
259- const { filename, doEval, workerData, publicPort } = message;
394+ const { filename, doEval, workerData, publicPort, hasStdin } = message;
260395 publicWorker.parentPort = publicPort;
261396 setupPortReferencing(publicPort, publicPort, 'message');
262397 publicWorker.workerData = workerData;
398+
399+ if (!hasStdin)
400+ workerStdio.stdin.push(null);
401+
263402 debug(`[${threadId}] starts worker script ${filename} ` +
264403 `(eval = ${eval}) at cwd = ${process.cwd()}`);
265404 port.unref();
@@ -271,6 +410,14 @@ function setupChild(evalScript) {
271410 require('module').runMain();
272411 }
273412 return;
413+ } else if (message.type === 'stdioPayload') {
414+ const { stream, chunk, encoding } = message;
415+ workerStdio[stream].push(chunk, encoding);
416+ return;
417+ } else if (message.type === 'stdioWantsMoreData') {
418+ const { stream } = message;
419+ workerStdio[stream][kStdioWantsMoreDataCallback]();
420+ return;
274421 }
275422
276423 assert.fail(`Unknown worker message type ${message.type}`);
@@ -317,11 +464,24 @@ function deserializeError(error) {
317464 error.byteLength).toString('utf8');
318465}
319466
467+ function pipeWithoutWarning(source, dest) {
468+ const sourceMaxListeners = source._maxListeners;
469+ const destMaxListeners = dest._maxListeners;
470+ source.setMaxListeners(Infinity);
471+ dest.setMaxListeners(Infinity);
472+
473+ source.pipe(dest);
474+
475+ source._maxListeners = sourceMaxListeners;
476+ dest._maxListeners = destMaxListeners;
477+ }
478+
320479module.exports = {
321480 MessagePort,
322481 MessageChannel,
323482 threadId,
324483 Worker,
325484 setupChild,
326- isMainThread
485+ isMainThread,
486+ workerStdio
327487};
0 commit comments