| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465 |
- import type { ChildProcess } from 'child_process'
- import { stat } from 'fs/promises'
- import type { Readable } from 'stream'
- import treeKill from 'tree-kill'
- import { generateTaskId } from '../Task.js'
- import { formatDuration } from './format.js'
- import {
- MAX_TASK_OUTPUT_BYTES,
- MAX_TASK_OUTPUT_BYTES_DISPLAY,
- } from './task/diskOutput.js'
- import { TaskOutput } from './task/TaskOutput.js'
- export type ExecResult = {
- stdout: string
- stderr: string
- code: number
- interrupted: boolean
- backgroundTaskId?: string
- backgroundedByUser?: boolean
- /** Set when assistant-mode auto-backgrounded a long-running blocking command. */
- assistantAutoBackgrounded?: boolean
- /** Set when stdout was too large to fit inline — points to the output file on disk. */
- outputFilePath?: string
- /** Total size of the output file in bytes (set when outputFilePath is set). */
- outputFileSize?: number
- /** The task ID for the output file (set when outputFilePath is set). */
- outputTaskId?: string
- /** Error message when the command failed before spawning (e.g., deleted cwd). */
- preSpawnError?: string
- }
- export type ShellCommand = {
- background: (backgroundTaskId: string) => boolean
- result: Promise<ExecResult>
- kill: () => void
- status: 'running' | 'backgrounded' | 'completed' | 'killed'
- /**
- * Cleans up stream resources (event listeners).
- * Should be called after the command completes or is killed to prevent memory leaks.
- */
- cleanup: () => void
- onTimeout?: (
- callback: (backgroundFn: (taskId: string) => boolean) => void,
- ) => void
- /** The TaskOutput instance that owns all stdout/stderr data and progress. */
- taskOutput: TaskOutput
- }
- const SIGKILL = 137
- const SIGTERM = 143
- // Background tasks write stdout/stderr directly to a file fd (no JS involvement),
- // so a stuck append loop can fill the disk. Poll file size and kill when exceeded.
- const SIZE_WATCHDOG_INTERVAL_MS = 5_000
- function prependStderr(prefix: string, stderr: string): string {
- return stderr ? `${prefix} ${stderr}` : prefix
- }
- /**
- * Thin pipe from a child process stream into TaskOutput.
- * Used in pipe mode (hooks) for stdout and stderr.
- * In file mode (bash commands), both fds go to the output file —
- * the child process streams are null and no wrappers are created.
- */
- class StreamWrapper {
- #stream: Readable | null
- #isCleanedUp = false
- #taskOutput: TaskOutput | null
- #isStderr: boolean
- #onData = this.#dataHandler.bind(this)
- constructor(stream: Readable, taskOutput: TaskOutput, isStderr: boolean) {
- this.#stream = stream
- this.#taskOutput = taskOutput
- this.#isStderr = isStderr
- // Emit strings instead of Buffers - avoids repeated .toString() calls
- stream.setEncoding('utf-8')
- stream.on('data', this.#onData)
- }
- #dataHandler(data: Buffer | string): void {
- const str = typeof data === 'string' ? data : data.toString()
- if (this.#isStderr) {
- this.#taskOutput!.writeStderr(str)
- } else {
- this.#taskOutput!.writeStdout(str)
- }
- }
- cleanup(): void {
- if (this.#isCleanedUp) {
- return
- }
- this.#isCleanedUp = true
- this.#stream!.removeListener('data', this.#onData)
- // Release references so the stream, its StringDecoder, and
- // the TaskOutput can be GC'd independently of this wrapper.
- this.#stream = null
- this.#taskOutput = null
- this.#onData = () => {}
- }
- }
- /**
- * Implementation of ShellCommand that wraps a child process.
- *
- * For bash commands: both stdout and stderr go to a file fd via
- * stdio[1] and stdio[2] — no JS involvement. Progress is extracted
- * by polling the file tail.
- * For hooks: pipe mode with StreamWrappers for real-time detection.
- */
- class ShellCommandImpl implements ShellCommand {
- #status: 'running' | 'backgrounded' | 'completed' | 'killed' = 'running'
- #backgroundTaskId: string | undefined
- #stdoutWrapper: StreamWrapper | null
- #stderrWrapper: StreamWrapper | null
- #childProcess: ChildProcess
- #timeoutId: NodeJS.Timeout | null = null
- #sizeWatchdog: NodeJS.Timeout | null = null
- #killedForSize = false
- #maxOutputBytes: number
- #abortSignal: AbortSignal
- #onTimeoutCallback:
- | ((backgroundFn: (taskId: string) => boolean) => void)
- | undefined
- #timeout: number
- #shouldAutoBackground: boolean
- #resultResolver: ((result: ExecResult) => void) | null = null
- #exitCodeResolver: ((code: number) => void) | null = null
- #boundAbortHandler: (() => void) | null = null
- readonly taskOutput: TaskOutput
- static #handleTimeout(self: ShellCommandImpl): void {
- if (self.#shouldAutoBackground && self.#onTimeoutCallback) {
- self.#onTimeoutCallback(self.background.bind(self))
- } else {
- self.#doKill(SIGTERM)
- }
- }
- readonly result: Promise<ExecResult>
- readonly onTimeout?: (
- callback: (backgroundFn: (taskId: string) => boolean) => void,
- ) => void
- constructor(
- childProcess: ChildProcess,
- abortSignal: AbortSignal,
- timeout: number,
- taskOutput: TaskOutput,
- shouldAutoBackground = false,
- maxOutputBytes = MAX_TASK_OUTPUT_BYTES,
- ) {
- this.#childProcess = childProcess
- this.#abortSignal = abortSignal
- this.#timeout = timeout
- this.#shouldAutoBackground = shouldAutoBackground
- this.#maxOutputBytes = maxOutputBytes
- this.taskOutput = taskOutput
- // In file mode (bash commands), both stdout and stderr go to the
- // output file fd — childProcess.stdout/.stderr are both null.
- // In pipe mode (hooks), wrap streams to funnel data into TaskOutput.
- this.#stderrWrapper = childProcess.stderr
- ? new StreamWrapper(childProcess.stderr, taskOutput, true)
- : null
- this.#stdoutWrapper = childProcess.stdout
- ? new StreamWrapper(childProcess.stdout, taskOutput, false)
- : null
- if (shouldAutoBackground) {
- this.onTimeout = (callback): void => {
- this.#onTimeoutCallback = callback
- }
- }
- this.result = this.#createResultPromise()
- }
- get status(): 'running' | 'backgrounded' | 'completed' | 'killed' {
- return this.#status
- }
- #abortHandler(): void {
- // On 'interrupt' (user submitted a new message), don't kill — let the
- // caller background the process so the model can see partial output.
- if (this.#abortSignal.reason === 'interrupt') {
- return
- }
- this.kill()
- }
- #exitHandler(code: number | null, signal: NodeJS.Signals | null): void {
- const exitCode =
- code !== null && code !== undefined
- ? code
- : signal === 'SIGTERM'
- ? 144
- : 1
- this.#resolveExitCode(exitCode)
- }
- #errorHandler(): void {
- this.#resolveExitCode(1)
- }
- #resolveExitCode(code: number): void {
- if (this.#exitCodeResolver) {
- this.#exitCodeResolver(code)
- this.#exitCodeResolver = null
- }
- }
- // Note: exit/error listeners are NOT removed here — they're needed for
- // the result promise to resolve. They clean up when the child process exits.
- #cleanupListeners(): void {
- this.#clearSizeWatchdog()
- const timeoutId = this.#timeoutId
- if (timeoutId) {
- clearTimeout(timeoutId)
- this.#timeoutId = null
- }
- const boundAbortHandler = this.#boundAbortHandler
- if (boundAbortHandler) {
- this.#abortSignal.removeEventListener('abort', boundAbortHandler)
- this.#boundAbortHandler = null
- }
- }
- #clearSizeWatchdog(): void {
- if (this.#sizeWatchdog) {
- clearInterval(this.#sizeWatchdog)
- this.#sizeWatchdog = null
- }
- }
- #startSizeWatchdog(): void {
- this.#sizeWatchdog = setInterval(() => {
- void stat(this.taskOutput.path).then(
- s => {
- // Bail if the watchdog was cleared while this stat was in flight
- // (process exited on its own) — otherwise we'd mislabel stderr.
- if (
- s.size > this.#maxOutputBytes &&
- this.#status === 'backgrounded' &&
- this.#sizeWatchdog !== null
- ) {
- this.#killedForSize = true
- this.#clearSizeWatchdog()
- this.#doKill(SIGKILL)
- }
- },
- () => {
- // ENOENT before first write, or unlinked mid-run — skip this tick
- },
- )
- }, SIZE_WATCHDOG_INTERVAL_MS)
- this.#sizeWatchdog.unref()
- }
- #createResultPromise(): Promise<ExecResult> {
- this.#boundAbortHandler = this.#abortHandler.bind(this)
- this.#abortSignal.addEventListener('abort', this.#boundAbortHandler, {
- once: true,
- })
- // Use 'exit' not 'close': 'close' waits for stdio to close, which includes
- // grandchild processes that inherit file descriptors (e.g. `sleep 30 &`).
- // 'exit' fires when the shell itself exits, returning control immediately.
- this.#childProcess.once('exit', this.#exitHandler.bind(this))
- this.#childProcess.once('error', this.#errorHandler.bind(this))
- this.#timeoutId = setTimeout(
- ShellCommandImpl.#handleTimeout,
- this.#timeout,
- this,
- ) as NodeJS.Timeout
- const exitPromise = new Promise<number>(resolve => {
- this.#exitCodeResolver = resolve
- })
- return new Promise<ExecResult>(resolve => {
- this.#resultResolver = resolve
- void exitPromise.then(this.#handleExit.bind(this))
- })
- }
- async #handleExit(code: number): Promise<void> {
- this.#cleanupListeners()
- if (this.#status === 'running' || this.#status === 'backgrounded') {
- this.#status = 'completed'
- }
- const stdout = await this.taskOutput.getStdout()
- const result: ExecResult = {
- code,
- stdout,
- stderr: this.taskOutput.getStderr(),
- interrupted: code === SIGKILL,
- backgroundTaskId: this.#backgroundTaskId,
- }
- if (this.taskOutput.stdoutToFile && !this.#backgroundTaskId) {
- if (this.taskOutput.outputFileRedundant) {
- // Small file — full content is in result.stdout, delete the file
- void this.taskOutput.deleteOutputFile()
- } else {
- // Large file — tell the caller where the full output lives
- result.outputFilePath = this.taskOutput.path
- result.outputFileSize = this.taskOutput.outputFileSize
- result.outputTaskId = this.taskOutput.taskId
- }
- }
- if (this.#killedForSize) {
- result.stderr = prependStderr(
- `Background command killed: output file exceeded ${MAX_TASK_OUTPUT_BYTES_DISPLAY}`,
- result.stderr,
- )
- } else if (code === SIGTERM) {
- result.stderr = prependStderr(
- `Command timed out after ${formatDuration(this.#timeout)}`,
- result.stderr,
- )
- }
- const resultResolver = this.#resultResolver
- if (resultResolver) {
- this.#resultResolver = null
- resultResolver(result)
- }
- }
- #doKill(code?: number): void {
- this.#status = 'killed'
- if (this.#childProcess.pid) {
- treeKill(this.#childProcess.pid, 'SIGKILL')
- }
- this.#resolveExitCode(code ?? SIGKILL)
- }
- kill(): void {
- this.#doKill()
- }
- background(taskId: string): boolean {
- if (this.#status === 'running') {
- this.#backgroundTaskId = taskId
- this.#status = 'backgrounded'
- this.#cleanupListeners()
- if (this.taskOutput.stdoutToFile) {
- // File mode: child writes directly to the fd with no JS involvement.
- // The foreground timeout is gone, so watch file size to prevent
- // a stuck append loop from filling the disk (768GB incident).
- this.#startSizeWatchdog()
- } else {
- // Pipe mode: spill the in-memory buffer so readers can find it on disk.
- this.taskOutput.spillToDisk()
- }
- return true
- }
- return false
- }
- cleanup(): void {
- this.#stdoutWrapper?.cleanup()
- this.#stderrWrapper?.cleanup()
- this.taskOutput.clear()
- // Must run before nulling #abortSignal — #cleanupListeners() calls
- // removeEventListener on it. Without this, a kill()+cleanup() sequence
- // crashes: kill() queues #handleExit as a microtask, cleanup() nulls
- // #abortSignal, then #handleExit runs #cleanupListeners() on the null ref.
- this.#cleanupListeners()
- // Release references to allow GC of ChildProcess internals and AbortController chain
- this.#childProcess = null!
- this.#abortSignal = null!
- this.#onTimeoutCallback = undefined
- }
- }
- /**
- * Wraps a child process to enable flexible handling of shell command execution.
- */
- export function wrapSpawn(
- childProcess: ChildProcess,
- abortSignal: AbortSignal,
- timeout: number,
- taskOutput: TaskOutput,
- shouldAutoBackground = false,
- maxOutputBytes = MAX_TASK_OUTPUT_BYTES,
- ): ShellCommand {
- return new ShellCommandImpl(
- childProcess,
- abortSignal,
- timeout,
- taskOutput,
- shouldAutoBackground,
- maxOutputBytes,
- )
- }
- /**
- * Static ShellCommand implementation for commands that were aborted before execution.
- */
- class AbortedShellCommand implements ShellCommand {
- readonly status = 'killed' as const
- readonly result: Promise<ExecResult>
- readonly taskOutput: TaskOutput
- constructor(opts?: {
- backgroundTaskId?: string
- stderr?: string
- code?: number
- }) {
- this.taskOutput = new TaskOutput(generateTaskId('local_bash'), null)
- this.result = Promise.resolve({
- code: opts?.code ?? 145,
- stdout: '',
- stderr: opts?.stderr ?? 'Command aborted before execution',
- interrupted: true,
- backgroundTaskId: opts?.backgroundTaskId,
- })
- }
- background(): boolean {
- return false
- }
- kill(): void {}
- cleanup(): void {}
- }
- export function createAbortedCommand(
- backgroundTaskId?: string,
- opts?: { stderr?: string; code?: number },
- ): ShellCommand {
- return new AbortedShellCommand({
- backgroundTaskId,
- ...opts,
- })
- }
- export function createFailedCommand(preSpawnError: string): ShellCommand {
- const taskOutput = new TaskOutput(generateTaskId('local_bash'), null)
- return {
- status: 'completed' as const,
- result: Promise.resolve({
- code: 1,
- stdout: '',
- stderr: preSpawnError,
- interrupted: false,
- preSpawnError,
- }),
- taskOutput,
- background(): boolean {
- return false
- },
- kill(): void {},
- cleanup(): void {},
- }
- }
|