import {Duplex} from 'node:stream'; import {callbackify} from 'node:util'; import {BINARY_ENCODINGS} from '../arguments/encoding-option.js'; import { getSubprocessStdout, getReadableOptions, getReadableMethods, onStdoutFinished, onReadableDestroy, } from './readable.js'; import { getSubprocessStdin, getWritableMethods, onStdinFinished, onWritableDestroy, } from './writable.js'; // Create a `Duplex` stream combining both `subprocess.readable()` and `subprocess.writable()` export const createDuplex = ({subprocess, concurrentStreams, encoding}, {from, to, binary: binaryOption = true, preserveNewlines = true} = {}) => { const binary = binaryOption || BINARY_ENCODINGS.has(encoding); const {subprocessStdout, waitReadableDestroy} = getSubprocessStdout(subprocess, from, concurrentStreams); const {subprocessStdin, waitWritableFinal, waitWritableDestroy} = getSubprocessStdin(subprocess, to, concurrentStreams); const {readableEncoding, readableObjectMode, readableHighWaterMark} = getReadableOptions(subprocessStdout, binary); const {read, onStdoutDataDone} = getReadableMethods({ subprocessStdout, subprocess, binary, encoding, preserveNewlines, }); const duplex = new Duplex({ read, ...getWritableMethods(subprocessStdin, subprocess, waitWritableFinal), destroy: callbackify(onDuplexDestroy.bind(undefined, { subprocessStdout, subprocessStdin, subprocess, waitReadableDestroy, waitWritableFinal, waitWritableDestroy, })), readableHighWaterMark, writableHighWaterMark: subprocessStdin.writableHighWaterMark, readableObjectMode, writableObjectMode: subprocessStdin.writableObjectMode, encoding: readableEncoding, }); onStdoutFinished({ subprocessStdout, onStdoutDataDone, readable: duplex, subprocess, subprocessStdin, }); onStdinFinished(subprocessStdin, duplex, subprocessStdout); return duplex; }; const onDuplexDestroy = async ({subprocessStdout, subprocessStdin, subprocess, waitReadableDestroy, waitWritableFinal, waitWritableDestroy}, error) => { await Promise.all([ onReadableDestroy({subprocessStdout, subprocess, waitReadableDestroy}, error), onWritableDestroy({ subprocessStdin, subprocess, waitWritableFinal, waitWritableDestroy, }, error), ]); };