ccrClient.ts 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998
  1. import { randomUUID } from 'crypto'
  2. import type {
  3. SDKPartialAssistantMessage,
  4. StdoutMessage,
  5. } from 'src/entrypoints/sdk/controlTypes.js'
  6. import { decodeJwtExpiry } from '../../bridge/jwtUtils.js'
  7. import { logForDebugging } from '../../utils/debug.js'
  8. import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
  9. import { errorMessage, getErrnoCode } from '../../utils/errors.js'
  10. import { createAxiosInstance } from '../../utils/proxy.js'
  11. import {
  12. registerSessionActivityCallback,
  13. unregisterSessionActivityCallback,
  14. } from '../../utils/sessionActivity.js'
  15. import {
  16. getSessionIngressAuthHeaders,
  17. getSessionIngressAuthToken,
  18. } from '../../utils/sessionIngressAuth.js'
  19. import type {
  20. RequiresActionDetails,
  21. SessionState,
  22. } from '../../utils/sessionState.js'
  23. import { sleep } from '../../utils/sleep.js'
  24. import { getClaudeCodeUserAgent } from '../../utils/userAgent.js'
  25. import {
  26. RetryableError,
  27. SerialBatchEventUploader,
  28. } from './SerialBatchEventUploader.js'
  29. import type { SSETransport, StreamClientEvent } from './SSETransport.js'
  30. import { WorkerStateUploader } from './WorkerStateUploader.js'
  31. /** Default interval between heartbeat events (20s; server TTL is 60s). */
  32. const DEFAULT_HEARTBEAT_INTERVAL_MS = 20_000
  33. /**
  34. * stream_event messages accumulate in a delay buffer for up to this many ms
  35. * before enqueue. Mirrors HybridTransport's batching window. text_delta
  36. * events for the same content block accumulate into a single full-so-far
  37. * snapshot per flush — each emitted event is self-contained so a client
  38. * connecting mid-stream sees complete text, not a fragment.
  39. */
  40. const STREAM_EVENT_FLUSH_INTERVAL_MS = 100
  41. /** Hoisted axios validateStatus callback to avoid per-request closure allocation. */
  42. function alwaysValidStatus(): boolean {
  43. return true
  44. }
  45. export type CCRInitFailReason =
  46. | 'no_auth_headers'
  47. | 'missing_epoch'
  48. | 'worker_register_failed'
  49. /** Thrown by initialize(); carries a typed reason for the diag classifier. */
  50. export class CCRInitError extends Error {
  51. constructor(readonly reason: CCRInitFailReason) {
  52. super(`CCRClient init failed: ${reason}`)
  53. }
  54. }
  55. /**
  56. * Consecutive 401/403 with a VALID-LOOKING token before giving up. An
  57. * expired JWT short-circuits this (exits immediately — deterministic,
  58. * retry is futile). This threshold is for the uncertain case: token's
  59. * exp is in the future but server says 401 (userauth down, KMS hiccup,
  60. * clock skew). 10 × 20s heartbeat ≈ 200s to ride it out.
  61. */
  62. const MAX_CONSECUTIVE_AUTH_FAILURES = 10
  63. type EventPayload = {
  64. uuid: string
  65. type: string
  66. [key: string]: unknown
  67. }
  68. type ClientEvent = {
  69. payload: EventPayload
  70. ephemeral?: boolean
  71. }
  72. /**
  73. * Structural subset of a stream_event carrying a text_delta. Not a narrowing
  74. * of SDKPartialAssistantMessage — RawMessageStreamEvent's delta is a union and
  75. * narrowing through two levels defeats the discriminant.
  76. */
  77. type CoalescedStreamEvent = {
  78. type: 'stream_event'
  79. uuid: string
  80. session_id: string
  81. parent_tool_use_id: string | null
  82. event: {
  83. type: 'content_block_delta'
  84. index: number
  85. delta: { type: 'text_delta'; text: string }
  86. }
  87. }
  88. /**
  89. * Accumulator state for text_delta coalescing. Keyed by API message ID so
  90. * lifetime is tied to the assistant message — cleared when the complete
  91. * SDKAssistantMessage arrives (writeEvent), which is reliable even when
  92. * abort/error paths skip content_block_stop/message_stop delivery.
  93. */
  94. export type StreamAccumulatorState = {
  95. /** API message ID (msg_...) → blocks[blockIndex] → chunk array. */
  96. byMessage: Map<string, string[][]>
  97. /**
  98. * {session_id}:{parent_tool_use_id} → active message ID.
  99. * content_block_delta events don't carry the message ID (only
  100. * message_start does), so we track which message is currently streaming
  101. * for each scope. At most one message streams per scope at a time.
  102. */
  103. scopeToMessage: Map<string, string>
  104. }
  105. export function createStreamAccumulator(): StreamAccumulatorState {
  106. return { byMessage: new Map(), scopeToMessage: new Map() }
  107. }
  108. function scopeKey(m: {
  109. session_id: string
  110. parent_tool_use_id: string | null
  111. }): string {
  112. return `${m.session_id}:${m.parent_tool_use_id ?? ''}`
  113. }
  114. /**
  115. * Accumulate text_delta stream_events into full-so-far snapshots per content
  116. * block. Each flush emits ONE event per touched block containing the FULL
  117. * accumulated text from the start of the block — a client connecting
  118. * mid-stream receives a self-contained snapshot, not a fragment.
  119. *
  120. * Non-text-delta events pass through unchanged. message_start records the
  121. * active message ID for the scope; content_block_delta appends chunks;
  122. * the snapshot event reuses the first text_delta UUID seen for that block in
  123. * this flush so server-side idempotency remains stable across retries.
  124. *
  125. * Cleanup happens in writeEvent when the complete assistant message arrives
  126. * (reliable), not here on stop events (abort/error paths skip those).
  127. */
  128. export function accumulateStreamEvents(
  129. buffer: SDKPartialAssistantMessage[],
  130. state: StreamAccumulatorState,
  131. ): EventPayload[] {
  132. const out: EventPayload[] = []
  133. // chunks[] → snapshot already in `out` this flush. Keyed by the chunks
  134. // array reference (stable per {messageId, index}) so subsequent deltas
  135. // rewrite the same entry instead of emitting one event per delta.
  136. const touched = new Map<string[], CoalescedStreamEvent>()
  137. for (const msg of buffer) {
  138. switch (msg.event.type) {
  139. case 'message_start': {
  140. const id = msg.event.message.id
  141. const prevId = state.scopeToMessage.get(scopeKey(msg))
  142. if (prevId) state.byMessage.delete(prevId)
  143. state.scopeToMessage.set(scopeKey(msg), id)
  144. state.byMessage.set(id, [])
  145. out.push(msg)
  146. break
  147. }
  148. case 'content_block_delta': {
  149. if (msg.event.delta.type !== 'text_delta') {
  150. out.push(msg)
  151. break
  152. }
  153. const messageId = state.scopeToMessage.get(scopeKey(msg))
  154. const blocks = messageId ? state.byMessage.get(messageId) : undefined
  155. if (!blocks) {
  156. // Delta without a preceding message_start (reconnect mid-stream,
  157. // or message_start was in a prior buffer that got dropped). Pass
  158. // through raw — can't produce a full-so-far snapshot without the
  159. // prior chunks anyway.
  160. out.push(msg)
  161. break
  162. }
  163. const chunks = (blocks[msg.event.index] ??= [])
  164. chunks.push(msg.event.delta.text)
  165. const existing = touched.get(chunks)
  166. if (existing) {
  167. existing.event.delta.text = chunks.join('')
  168. break
  169. }
  170. const snapshot: CoalescedStreamEvent = {
  171. type: 'stream_event',
  172. uuid: msg.uuid,
  173. session_id: msg.session_id,
  174. parent_tool_use_id: msg.parent_tool_use_id,
  175. event: {
  176. type: 'content_block_delta',
  177. index: msg.event.index,
  178. delta: { type: 'text_delta', text: chunks.join('') },
  179. },
  180. }
  181. touched.set(chunks, snapshot)
  182. out.push(snapshot)
  183. break
  184. }
  185. default:
  186. out.push(msg)
  187. }
  188. }
  189. return out
  190. }
  191. /**
  192. * Clear accumulator entries for a completed assistant message. Called from
  193. * writeEvent when the SDKAssistantMessage arrives — the reliable end-of-stream
  194. * signal that fires even when abort/interrupt/error skip SSE stop events.
  195. */
  196. export function clearStreamAccumulatorForMessage(
  197. state: StreamAccumulatorState,
  198. assistant: {
  199. session_id: string
  200. parent_tool_use_id: string | null
  201. message: { id: string }
  202. },
  203. ): void {
  204. state.byMessage.delete(assistant.message.id)
  205. const scope = scopeKey(assistant)
  206. if (state.scopeToMessage.get(scope) === assistant.message.id) {
  207. state.scopeToMessage.delete(scope)
  208. }
  209. }
  210. type RequestResult = { ok: true } | { ok: false; retryAfterMs?: number }
  211. type WorkerEvent = {
  212. payload: EventPayload
  213. is_compaction?: boolean
  214. agent_id?: string
  215. }
  216. export type InternalEvent = {
  217. event_id: string
  218. event_type: string
  219. payload: Record<string, unknown>
  220. event_metadata?: Record<string, unknown> | null
  221. is_compaction: boolean
  222. created_at: string
  223. agent_id?: string
  224. }
  225. type ListInternalEventsResponse = {
  226. data: InternalEvent[]
  227. next_cursor?: string
  228. }
  229. type WorkerStateResponse = {
  230. worker?: {
  231. external_metadata?: Record<string, unknown>
  232. }
  233. }
  234. /**
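 * Manages the worker lifecycle protocol with CCR v2:
 * - Epoch management: takes worker_epoch from initialize()'s argument, or
 *   falls back to the CLAUDE_CODE_WORKER_EPOCH env var
 * - Runtime state reporting: PUT /sessions/{id}/worker
 * - Heartbeat: POST /sessions/{id}/worker/heartbeat for liveness detection
 * - Event upload: client events, internal events and delivery acks via
 *   POST /sessions/{id}/worker/events, /worker/internal-events and
 *   /worker/events/delivery
 *
 * All writes go through this.request().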
  241. */
  242. export class CCRClient {
  // Epoch this worker registered under; sent with every write. Set by initialize().
  private workerEpoch = 0
  private readonly heartbeatIntervalMs: number
  private readonly heartbeatJitterFraction: number
  private heartbeatTimer: NodeJS.Timeout | null = null
  // Guards against overlapping heartbeat POSTs when one outlives the interval.
  private heartbeatInFlight = false
  private closed = false
  // Reset on any 2xx; see MAX_CONSECUTIVE_AUTH_FAILURES.
  private consecutiveAuthFailures = 0
  // Last state reported via reportState(); used to skip redundant PUTs.
  private currentState: SessionState | null = null
  private readonly sessionBaseUrl: string
  private readonly sessionId: string
  private readonly http = createAxiosInstance({ keepAlive: true })
  // stream_event delay buffer — accumulates content deltas for up to
  // STREAM_EVENT_FLUSH_INTERVAL_MS before enqueueing (reduces POST count
  // and enables text_delta coalescing). Mirrors HybridTransport's pattern.
  private streamEventBuffer: SDKPartialAssistantMessage[] = []
  private streamEventTimer: ReturnType<typeof setTimeout> | null = null
  // Full-so-far text accumulator. Persists across flushes so each emitted
  // text_delta event carries the complete text from the start of the block —
  // mid-stream reconnects see a self-contained snapshot. Keyed by API message
  // ID; cleared in writeEvent when the complete assistant message arrives.
  private streamTextAccumulator = createStreamAccumulator()
  private readonly workerState: WorkerStateUploader
  private readonly eventUploader: SerialBatchEventUploader<ClientEvent>
  private readonly internalEventUploader: SerialBatchEventUploader<WorkerEvent>
  private readonly deliveryUploader: SerialBatchEventUploader<{
    eventId: string
    status: 'received' | 'processing' | 'processed'
  }>
  /**
   * Called when the server returns 409 (a newer worker epoch superseded ours).
   * Default: process.exit(1) — correct for spawn-mode children where the
   * parent bridge re-spawns. In-process callers (replBridge) MUST override
   * this to close gracefully instead; exit would kill the user's REPL.
   */
  private readonly onEpochMismatch: () => never
  /**
   * Auth header source. Defaults to the process-wide session-ingress token
   * (CLAUDE_CODE_SESSION_ACCESS_TOKEN env var). Callers managing multiple
   * concurrent sessions with distinct JWTs MUST inject this — the env-var
   * path is a process global and would stomp across sessions.
   */
  private readonly getAuthHeaders: () => Record<string, string>

  /**
   * @param transport SSE transport whose client events are acked as
   *   'received'. The callback is wired here, not in initialize(), so
   *   remoteIO can call transport.connect() right after construction
   *   without racing the first SSE catch-up frame against an unwired
   *   onEventCallback.
   * @param sessionUrl Session URL, https://host/v1/code/sessions/{id}.
   *   Trailing slashes are ignored; the last path segment is the session ID.
   * @param opts Optional overrides: the epoch-mismatch handler, heartbeat
   *   timing, and the per-instance auth header source.
   * @throws Error if `sessionUrl` is not http(s).
   */
  constructor(
    transport: SSETransport,
    sessionUrl: URL,
    opts?: {
      onEpochMismatch?: () => never
      heartbeatIntervalMs?: number
      heartbeatJitterFraction?: number
      /**
       * Per-instance auth header source. Omit to read the process-wide
       * CLAUDE_CODE_SESSION_ACCESS_TOKEN (single-session callers — REPL,
       * daemon). Required for concurrent multi-session callers.
       */
      getAuthHeaders?: () => Record<string, string>
    },
  ) {
    this.onEpochMismatch =
      opts?.onEpochMismatch ??
      (() => {
        // eslint-disable-next-line custom-rules/no-process-exit
        process.exit(1)
      })
    this.heartbeatIntervalMs =
      opts?.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_INTERVAL_MS
    this.heartbeatJitterFraction = opts?.heartbeatJitterFraction ?? 0
    this.getAuthHeaders = opts?.getAuthHeaders ?? getSessionIngressAuthHeaders
    // Session URL: https://host/v1/code/sessions/{id}
    if (sessionUrl.protocol !== 'http:' && sessionUrl.protocol !== 'https:') {
      throw new Error(
        `CCRClient: Expected http(s) URL, got ${sessionUrl.protocol}`,
      )
    }
    const pathname = sessionUrl.pathname.replace(/\/$/, '')
    this.sessionBaseUrl = `${sessionUrl.protocol}//${sessionUrl.host}${pathname}`
    // Extract session ID from the URL path (last segment)
    this.sessionId = pathname.split('/').pop() || ''
    this.workerState = new WorkerStateUploader({
      send: body =>
        this.request(
          'put',
          '/worker',
          { worker_epoch: this.workerEpoch, ...body },
          'PUT worker',
        ).then(r => r.ok),
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })
    this.eventUploader = new SerialBatchEventUploader<ClientEvent>({
      maxBatchSize: 100,
      maxBatchBytes: 10 * 1024 * 1024,
      // flushStreamEventBuffer() enqueues a full 100ms window of accumulated
      // stream_events in one call. A burst of mixed delta types that don't
      // fold into a single snapshot could exceed the old cap (50) and deadlock
      // on the SerialBatchEventUploader backpressure check. Match
      // HybridTransport's bound — high enough to be memory-only.
      maxQueueSize: 100_000,
      send: batch =>
        this.postOrThrowRetryable(
          '/worker/events',
          { events: batch },
          'client events',
          'client event POST failed',
        ),
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })
    this.internalEventUploader = new SerialBatchEventUploader<WorkerEvent>({
      maxBatchSize: 100,
      maxBatchBytes: 10 * 1024 * 1024,
      maxQueueSize: 200,
      send: batch =>
        this.postOrThrowRetryable(
          '/worker/internal-events',
          { events: batch },
          'internal events',
          'internal event POST failed',
        ),
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })
    this.deliveryUploader = new SerialBatchEventUploader<{
      eventId: string
      status: 'received' | 'processing' | 'processed'
    }>({
      maxBatchSize: 64,
      maxQueueSize: 64,
      send: batch =>
        this.postOrThrowRetryable(
          '/worker/events/delivery',
          {
            updates: batch.map(d => ({
              event_id: d.eventId,
              status: d.status,
            })),
          },
          'delivery batch',
          'delivery POST failed',
        ),
      baseDelayMs: 500,
      maxDelayMs: 30_000,
      jitterMs: 500,
    })
    // Ack each received client_event so CCR can track delivery status.
    // Wired here (not in initialize()) so the callback is registered the
    // moment new CCRClient() returns — remoteIO must be free to call
    // transport.connect() immediately after without racing the first
    // SSE catch-up frame against an unwired onEventCallback.
    transport.setOnEvent((event: StreamClientEvent) => {
      this.reportDelivery(event.event_id, 'received')
    })
  }

  /**
   * Shared `send` implementation for the batch uploaders. It POSTs
   * `{ worker_epoch, ...body }` to `path` via request(). The epoch is read at
   * send time, not at enqueue time. On any non-ok result it throws
   * RetryableError with the Retry-After hint request() parsed, if any, so the
   * uploader treats the failure as retryable.
   */
  private async postOrThrowRetryable(
    path: string,
    body: Record<string, unknown>,
    label: string,
    failureMessage: string,
  ): Promise<void> {
    const result = await this.request(
      'post',
      path,
      { worker_epoch: this.workerEpoch, ...body },
      label,
    )
    if (!result.ok) {
      // Narrowed to the failure arm of RequestResult, so retryAfterMs is typed.
      throw new RetryableError(failureMessage, result.retryAfterMs)
    }
  }
  /**
   * Bring the session worker online.
   *
   * 1. Resolve worker_epoch: the explicit argument wins. Otherwise parse
   *    CLAUDE_CODE_WORKER_EPOCH (set by env-manager / bridge spawner).
   *    In-process callers (replBridge) pass the epoch directly: they
   *    registered the worker themselves and no parent process sets env vars.
   * 2. PUT the worker as 'idle', clearing stale pending_action/task_summary.
   *    At the same time, GET the previous worker's external_metadata.
   * 3. Start the heartbeat and hook session-activity keep-alives.
   *
   * @returns The restored external_metadata, or null when there was none.
   * @throws CCRInitError 'no_auth_headers' | 'missing_epoch' |
   *   'worker_register_failed'.
   */
  async initialize(epoch?: number): Promise<Record<string, unknown> | null> {
    const initStartedAt = Date.now()
    const hasAuth = Object.keys(this.getAuthHeaders()).length > 0
    if (!hasAuth) throw new CCRInitError('no_auth_headers')

    let resolvedEpoch = epoch
    if (resolvedEpoch === undefined) {
      const fromEnv = process.env.CLAUDE_CODE_WORKER_EPOCH
      resolvedEpoch = fromEnv ? parseInt(fromEnv, 10) : NaN
    }
    if (isNaN(resolvedEpoch)) throw new CCRInitError('missing_epoch')
    this.workerEpoch = resolvedEpoch

    // The state GET and the registration PUT are independent, so the GET is
    // started first and awaited only after the PUT has succeeded.
    const restoredStatePromise = this.getWorkerState()
    const registration = await this.request(
      'put',
      '/worker',
      {
        worker_status: 'idle',
        worker_epoch: this.workerEpoch,
        // A crashed predecessor may have left pending_action/task_summary
        // set; the in-session clears don't survive a process restart.
        external_metadata: {
          pending_action: null,
          task_summary: null,
        },
      },
      'PUT worker (init)',
    )
    // On 409 onEpochMismatch may throw, but request() swallows that and
    // reports failure. Bail here so no heartbeat timer is started against a
    // dead epoch, and so connect()'s rejection handler runs.
    if (!registration.ok) throw new CCRInitError('worker_register_failed')

    this.currentState = 'idle'
    this.startHeartbeat()
    // sessionActivity's refcount-gated timer fires while an API call or tool
    // is in flight. Without a write, the container lease could expire
    // mid-wait. (v1 wires this per-connection in WebSocketTransport.)
    registerSessionActivityCallback(() => {
      void this.writeEvent({ type: 'keep_alive' })
    })
    logForDebugging(`CCRClient: initialized, epoch=${this.workerEpoch}`)
    logForDiagnosticsNoPII('info', 'cli_worker_lifecycle_initialized', {
      epoch: this.workerEpoch,
      duration_ms: Date.now() - initStartedAt,
    })

    // state_restored is logged only after the PUT succeeded. Logging inside
    // getWorkerState() raced: a GET resolving before a failed PUT produced
    // both init_failed and state_restored for the same session.
    const restored = await restoredStatePromise
    if (!this.closed) {
      logForDiagnosticsNoPII('info', 'cli_worker_state_restored', {
        duration_ms: restored.durationMs,
        had_state: restored.metadata !== null,
      })
    }
    return restored.metadata
  }
  /**
   * Read back the external_metadata the previous worker wrote
   * (GET /sessions/{id}/worker). Control_requests are marked processed and
   * are not re-delivered on restart, so this is the only way to recover them.
   *
   * @returns The metadata (null if absent or no auth headers) and the GET's
   *   wall-clock duration (0 when skipped for lack of auth).
   */
  private async getWorkerState(): Promise<{
    metadata: Record<string, unknown> | null
    durationMs: number
  }> {
    const startedAt = Date.now()
    const headers = this.getAuthHeaders()
    const missingAuth = Object.keys(headers).length === 0
    if (missingAuth) {
      return { metadata: null, durationMs: 0 }
    }
    const response = await this.getWithRetry<WorkerStateResponse>(
      `${this.sessionBaseUrl}/worker`,
      headers,
      'worker_state',
    )
    const metadata = response?.worker?.external_metadata ?? null
    return { metadata, durationMs: Date.now() - startedAt }
  }
  /**
   * Send an authenticated JSON request to `${sessionBaseUrl}${path}`.
   *
   * Resolves `{ ok: true }` on 2xx. Any other status, a transport error, or
   * missing auth headers resolves `{ ok: false }` rather than rejecting.
   * Status handling:
   * - 409: a newer worker epoch superseded ours → handleEpochMismatch().
   * - 401/403: if the current token's own `exp` has already passed, no retry
   *   can ever succeed → onEpochMismatch() immediately. Otherwise the failure
   *   counts toward MAX_CONSECUTIVE_AUTH_FAILURES. Any 2xx resets the counter.
   * - 429: a Retry-After header (delay-seconds or HTTP-date) is returned as
   *   `retryAfterMs`, so the uploaders can honor the server's backoff hint
   *   instead of blindly exponentiating.
   * If onEpochMismatch() throws instead of exiting (in-process callers), the
   * catch below turns that into `{ ok: false }`.
   */
  private async request(
    method: 'post' | 'put',
    path: string,
    body: unknown,
    label: string,
    { timeout = 10_000 }: { timeout?: number } = {},
  ): Promise<RequestResult> {
    const authHeaders = this.getAuthHeaders()
    if (Object.keys(authHeaders).length === 0) return { ok: false }
    try {
      const response = await this.http[method](
        `${this.sessionBaseUrl}${path}`,
        body,
        {
          headers: {
            ...authHeaders,
            'Content-Type': 'application/json',
            'anthropic-version': '2023-06-01',
            'User-Agent': getClaudeCodeUserAgent(),
          },
          validateStatus: alwaysValidStatus,
          timeout,
        },
      )
      if (response.status >= 200 && response.status < 300) {
        this.consecutiveAuthFailures = 0
        return { ok: true }
      }
      if (response.status === 409) {
        this.handleEpochMismatch()
      }
      if (response.status === 401 || response.status === 403) {
        // A 401 with an expired JWT is deterministic: no retry will ever
        // succeed. Check the token's own exp before burning wall-clock on
        // the threshold loop. The token is read now, not from the headers
        // sent above, so a refresh that landed mid-request counts as valid.
        const tok = this.tokenForExpiryCheck()
        const exp = tok ? decodeJwtExpiry(tok) : null
        if (exp !== null && exp * 1000 < Date.now()) {
          logForDebugging(
            `CCRClient: session_token expired (exp=${new Date(exp * 1000).toISOString()}) — no refresh was delivered, exiting`,
            { level: 'error' },
          )
          logForDiagnosticsNoPII('error', 'cli_worker_token_expired_no_refresh')
          this.onEpochMismatch()
        }
        // Token looks valid (or can't be inspected) but the server says 401:
        // a possible server-side blip (userauth down, KMS hiccup). Count it
        // toward the threshold.
        this.consecutiveAuthFailures++
        if (this.consecutiveAuthFailures >= MAX_CONSECUTIVE_AUTH_FAILURES) {
          logForDebugging(
            `CCRClient: ${this.consecutiveAuthFailures} consecutive auth failures with a valid-looking token — server-side auth unrecoverable, exiting`,
            { level: 'error' },
          )
          logForDiagnosticsNoPII('error', 'cli_worker_auth_failures_exhausted')
          this.onEpochMismatch()
        }
      }
      logForDebugging(`CCRClient: ${label} returned ${response.status}`, {
        level: 'warn',
      })
      logForDiagnosticsNoPII('warn', 'cli_worker_request_failed', {
        method,
        path,
        status: response.status,
      })
      if (response.status === 429) {
        const retryAfterMs = CCRClient.retryAfterToMs(
          response.headers?.['retry-after'],
          Date.now(),
        )
        if (retryAfterMs !== undefined) {
          return { ok: false, retryAfterMs }
        }
      }
      return { ok: false }
    } catch (error) {
      logForDebugging(`CCRClient: ${label} failed: ${errorMessage(error)}`, {
        level: 'warn',
      })
      logForDiagnosticsNoPII('warn', 'cli_worker_request_error', {
        method,
        path,
        error_code: getErrnoCode(error),
      })
      return { ok: false }
    }
  }

  /**
   * Token whose `exp` claim drives the expired-JWT fast path on 401/403.
   *
   * With the default (process-wide) auth source, this is the session-ingress
   * token itself. With an injected per-instance getAuthHeaders, the process
   * global may belong to a different session, so it must not decide whether
   * THIS session exits. In that case we use the Bearer token from this
   * instance's own headers. When no Bearer token is found we return null,
   * which skips the fast path and falls back to the failure-count threshold.
   */
  private tokenForExpiryCheck(): string | null {
    if (this.getAuthHeaders === getSessionIngressAuthHeaders) {
      return getSessionIngressAuthToken() || null
    }
    for (const [name, value] of Object.entries(this.getAuthHeaders())) {
      if (name.toLowerCase() !== 'authorization') continue
      const match = /^Bearer\s+(\S+)$/i.exec(value.trim())
      return match?.[1] ?? null
    }
    return null
  }

  /**
   * Convert a Retry-After header value to milliseconds.
   *
   * Both RFC 9110 forms are accepted. Delay-seconds is tried first with the
   * same lenient parseInt as before, and negative values are rejected. An
   * HTTP-date is converted to the remaining delay, floored at 0 for dates in
   * the past. Returns undefined when the value is absent or unparseable, so
   * the uploader falls back to its own backoff.
   */
  private static retryAfterToMs(
    raw: unknown,
    nowMs: number,
  ): number | undefined {
    if (typeof raw !== 'string') return undefined
    const seconds = parseInt(raw, 10)
    if (!isNaN(seconds)) {
      return seconds >= 0 ? seconds * 1000 : undefined
    }
    const retryAtMs = Date.parse(raw)
    return isNaN(retryAtMs) ? undefined : Math.max(0, retryAtMs - nowMs)
  }
  /**
   * Queue a worker_status update (PUT /sessions/{id}/worker). A repeat of the
   * last reported state is dropped unless `details` accompany it. Without
   * details, requires_action_details is sent as null.
   */
  reportState(state: SessionState, details?: RequiresActionDetails): void {
    const unchanged = state === this.currentState
    if (unchanged && !details) return
    this.currentState = state
    const requiresActionDetails = !details
      ? null
      : {
          tool_name: details.tool_name,
          action_description: details.action_description,
          request_id: details.request_id,
        }
    this.workerState.enqueue({
      worker_status: state,
      requires_action_details: requiresActionDetails,
    })
  }

  /** Queue an external_metadata update (PUT /sessions/{id}/worker). */
  reportMetadata(metadata: Record<string, unknown>): void {
    const update = { external_metadata: metadata }
    this.workerState.enqueue(update)
  }
  /**
   * 409 Conflict: a newer CC instance replaced this worker. Log it, then hand
   * off to onEpochMismatch, which never returns (it exits or throws).
   */
  private handleEpochMismatch(): never {
    logForDebugging('CCRClient: Epoch mismatch (409), shutting down', {
      level: 'error',
    })
    logForDiagnosticsNoPII('error', 'cli_worker_epoch_mismatch')
    return this.onEpochMismatch()
  }

  /**
   * (Re)start the heartbeat loop. Each beat is a self-rearming setTimeout
   * offset by ±heartbeatJitterFraction of the interval.
   */
  private startHeartbeat(): void {
    this.stopHeartbeat()
    const armNext = (): void => {
      const spread = this.heartbeatIntervalMs * this.heartbeatJitterFraction
      const offset = spread * (2 * Math.random() - 1)
      this.heartbeatTimer = setTimeout(onFire, this.heartbeatIntervalMs + offset)
    }
    const onFire = (): void => {
      void this.sendHeartbeat()
      // stopHeartbeat() nulls the timer. Re-arm only while it is still set,
      // so a stop during the fire-and-forget send is honored.
      if (this.heartbeatTimer !== null) armNext()
    }
    armNext()
  }

  /** Cancel any pending heartbeat and forget its timer handle. */
  private stopHeartbeat(): void {
    const timer = this.heartbeatTimer
    if (!timer) return
    clearTimeout(timer)
    this.heartbeatTimer = null
  }

  /**
   * POST /sessions/{id}/worker/heartbeat with a 5s timeout. Skips the beat if
   * the previous one is still in flight.
   */
  private async sendHeartbeat(): Promise<void> {
    if (this.heartbeatInFlight) return
    this.heartbeatInFlight = true
    try {
      const { ok } = await this.request(
        'post',
        '/worker/heartbeat',
        { session_id: this.sessionId, worker_epoch: this.workerEpoch },
        'Heartbeat',
        { timeout: 5_000 },
      )
      if (ok) logForDebugging('CCRClient: Heartbeat sent')
    } finally {
      this.heartbeatInFlight = false
    }
  }
  /**
   * Publish a StdoutMessage as a client event (POST /sessions/{id}/worker/events),
   * visible to frontend clients over SSE. A UUID is injected when missing, so
   * retries stay idempotent server-side.
   *
   * stream_event messages are not sent immediately. They sit in a 100ms delay
   * buffer and are coalesced (text_deltas per content block become a
   * full-so-far snapshot per flush). Any other message first drains that
   * buffer, which keeps downstream ordering intact. An assistant message also
   * retires its accumulator entries.
   */
  async writeEvent(message: StdoutMessage): Promise<void> {
    if (message.type !== 'stream_event') {
      await this.flushStreamEventBuffer()
      if (message.type === 'assistant') {
        clearStreamAccumulatorForMessage(this.streamTextAccumulator, message)
      }
      await this.eventUploader.enqueue(this.toClientEvent(message))
      return
    }
    this.streamEventBuffer.push(message)
    if (this.streamEventTimer) return
    this.streamEventTimer = setTimeout(
      () => void this.flushStreamEventBuffer(),
      STREAM_EVENT_FLUSH_INTERVAL_MS,
    )
  }

  /** Convert a StdoutMessage into a ClientEvent, keeping or minting its uuid. */
  private toClientEvent(message: StdoutMessage): ClientEvent {
    const fields = message as unknown as Record<string, unknown>
    const uuid = typeof fields.uuid === 'string' ? fields.uuid : randomUUID()
    const payload = { ...fields, uuid } as EventPayload
    return { payload }
  }

  /**
   * Empty the stream_event delay buffer. Cancels the pending flush timer, folds
   * text_deltas into full-so-far snapshots, and enqueues the results as
   * ephemeral client events. Invoked by the timer, by writeEvent for
   * non-stream messages, and by flush(). close() discards the buffer, so call
   * flush() first when delivery matters.
   */
  private async flushStreamEventBuffer(): Promise<void> {
    const pendingTimer = this.streamEventTimer
    if (pendingTimer) {
      clearTimeout(pendingTimer)
      this.streamEventTimer = null
    }
    const buffered = this.streamEventBuffer
    if (buffered.length === 0) return
    this.streamEventBuffer = []
    const events = accumulateStreamEvents(
      buffered,
      this.streamTextAccumulator,
    ).map(payload => ({ payload, ephemeral: true }))
    await this.eventUploader.enqueue(events)
  }
  /**
   * Queue a worker-internal event for POST /sessions/{id}/worker/internal-events.
   * These events are not shown to frontend clients. They carry
   * worker-internal state (transcript messages, compaction markers) that
   * session resume relies on.
   *
   * @param eventType Stored as the payload's `type`. A `type` key inside
   *   `payload` wins, because the payload is spread after it.
   * @param payload Event fields. A string `uuid` is preserved; otherwise one
   *   is generated.
   * @param options.isCompaction When true, the event carries `is_compaction: true`.
   * @param options.agentId When non-empty, attached as `agent_id`.
   */
  async writeInternalEvent(
    eventType: string,
    payload: Record<string, unknown>,
    {
      isCompaction = false,
      agentId,
    }: {
      isCompaction?: boolean
      agentId?: string
    } = {},
  ): Promise<void> {
    const uuid =
      typeof payload.uuid === 'string' ? payload.uuid : randomUUID()
    const event: WorkerEvent = {
      payload: { type: eventType, ...payload, uuid } as EventPayload,
      ...(isCompaction ? { is_compaction: true } : {}),
      ...(agentId ? { agent_id: agentId } : {}),
    }
    await this.internalEventUploader.enqueue(event)
  }
  /**
   * Drain the internal-event uploader so transcript entries are persisted.
   * Intended for use between turns and during shutdown.
   */
  flushInternalEvents(): Promise<void> {
    const uploader = this.internalEventUploader
    return uploader.flush()
  }
  /**
   * Deliver everything queued through writeEvent. Buffered stream_event
   * messages are pushed into the uploader first, then the uploader itself is
   * flushed.
   *
   * Use this before close() when delivery confirmation matters, because
   * close() abandons the queue. Settles once the uploader drains or rejects.
   * It does not report whether individual POSTs succeeded; check server
   * state separately if that matters.
   */
  async flush(): Promise<void> {
    await this.flushStreamEventBuffer()
    await this.eventUploader.flush()
  }
  /**
   * Fetch the foreground agent's internal events for session resume, via
   * GET /sessions/{id}/worker/internal-events (all pages).
   * Returns transcript entries from the last compaction boundary, or null
   * on failure.
   */
  async readInternalEvents(): Promise<InternalEvent[] | null> {
    const noExtraParams: Record<string, string> = {}
    return this.paginatedGet(
      '/worker/internal-events',
      noExtraParams,
      'internal_events',
    )
  }
  /**
   * Fetch internal events for every non-foreground agent, for session resume.
   * Uses GET /sessions/{id}/worker/internal-events?subagents=true (all pages).
   * The result is one merged stream, each agent's events starting from its
   * compaction point, or null on failure.
   */
  async readSubagentInternalEvents(): Promise<InternalEvent[] | null> {
    const subagentQuery = { subagents: 'true' }
    return this.paginatedGet(
      '/worker/internal-events',
      subagentQuery,
      'subagent_events',
    )
  }
  /**
   * Paginated GET with retry. Follows `next_cursor` across all pages of a
   * list endpoint, fetching each page through getWithRetry.
   *
   * @param path Path appended to the session base URL.
   * @param params Extra query parameters sent with every page request.
   * @param context Label forwarded to getWithRetry for diagnostics.
   * @returns All events concatenated in page order. Returns null in three
   *   cases:
   *   - no auth headers are available;
   *   - any page exhausts its retries;
   *   - the server hands back a cursor it already returned (which would
   *     otherwise loop forever).
   */
  private async paginatedGet(
    path: string,
    params: Record<string, string>,
    context: string,
  ): Promise<InternalEvent[] | null> {
    const authHeaders = this.getAuthHeaders()
    if (Object.keys(authHeaders).length === 0) return null
    const allEvents: InternalEvent[] = []
    const seenCursors = new Set<string>()
    let cursor: string | undefined
    do {
      const url = new URL(`${this.sessionBaseUrl}${path}`)
      for (const [k, v] of Object.entries(params)) {
        url.searchParams.set(k, v)
      }
      if (cursor) {
        url.searchParams.set('cursor', cursor)
      }
      const page = await this.getWithRetry<ListInternalEventsResponse>(
        url.toString(),
        authHeaders,
        context,
      )
      if (!page) return null
      // Append element-wise: spreading a large page into push() passes every
      // element as a call argument and can throw RangeError.
      for (const event of page.data ?? []) {
        allEvents.push(event)
      }
      cursor = page.next_cursor
      if (cursor) {
        // A repeated cursor means the server is not advancing; bail out
        // rather than re-requesting the same page forever.
        if (seenCursors.has(cursor)) {
          logForDebugging(
            `CCRClient: GET ${path} returned a repeated cursor; aborting pagination`,
            { level: 'error' },
          )
          return null
        }
        seenCursors.add(cursor)
      }
    } while (cursor)
    logForDebugging(
      `CCRClient: Read ${allEvents.length} internal events from ${path}${params.subagents ? ' (subagents)' : ''}`,
    )
    return allEvents
  }
  /**
   * Single GET request with retry.
   *
   * Any attempt that throws or returns a non-2xx status is logged and retried
   * after exponential backoff with jitter: 500 ms doubling per attempt,
   * capped at 30 s, plus up to 500 ms of random jitter. There is no sleep
   * after the final attempt.
   *
   * A 409 calls handleEpochMismatch() first. If that returns normally, the
   * request is retried like any other failure.
   *
   * @param url Fully-qualified request URL.
   * @param authHeaders Auth headers merged into every request.
   * @param context Label recorded in the diagnostics event on exhaustion.
   * @param maxAttempts Total attempts before giving up (default 10).
   * @returns The parsed body of the first 2xx response, or null once every
   *   attempt has failed.
   */
  private async getWithRetry<T>(
    url: string,
    authHeaders: Record<string, string>,
    context: string,
    maxAttempts = 10,
  ): Promise<T | null> {
    // Shared backoff for both the thrown-error and bad-status paths.
    const backoffAfter = async (attempt: number): Promise<void> => {
      if (attempt >= maxAttempts) return
      const delay =
        Math.min(500 * 2 ** (attempt - 1), 30_000) + Math.random() * 500
      await sleep(delay)
    }
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      let response
      try {
        response = await this.http.get<T>(url, {
          headers: {
            ...authHeaders,
            'anthropic-version': '2023-06-01',
            'User-Agent': getClaudeCodeUserAgent(),
          },
          // Accept every status so non-2xx responses are handled below
          // rather than thrown.
          validateStatus: alwaysValidStatus,
          timeout: 30_000,
        })
      } catch (error) {
        logForDebugging(
          `CCRClient: GET ${url} failed (attempt ${attempt}/${maxAttempts}): ${errorMessage(error)}`,
          { level: 'warn' },
        )
        await backoffAfter(attempt)
        continue
      }
      if (response.status >= 200 && response.status < 300) {
        return response.data
      }
      if (response.status === 409) {
        this.handleEpochMismatch()
      }
      logForDebugging(
        `CCRClient: GET ${url} returned ${response.status} (attempt ${attempt}/${maxAttempts})`,
        { level: 'warn' },
      )
      await backoffAfter(attempt)
    }
    logForDebugging('CCRClient: GET retries exhausted', { level: 'error' })
    logForDiagnosticsNoPII('error', 'cli_worker_get_retries_exhausted', {
      context,
    })
    return null
  }
  /**
   * Report delivery status for a client-to-worker event.
   * Queued on the delivery uploader for
   * POST /v1/code/sessions/{id}/worker/events/delivery (batch endpoint).
   *
   * Fire-and-forget: returns immediately without waiting for the upload.
   *
   * @param eventId Id of the client event being acknowledged.
   * @param status Delivery stage reached for that event.
   */
  reportDelivery(
    eventId: string,
    status: 'received' | 'processing' | 'processed',
  ): void {
    // No caller awaits this promise, so handle rejection here instead of
    // letting it become an unhandled promise rejection.
    void this.deliveryUploader
      .enqueue({ eventId, status })
      .catch((error: unknown) => {
        logForDebugging(
          `CCRClient: delivery report for ${eventId} failed: ${errorMessage(error)}`,
          { level: 'warn' },
        )
      })
  }
  /** Current worker epoch, exposed for callers outside this class. */
  getWorkerEpoch(): number {
    const { workerEpoch } = this
    return workerEpoch
  }
  /**
   * Number of internal events still queued for upload. Serves as a
   * backpressure signal for shutdown snapshots.
   */
  get internalEventsPending(): number {
    const { pendingCount } = this.internalEventUploader
    return pendingCount
  }
  /**
   * Shut the client down. In order, it:
   * - marks the client closed;
   * - stops the heartbeat;
   * - unregisters the session-activity callback;
   * - cancels any pending stream_event flush and discards buffered stream
   *   events and accumulator state;
   * - closes the worker-state tracker and all three uploaders.
   *
   * Nothing is flushed here; call flush() first when delivery matters.
   */
  close(): void {
    this.closed = true
    this.stopHeartbeat()
    unregisterSessionActivityCallback()

    // Cancel the scheduled stream_event flush and drop what it would have sent.
    const pendingTimer = this.streamEventTimer
    if (pendingTimer) {
      clearTimeout(pendingTimer)
      this.streamEventTimer = null
    }
    this.streamEventBuffer = []
    const { byMessage, scopeToMessage } = this.streamTextAccumulator
    byMessage.clear()
    scopeToMessage.clear()

    // Tear down state tracking and the uploaders last.
    this.workerState.close()
    this.eventUploader.close()
    this.internalEventUploader.close()
    this.deliveryUploader.close()
  }
  930. }