@@ -1696,7 +1696,7 @@ added: v16.14.0
16961696 * ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
16971697 abort the ` fn ` call early.
16981698* ` options ` {Object}
1699- * ` concurrency ` {number} the maximal concurrent invocation of ` fn ` to call
1699+ * ` concurrency ` {number} the maximum concurrent invocation of ` fn ` to call
17001700 on the stream at once. ** Default:** ` 1 ` .
17011701 * ` signal ` {AbortSignal} allows destroying the stream if the signal is
17021702 aborted.
@@ -1740,7 +1740,7 @@ added: v16.14.0
17401740 * ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
17411741 abort the ` fn ` call early.
17421742* ` options ` {Object}
1743- * ` concurrency ` {number} the maximal concurrent invocation of ` fn ` to call
1743+ * ` concurrency ` {number} the maximum concurrent invocation of ` fn ` to call
17441744 on the stream at once. ** Default:** ` 1 ` .
17451745 * ` signal ` {AbortSignal} allows destroying the stream if the signal is
17461746 aborted.
@@ -1775,6 +1775,65 @@ for await (const result of dnsResults) {
17751775}
17761776```
17771777
1778+ ### ` readable.forEach(fn[, options]) `
1779+
1780+ <!-- YAML
1781+ added: REPLACEME
1782+ -->
1783+
1784+ > Stability: 1 - Experimental
1785+
1786+ * ` fn ` {Function|AsyncFunction} a function to call on each item of the stream.
1787+ * ` data ` {any} a chunk of data from the stream.
1788+ * ` options ` {Object}
1789+ * ` signal ` {AbortSignal} aborted if the stream is destroyed allowing to
1790+ abort the ` fn ` call early.
1791+ * ` options ` {Object}
1792+ * ` concurrency ` {number} the maximum concurrent invocation of ` fn ` to call
1793+ on the stream at once. ** Default:** ` 1 ` .
1794+ * ` signal ` {AbortSignal} allows destroying the stream if the signal is
1795+ aborted.
1796+ * Returns: {Promise} a promise for when the stream has finished.
1797+
1798+ This method allows iterating a stream. For each item in the stream the
1799+ ` fn ` function will be called. If the ` fn ` function returns a promise - that
1800+ promise will be ` await ` ed.
1801+
1802+ This method is different from ` for await...of ` loops in that it can optionally
1803+ process items concurrently. In addition, a ` forEach ` iteration can only be
1804+ stopped by having passed a ` signal ` option and aborting the related
1805+ ` AbortController ` while ` for await...of ` can be stopped with ` break ` or
1806+ ` return ` . In either case the stream will be destroyed.
1807+
1808+ This method is different from listening to the [ ` 'data' ` ] [ ] event in that it
1809+ uses the [ ` readable ` ] [ ] event in the underlying machinary and can limit the
1810+ number of concurrent ` fn ` calls.
1811+
1812+ ``` mjs
1813+ import { Readable } from ' stream' ;
1814+ import { Resolver } from ' dns/promises' ;
1815+
1816+ // With a synchronous predicate.
1817+ for await (const item of Readable .from ([1 , 2 , 3 , 4 ]).filter ((x ) => x > 2 )) {
1818+ console .log (item); // 3, 4
1819+ }
1820+ // With an asynchronous predicate, making at most 2 queries at a time.
1821+ const resolver = new Resolver ();
1822+ const dnsResults = await Readable .from ([
1823+ ' nodejs.org' ,
1824+ ' openjsf.org' ,
1825+ ' www.linuxfoundation.org' ,
1826+ ]).map (async (domain ) => {
1827+ const { address } = await resolver .resolve4 (domain, { ttl: true });
1828+ return address;
1829+ }, { concurrency: 2 });
1830+ await dnsResults .forEach ((result ) => {
1831+ // Logs result, similar to `for await (const result of dnsResults)`
1832+ console .log (result);
1833+ });
1834+ console .log (' done' ); // Stream has finished
1835+ ```
1836+
17781837### Duplex and transform streams
17791838
17801839#### Class: ` stream.Duplex `
0 commit comments