Skip to content

Commit bc2d69d

Browse files
committed
stream: compose with async functions
Enables async function support for stream.compose. PR-URL: nodejs#39435 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: James M Snell <[email protected]>
1 parent 3980333 commit bc2d69d

File tree

2 files changed

+54
-27
lines changed

2 files changed

+54
-27
lines changed

doc/api/stream.md

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1912,7 +1912,7 @@ failure, this can cause event listener leaks and swallowed errors.
19121912
added: REPLACEME
19131913
-->
19141914

1915-
* `streams` {Stream[]}
1915+
* `streams` {Stream[]|Iterable[]|AsyncIterable[]|Function[]}
19161916
* Returns: {stream.Duplex}
19171917

19181918
Combines two or more streams into a `Duplex` stream that writes to the
@@ -1926,6 +1926,9 @@ when passing streams to `stream.pipeline`, typically the first stream is
19261926
a readable stream and the last a writable stream, forming a closed
19271927
circuit.
19281928

1929+
If passed a `Function` it must be a factory method taking a `source`
1930+
`Iterable`.
1931+
19291932
```mjs
19301933
import { compose, Transform } from 'stream';
19311934

@@ -1935,11 +1938,11 @@ const removeSpaces = new Transform({
19351938
}
19361939
});
19371940

1938-
const toUpper = new Transform({
1939-
transform(chunk, encoding, callback) {
1940-
callback(null, String(chunk).toUpperCase());
1941+
async function* toUpper(source) {
1942+
for await (const chunk of source) {
1943+
yield String(chunk).toUpperCase();
19411944
}
1942-
});
1945+
}
19431946

19441947
let res = '';
19451948
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
@@ -1949,6 +1952,48 @@ for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
19491952
console.log(res); // prints 'HELLOWORLD'
19501953
```
19511954

1955+
`stream.compose` can be used to convert async iterables, generators and
1956+
functions into streams.
1957+
1958+
* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
1959+
`null`.
1960+
* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`.
1961+
Must take a source `AsyncIterable` as first parameter. Cannot yield
1962+
`null`.
1963+
* `AsyncFunction` converts into a writable `Duplex`. Must return
1964+
either `null` or `undefined`.
1965+
1966+
```mjs
1967+
import { compose } from 'stream';
1968+
import { finished } from 'stream/promises';
1969+
1970+
// Convert AsyncIterable into readable Duplex.
1971+
const s1 = compose(async function*() {
1972+
yield 'Hello';
1973+
yield 'World';
1974+
}());
1975+
1976+
// Convert AsyncGenerator into transform Duplex.
1977+
const s2 = compose(async function*(source) {
1978+
for await (const chunk of source) {
1979+
yield String(chunk).toUpperCase();
1980+
}
1981+
});
1982+
1983+
let res = '';
1984+
1985+
// Convert AsyncFunction into writable Duplex.
1986+
const s3 = compose(async function(source) {
1987+
for await (const chunk of source) {
1988+
res += chunk;
1989+
}
1990+
});
1991+
1992+
await finished(compose(s1, s2, s3));
1993+
1994+
console.log(res); // prints 'HELLOWORLD'
1995+
```
1996+
19521997
### `stream.Readable.from(iterable, [options])`
19531998
<!-- YAML
19541999
added:

lib/stream.js

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,10 @@ const {
3030
} = require('internal/util');
3131

3232
const pipeline = require('internal/streams/pipeline');
33-
const _compose = require('internal/streams/compose');
33+
const compose = require('internal/streams/compose');
34+
const { destroyer } = require('internal/streams/destroy');
3435
const eos = require('internal/streams/end-of-stream');
3536
const internalBuffer = require('internal/buffer');
36-
const { isNodeStream } = require('internal/streams/utils');
37-
const {
38-
codes: {
39-
ERR_INVALID_ARG_VALUE,
40-
},
41-
} = require('internal/errors');
4237

4338
const promises = require('stream/promises');
4439

@@ -52,21 +47,8 @@ Stream.pipeline = pipeline;
5247
const { addAbortSignal } = require('internal/streams/add-abort-signal');
5348
Stream.addAbortSignal = addAbortSignal;
5449
Stream.finished = eos;
55-
56-
Stream.compose = function compose(...streams) {
57-
// TODO (ronag): Remove this once async function API
58-
// has been discussed.
59-
for (let n = 0; n < streams.length; ++n) {
60-
if (!isNodeStream(streams[n])) {
61-
throw new ERR_INVALID_ARG_VALUE(
62-
`streams[${n}]`,
63-
streams[n],
64-
'must be stream'
65-
);
66-
}
67-
}
68-
return _compose(...streams);
69-
};
50+
Stream.destroy = destroyer;
51+
Stream.compose = compose;
7052

7153
ObjectDefineProperty(Stream, 'promises', {
7254
configurable: true,

0 commit comments

Comments
 (0)