import {Transform, getDefaultHighWaterMark} from 'node:stream';
import {isAsyncGenerator} from '../stdio/type.js';
import {getSplitLinesGenerator, getAppendNewlineGenerator} from './split.js';
import {getValidateTransformInput, getValidateTransformReturn} from './validate.js';
import {getEncodingTransformGenerator} from './encoding-transform.js';
import {
	pushChunks,
	transformChunk,
	finalChunks,
	destroyTransform,
} from './run-async.js';
import {
	pushChunksSync,
	transformChunkSync,
	finalChunksSync,
	runTransformSync,
} from './run-sync.js';

/*
Generators can be used to transform/filter standard streams.

Generators have a simple syntax, yet allow all of the following:
- Sharing `state` between chunks
- Flushing logic, by using a `final` function
- Asynchronous logic
- Emitting multiple chunks from a single source chunk, even if spaced in time, by using multiple `yield`
- Filtering, by using no `yield`

Therefore, there is no need to allow Node.js or web transform streams.

The `highWaterMark` is kept as the default value, since this is what `subprocess.std*` uses.

Chunks are currently processed serially. We could add a `concurrency` option to parallelize in the future.

Transform an array of generator functions into a `Transform` stream.
`Duplex.from(generator)` cannot be used because it does not allow setting the `objectMode` and `highWaterMark`.
*/
export const generatorToStream = ({
	value,
	value: {transform, final, writableObjectMode, readableObjectMode},
	optionName,
}, {encoding}) => {
	// Shared by the async `transform`, `final` and `destroy` handlers of this stream
	const state = {};
	const generators = addInternalGenerators(value, encoding, optionName);

	const transformAsync = isAsyncGenerator(transform);
	const finalAsync = isAsyncGenerator(final);
	// The flushing and destroy logic is async as soon as either user generator is async
	const anyAsync = transformAsync || finalAsync;

	const transformMethod = transformAsync
		? (...args) => pushChunks(transformChunk, state, ...args)
		: (...args) => pushChunksSync(transformChunkSync, ...args);
	const finalMethod = anyAsync
		? (...args) => pushChunks(finalChunks, state, ...args)
		: (...args) => pushChunksSync(finalChunksSync, ...args);
	// No custom `destroy` is set when every generator is sync
	const destroyMethod = anyAsync
		? (...args) => destroyTransform(state, ...args)
		: undefined;

	const streamOptions = {
		writableObjectMode,
		writableHighWaterMark: getDefaultHighWaterMark(writableObjectMode),
		readableObjectMode,
		readableHighWaterMark: getDefaultHighWaterMark(readableObjectMode),
		// `transform` and `flush` must remain methods: they forward `this` (the stream)
		transform(chunk, _chunkEncoding, done) {
			transformMethod([chunk, generators, 0], this, done);
		},
		flush(done) {
			finalMethod([generators], this, done);
		},
		destroy: destroyMethod,
	};
	return {stream: new Transform(streamOptions)};
};

// Applies transform generators in sync mode.
// Only the `stdioItems` of type `generator` are used; their order is reversed when `isInput` is truthy.
export const runGeneratorsSync = (chunks, stdioItems, encoding, isInput) => {
	const generatorItems = stdioItems.filter(({type}) => type === 'generator');

	// `filter()` returned a new array, so reversing it in place leaves `stdioItems` untouched
	if (isInput) {
		generatorItems.reverse();
	}

	// Each item's output chunks become the next item's input chunks
	return generatorItems.reduce(
		(currentChunks, {value, optionName}) => runTransformSync(
			addInternalGenerators(value, encoding, optionName),
			currentChunks,
		),
		chunks,
	);
};

// Generators used internally to convert the chunk type, validate it, and split into lines.
// They are wrapped around the user-supplied `{transform, final}` generator.
const addInternalGenerators = (
	{transform, final, binary, writableObjectMode, readableObjectMode, preserveNewlines},
	encoding,
	optionName,
) => {
	// Passed to both the line-splitting and the newline-appending generators
	const state = {};

	const validateInput = {transform: getValidateTransformInput(writableObjectMode, optionName)};
	const encodeInput = getEncodingTransformGenerator(binary, encoding, writableObjectMode);
	const splitLines = getSplitLinesGenerator(binary, preserveNewlines, writableObjectMode, state);
	const userGenerator = {transform, final};
	const validateReturn = {transform: getValidateTransformReturn(readableObjectMode, optionName)};
	const appendNewline = getAppendNewlineGenerator({
		binary,
		preserveNewlines,
		readableObjectMode,
		state,
	});

	// Falsy entries are dropped from the list
	return [
		validateInput,
		encodeInput,
		splitLines,
		userGenerator,
		validateReturn,
		appendNewline,
	].filter(Boolean);
};