sessionRunner.ts 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
import { type ChildProcess, spawn } from 'child_process'
import { createWriteStream, type WriteStream } from 'fs'
import { tmpdir } from 'os'
import { dirname, extname, join } from 'path'
import { createInterface } from 'readline'
import { jsonParse, jsonStringify } from '../utils/slowOperations.js'
import { debugTruncate } from './debugUtils.js'
import type {
  SessionActivity,
  SessionDoneStatus,
  SessionHandle,
  SessionSpawner,
  SessionSpawnOpts,
} from './types.js'
  15. const MAX_ACTIVITIES = 10
  16. const MAX_STDERR_LINES = 10
  17. /**
  18. * Sanitize a session ID for use in file names.
  19. * Strips any characters that could cause path traversal (e.g. `../`, `/`)
  20. * or other filesystem issues, replacing them with underscores.
  21. */
  22. export function safeFilenameId(id: string): string {
  23. return id.replace(/[^a-zA-Z0-9_-]/g, '_')
  24. }
  25. /**
  26. * A control_request emitted by the child CLI when it needs permission to
  27. * execute a **specific** tool invocation (not a general capability check).
  28. * The bridge forwards this to the server so the user can approve/deny.
  29. */
  30. export type PermissionRequest = {
  31. type: 'control_request'
  32. request_id: string
  33. request: {
  34. /** Per-invocation permission check — "may I run this tool with these inputs?" */
  35. subtype: 'can_use_tool'
  36. tool_name: string
  37. input: Record<string, unknown>
  38. tool_use_id: string
  39. }
  40. }
  41. type SessionSpawnerDeps = {
  42. execPath: string
  43. /**
  44. * Arguments that must precede the CLI flags when spawning. Empty for
  45. * compiled binaries (where execPath is the claude binary itself); contains
  46. * the script path (process.argv[1]) for npm installs where execPath is the
  47. * node runtime. Without this, node sees --sdk-url as a node option and
  48. * exits with "bad option: --sdk-url" (see anthropics/claude-code#28334).
  49. */
  50. scriptArgs: string[]
  51. env: NodeJS.ProcessEnv
  52. verbose: boolean
  53. sandbox: boolean
  54. debugFile?: string
  55. permissionMode?: string
  56. onDebug: (msg: string) => void
  57. onActivity?: (sessionId: string, activity: SessionActivity) => void
  58. onPermissionRequest?: (
  59. sessionId: string,
  60. request: PermissionRequest,
  61. accessToken: string,
  62. ) => void
  63. }
  64. /** Map tool names to human-readable verbs for the status display. */
  65. const TOOL_VERBS: Record<string, string> = {
  66. Read: 'Reading',
  67. Write: 'Writing',
  68. Edit: 'Editing',
  69. MultiEdit: 'Editing',
  70. Bash: 'Running',
  71. Glob: 'Searching',
  72. Grep: 'Searching',
  73. WebFetch: 'Fetching',
  74. WebSearch: 'Searching',
  75. Task: 'Running task',
  76. FileReadTool: 'Reading',
  77. FileWriteTool: 'Writing',
  78. FileEditTool: 'Editing',
  79. GlobTool: 'Searching',
  80. GrepTool: 'Searching',
  81. BashTool: 'Running',
  82. NotebookEditTool: 'Editing notebook',
  83. LSP: 'LSP',
  84. }
  85. function toolSummary(name: string, input: Record<string, unknown>): string {
  86. const verb = TOOL_VERBS[name] ?? name
  87. const target =
  88. (input.file_path as string) ??
  89. (input.filePath as string) ??
  90. (input.pattern as string) ??
  91. (input.command as string | undefined)?.slice(0, 60) ??
  92. (input.url as string) ??
  93. (input.query as string) ??
  94. ''
  95. if (target) {
  96. return `${verb} ${target}`
  97. }
  98. return verb
  99. }
  100. function extractActivities(
  101. line: string,
  102. sessionId: string,
  103. onDebug: (msg: string) => void,
  104. ): SessionActivity[] {
  105. let parsed: unknown
  106. try {
  107. parsed = jsonParse(line)
  108. } catch {
  109. return []
  110. }
  111. if (!parsed || typeof parsed !== 'object') {
  112. return []
  113. }
  114. const msg = parsed as Record<string, unknown>
  115. const activities: SessionActivity[] = []
  116. const now = Date.now()
  117. switch (msg.type) {
  118. case 'assistant': {
  119. const message = msg.message as Record<string, unknown> | undefined
  120. if (!message) break
  121. const content = message.content
  122. if (!Array.isArray(content)) break
  123. for (const block of content) {
  124. if (!block || typeof block !== 'object') continue
  125. const b = block as Record<string, unknown>
  126. if (b.type === 'tool_use') {
  127. const name = (b.name as string) ?? 'Tool'
  128. const input = (b.input as Record<string, unknown>) ?? {}
  129. const summary = toolSummary(name, input)
  130. activities.push({
  131. type: 'tool_start',
  132. summary,
  133. timestamp: now,
  134. })
  135. onDebug(
  136. `[bridge:activity] sessionId=${sessionId} tool_use name=${name} ${inputPreview(input)}`,
  137. )
  138. } else if (b.type === 'text') {
  139. const text = (b.text as string) ?? ''
  140. if (text.length > 0) {
  141. activities.push({
  142. type: 'text',
  143. summary: text.slice(0, 80),
  144. timestamp: now,
  145. })
  146. onDebug(
  147. `[bridge:activity] sessionId=${sessionId} text "${text.slice(0, 100)}"`,
  148. )
  149. }
  150. }
  151. }
  152. break
  153. }
  154. case 'result': {
  155. const subtype = msg.subtype as string | undefined
  156. if (subtype === 'success') {
  157. activities.push({
  158. type: 'result',
  159. summary: 'Session completed',
  160. timestamp: now,
  161. })
  162. onDebug(
  163. `[bridge:activity] sessionId=${sessionId} result subtype=success`,
  164. )
  165. } else if (subtype) {
  166. const errors = msg.errors as string[] | undefined
  167. const errorSummary = errors?.[0] ?? `Error: ${subtype}`
  168. activities.push({
  169. type: 'error',
  170. summary: errorSummary,
  171. timestamp: now,
  172. })
  173. onDebug(
  174. `[bridge:activity] sessionId=${sessionId} result subtype=${subtype} error="${errorSummary}"`,
  175. )
  176. } else {
  177. onDebug(
  178. `[bridge:activity] sessionId=${sessionId} result subtype=undefined`,
  179. )
  180. }
  181. break
  182. }
  183. default:
  184. break
  185. }
  186. return activities
  187. }
  188. /**
  189. * Extract plain text from a replayed SDKUserMessage NDJSON line. Returns the
  190. * trimmed text if this looks like a real human-authored message, otherwise
  191. * undefined so the caller keeps waiting for the first real message.
  192. */
  193. function extractUserMessageText(
  194. msg: Record<string, unknown>,
  195. ): string | undefined {
  196. // Skip tool-result user messages (wrapped subagent results) and synthetic
  197. // caveat messages — neither is human-authored.
  198. if (msg.parent_tool_use_id != null || msg.isSynthetic || msg.isReplay)
  199. return undefined
  200. const message = msg.message as Record<string, unknown> | undefined
  201. const content = message?.content
  202. let text: string | undefined
  203. if (typeof content === 'string') {
  204. text = content
  205. } else if (Array.isArray(content)) {
  206. for (const block of content) {
  207. if (
  208. block &&
  209. typeof block === 'object' &&
  210. (block as Record<string, unknown>).type === 'text'
  211. ) {
  212. text = (block as Record<string, unknown>).text as string | undefined
  213. break
  214. }
  215. }
  216. }
  217. text = text?.trim()
  218. return text ? text : undefined
  219. }
  220. /** Build a short preview of tool input for debug logging. */
  221. function inputPreview(input: Record<string, unknown>): string {
  222. const parts: string[] = []
  223. for (const [key, val] of Object.entries(input)) {
  224. if (typeof val === 'string') {
  225. parts.push(`${key}="${val.slice(0, 100)}"`)
  226. }
  227. if (parts.length >= 3) break
  228. }
  229. return parts.join(' ')
  230. }
  231. export function createSessionSpawner(deps: SessionSpawnerDeps): SessionSpawner {
  232. return {
  233. spawn(opts: SessionSpawnOpts, dir: string): SessionHandle {
  234. // Debug file resolution:
  235. // 1. If deps.debugFile is provided, use it with session ID suffix for uniqueness
  236. // 2. If verbose or ant build, auto-generate a temp file path
  237. // 3. Otherwise, no debug file
  238. const safeId = safeFilenameId(opts.sessionId)
  239. let debugFile: string | undefined
  240. if (deps.debugFile) {
  241. const ext = deps.debugFile.lastIndexOf('.')
  242. if (ext > 0) {
  243. debugFile = `${deps.debugFile.slice(0, ext)}-${safeId}${deps.debugFile.slice(ext)}`
  244. } else {
  245. debugFile = `${deps.debugFile}-${safeId}`
  246. }
  247. } else if (deps.verbose || process.env.USER_TYPE === 'ant') {
  248. debugFile = join(tmpdir(), 'claude', `bridge-session-${safeId}.log`)
  249. }
  250. // Transcript file: write raw NDJSON lines for post-hoc analysis.
  251. // Placed alongside the debug file when one is configured.
  252. let transcriptStream: WriteStream | null = null
  253. let transcriptPath: string | undefined
  254. if (deps.debugFile) {
  255. transcriptPath = join(
  256. dirname(deps.debugFile),
  257. `bridge-transcript-${safeId}.jsonl`,
  258. )
  259. transcriptStream = createWriteStream(transcriptPath, { flags: 'a' })
  260. transcriptStream.on('error', err => {
  261. deps.onDebug(
  262. `[bridge:session] Transcript write error: ${err.message}`,
  263. )
  264. transcriptStream = null
  265. })
  266. deps.onDebug(`[bridge:session] Transcript log: ${transcriptPath}`)
  267. }
  268. const args = [
  269. ...deps.scriptArgs,
  270. '--print',
  271. '--sdk-url',
  272. opts.sdkUrl,
  273. '--session-id',
  274. opts.sessionId,
  275. '--input-format',
  276. 'stream-json',
  277. '--output-format',
  278. 'stream-json',
  279. '--replay-user-messages',
  280. ...(deps.verbose ? ['--verbose'] : []),
  281. ...(debugFile ? ['--debug-file', debugFile] : []),
  282. ...(deps.permissionMode
  283. ? ['--permission-mode', deps.permissionMode]
  284. : []),
  285. ]
  286. const env: NodeJS.ProcessEnv = {
  287. ...deps.env,
  288. // Strip the bridge's OAuth token so the child CC process uses
  289. // the session access token for inference instead.
  290. CLAUDE_CODE_OAUTH_TOKEN: undefined,
  291. CLAUDE_CODE_ENVIRONMENT_KIND: 'bridge',
  292. ...(deps.sandbox && { CLAUDE_CODE_FORCE_SANDBOX: '1' }),
  293. CLAUDE_CODE_SESSION_ACCESS_TOKEN: opts.accessToken,
  294. // v1: HybridTransport (WS reads + POST writes) to Session-Ingress.
  295. // Harmless in v2 mode — transportUtils checks CLAUDE_CODE_USE_CCR_V2 first.
  296. CLAUDE_CODE_POST_FOR_SESSION_INGRESS_V2: '1',
  297. // v2: SSETransport + CCRClient to CCR's /v1/code/sessions/* endpoints.
  298. // Same env vars environment-manager sets in the container path.
  299. ...(opts.useCcrV2 && {
  300. CLAUDE_CODE_USE_CCR_V2: '1',
  301. CLAUDE_CODE_WORKER_EPOCH: String(opts.workerEpoch),
  302. }),
  303. }
  304. deps.onDebug(
  305. `[bridge:session] Spawning sessionId=${opts.sessionId} sdkUrl=${opts.sdkUrl} accessToken=${opts.accessToken ? 'present' : 'MISSING'}`,
  306. )
  307. deps.onDebug(`[bridge:session] Child args: ${args.join(' ')}`)
  308. if (debugFile) {
  309. deps.onDebug(`[bridge:session] Debug log: ${debugFile}`)
  310. }
  311. // Pipe all three streams: stdin for control, stdout for NDJSON parsing,
  312. // stderr for error capture and diagnostics.
  313. const child: ChildProcess = spawn(deps.execPath, args, {
  314. cwd: dir,
  315. stdio: ['pipe', 'pipe', 'pipe'],
  316. env,
  317. windowsHide: true,
  318. })
  319. deps.onDebug(
  320. `[bridge:session] sessionId=${opts.sessionId} pid=${child.pid}`,
  321. )
  322. const activities: SessionActivity[] = []
  323. let currentActivity: SessionActivity | null = null
  324. const lastStderr: string[] = []
  325. let sigkillSent = false
  326. let firstUserMessageSeen = false
  327. // Buffer stderr for error diagnostics
  328. if (child.stderr) {
  329. const stderrRl = createInterface({ input: child.stderr })
  330. stderrRl.on('line', line => {
  331. // Forward stderr to bridge's stderr in verbose mode
  332. if (deps.verbose) {
  333. process.stderr.write(line + '\n')
  334. }
  335. // Ring buffer of last N lines
  336. if (lastStderr.length >= MAX_STDERR_LINES) {
  337. lastStderr.shift()
  338. }
  339. lastStderr.push(line)
  340. })
  341. }
  342. // Parse NDJSON from child stdout
  343. if (child.stdout) {
  344. const rl = createInterface({ input: child.stdout })
  345. rl.on('line', line => {
  346. // Write raw NDJSON to transcript file
  347. if (transcriptStream) {
  348. transcriptStream.write(line + '\n')
  349. }
  350. // Log all messages flowing from the child CLI to the bridge
  351. deps.onDebug(
  352. `[bridge:ws] sessionId=${opts.sessionId} <<< ${debugTruncate(line)}`,
  353. )
  354. // In verbose mode, forward raw output to stderr
  355. if (deps.verbose) {
  356. process.stderr.write(line + '\n')
  357. }
  358. const extracted = extractActivities(
  359. line,
  360. opts.sessionId,
  361. deps.onDebug,
  362. )
  363. for (const activity of extracted) {
  364. // Maintain ring buffer
  365. if (activities.length >= MAX_ACTIVITIES) {
  366. activities.shift()
  367. }
  368. activities.push(activity)
  369. currentActivity = activity
  370. deps.onActivity?.(opts.sessionId, activity)
  371. }
  372. // Detect control_request and replayed user messages.
  373. // extractActivities parses the same line but swallows parse errors
  374. // and skips 'user' type — re-parse here is cheap (NDJSON lines are
  375. // small) and keeps each path self-contained.
  376. {
  377. let parsed: unknown
  378. try {
  379. parsed = jsonParse(line)
  380. } catch {
  381. // Non-JSON line, skip detection
  382. }
  383. if (parsed && typeof parsed === 'object') {
  384. const msg = parsed as Record<string, unknown>
  385. if (msg.type === 'control_request') {
  386. const request = msg.request as
  387. | Record<string, unknown>
  388. | undefined
  389. if (
  390. request?.subtype === 'can_use_tool' &&
  391. deps.onPermissionRequest
  392. ) {
  393. deps.onPermissionRequest(
  394. opts.sessionId,
  395. parsed as PermissionRequest,
  396. opts.accessToken,
  397. )
  398. }
  399. // interrupt is turn-level; the child handles it internally (print.ts)
  400. } else if (
  401. msg.type === 'user' &&
  402. !firstUserMessageSeen &&
  403. opts.onFirstUserMessage
  404. ) {
  405. const text = extractUserMessageText(msg)
  406. if (text) {
  407. firstUserMessageSeen = true
  408. opts.onFirstUserMessage(text)
  409. }
  410. }
  411. }
  412. }
  413. })
  414. }
  415. const done = new Promise<SessionDoneStatus>(resolve => {
  416. child.on('close', (code, signal) => {
  417. // Close transcript stream on exit
  418. if (transcriptStream) {
  419. transcriptStream.end()
  420. transcriptStream = null
  421. }
  422. if (signal === 'SIGTERM' || signal === 'SIGINT') {
  423. deps.onDebug(
  424. `[bridge:session] sessionId=${opts.sessionId} interrupted signal=${signal} pid=${child.pid}`,
  425. )
  426. resolve('interrupted')
  427. } else if (code === 0) {
  428. deps.onDebug(
  429. `[bridge:session] sessionId=${opts.sessionId} completed exit_code=0 pid=${child.pid}`,
  430. )
  431. resolve('completed')
  432. } else {
  433. deps.onDebug(
  434. `[bridge:session] sessionId=${opts.sessionId} failed exit_code=${code} pid=${child.pid}`,
  435. )
  436. resolve('failed')
  437. }
  438. })
  439. child.on('error', err => {
  440. deps.onDebug(
  441. `[bridge:session] sessionId=${opts.sessionId} spawn error: ${err.message}`,
  442. )
  443. resolve('failed')
  444. })
  445. })
  446. const handle: SessionHandle = {
  447. sessionId: opts.sessionId,
  448. done,
  449. activities,
  450. accessToken: opts.accessToken,
  451. lastStderr,
  452. get currentActivity(): SessionActivity | null {
  453. return currentActivity
  454. },
  455. kill(): void {
  456. if (!child.killed) {
  457. deps.onDebug(
  458. `[bridge:session] Sending SIGTERM to sessionId=${opts.sessionId} pid=${child.pid}`,
  459. )
  460. // On Windows, child.kill('SIGTERM') throws; use default signal.
  461. if (process.platform === 'win32') {
  462. child.kill()
  463. } else {
  464. child.kill('SIGTERM')
  465. }
  466. }
  467. },
  468. forceKill(): void {
  469. // Use separate flag because child.killed is set when kill() is called,
  470. // not when the process exits. We need to send SIGKILL even after SIGTERM.
  471. if (!sigkillSent && child.pid) {
  472. sigkillSent = true
  473. deps.onDebug(
  474. `[bridge:session] Sending SIGKILL to sessionId=${opts.sessionId} pid=${child.pid}`,
  475. )
  476. if (process.platform === 'win32') {
  477. child.kill()
  478. } else {
  479. child.kill('SIGKILL')
  480. }
  481. }
  482. },
  483. writeStdin(data: string): void {
  484. if (child.stdin && !child.stdin.destroyed) {
  485. deps.onDebug(
  486. `[bridge:ws] sessionId=${opts.sessionId} >>> ${debugTruncate(data)}`,
  487. )
  488. child.stdin.write(data)
  489. }
  490. },
  491. updateAccessToken(token: string): void {
  492. handle.accessToken = token
  493. // Send the fresh token to the child process via stdin. The child's
  494. // StructuredIO handles update_environment_variables messages by
  495. // setting process.env directly, so getSessionIngressAuthToken()
  496. // picks up the new token on the next refreshHeaders call.
  497. handle.writeStdin(
  498. jsonStringify({
  499. type: 'update_environment_variables',
  500. variables: { CLAUDE_CODE_SESSION_ACCESS_TOKEN: token },
  501. }) + '\n',
  502. )
  503. deps.onDebug(
  504. `[bridge:session] Sent token refresh via stdin for sessionId=${opts.sessionId}`,
  505. )
  506. },
  507. }
  508. return handle
  509. },
  510. }
  511. }
/**
 * Exposes the module-private `extractActivities` NDJSON parser under a
 * `_…ForTesting` name so tests can exercise it directly.
 */
export { extractActivities as _extractActivitiesForTesting }