| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998 |
- import { randomUUID } from 'crypto'
- import type {
- SDKPartialAssistantMessage,
- StdoutMessage,
- } from 'src/entrypoints/sdk/controlTypes.js'
- import { decodeJwtExpiry } from '../../bridge/jwtUtils.js'
- import { logForDebugging } from '../../utils/debug.js'
- import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
- import { errorMessage, getErrnoCode } from '../../utils/errors.js'
- import { createAxiosInstance } from '../../utils/proxy.js'
- import {
- registerSessionActivityCallback,
- unregisterSessionActivityCallback,
- } from '../../utils/sessionActivity.js'
- import {
- getSessionIngressAuthHeaders,
- getSessionIngressAuthToken,
- } from '../../utils/sessionIngressAuth.js'
- import type {
- RequiresActionDetails,
- SessionState,
- } from '../../utils/sessionState.js'
- import { sleep } from '../../utils/sleep.js'
- import { getClaudeCodeUserAgent } from '../../utils/userAgent.js'
- import {
- RetryableError,
- SerialBatchEventUploader,
- } from './SerialBatchEventUploader.js'
- import type { SSETransport, StreamClientEvent } from './SSETransport.js'
- import { WorkerStateUploader } from './WorkerStateUploader.js'
- /** Default interval between heartbeat events (20s; server TTL is 60s). */
- const DEFAULT_HEARTBEAT_INTERVAL_MS = 20_000
- /**
- * stream_event messages accumulate in a delay buffer for up to this many ms
- * before enqueue. Mirrors HybridTransport's batching window. text_delta
- * events for the same content block accumulate into a single full-so-far
- * snapshot per flush — each emitted event is self-contained so a client
- * connecting mid-stream sees complete text, not a fragment.
- */
- const STREAM_EVENT_FLUSH_INTERVAL_MS = 100
- /** Hoisted axios validateStatus callback to avoid per-request closure allocation. */
- function alwaysValidStatus(): boolean {
- return true
- }
- export type CCRInitFailReason =
- | 'no_auth_headers'
- | 'missing_epoch'
- | 'worker_register_failed'
- /** Thrown by initialize(); carries a typed reason for the diag classifier. */
- export class CCRInitError extends Error {
- constructor(readonly reason: CCRInitFailReason) {
- super(`CCRClient init failed: ${reason}`)
- }
- }
- /**
- * Consecutive 401/403 with a VALID-LOOKING token before giving up. An
- * expired JWT short-circuits this (exits immediately — deterministic,
- * retry is futile). This threshold is for the uncertain case: token's
- * exp is in the future but server says 401 (userauth down, KMS hiccup,
- * clock skew). 10 × 20s heartbeat ≈ 200s to ride it out.
- */
- const MAX_CONSECUTIVE_AUTH_FAILURES = 10
- type EventPayload = {
- uuid: string
- type: string
- [key: string]: unknown
- }
- type ClientEvent = {
- payload: EventPayload
- ephemeral?: boolean
- }
- /**
- * Structural subset of a stream_event carrying a text_delta. Not a narrowing
- * of SDKPartialAssistantMessage — RawMessageStreamEvent's delta is a union and
- * narrowing through two levels defeats the discriminant.
- */
- type CoalescedStreamEvent = {
- type: 'stream_event'
- uuid: string
- session_id: string
- parent_tool_use_id: string | null
- event: {
- type: 'content_block_delta'
- index: number
- delta: { type: 'text_delta'; text: string }
- }
- }
- /**
- * Accumulator state for text_delta coalescing. Keyed by API message ID so
- * lifetime is tied to the assistant message — cleared when the complete
- * SDKAssistantMessage arrives (writeEvent), which is reliable even when
- * abort/error paths skip content_block_stop/message_stop delivery.
- */
- export type StreamAccumulatorState = {
- /** API message ID (msg_...) → blocks[blockIndex] → chunk array. */
- byMessage: Map<string, string[][]>
- /**
- * {session_id}:{parent_tool_use_id} → active message ID.
- * content_block_delta events don't carry the message ID (only
- * message_start does), so we track which message is currently streaming
- * for each scope. At most one message streams per scope at a time.
- */
- scopeToMessage: Map<string, string>
- }
- export function createStreamAccumulator(): StreamAccumulatorState {
- return { byMessage: new Map(), scopeToMessage: new Map() }
- }
- function scopeKey(m: {
- session_id: string
- parent_tool_use_id: string | null
- }): string {
- return `${m.session_id}:${m.parent_tool_use_id ?? ''}`
- }
- /**
- * Accumulate text_delta stream_events into full-so-far snapshots per content
- * block. Each flush emits ONE event per touched block containing the FULL
- * accumulated text from the start of the block — a client connecting
- * mid-stream receives a self-contained snapshot, not a fragment.
- *
- * Non-text-delta events pass through unchanged. message_start records the
- * active message ID for the scope; content_block_delta appends chunks;
- * the snapshot event reuses the first text_delta UUID seen for that block in
- * this flush so server-side idempotency remains stable across retries.
- *
- * Cleanup happens in writeEvent when the complete assistant message arrives
- * (reliable), not here on stop events (abort/error paths skip those).
- */
- export function accumulateStreamEvents(
- buffer: SDKPartialAssistantMessage[],
- state: StreamAccumulatorState,
- ): EventPayload[] {
- const out: EventPayload[] = []
- // chunks[] → snapshot already in `out` this flush. Keyed by the chunks
- // array reference (stable per {messageId, index}) so subsequent deltas
- // rewrite the same entry instead of emitting one event per delta.
- const touched = new Map<string[], CoalescedStreamEvent>()
- for (const msg of buffer) {
- switch (msg.event.type) {
- case 'message_start': {
- const id = msg.event.message.id
- const prevId = state.scopeToMessage.get(scopeKey(msg))
- if (prevId) state.byMessage.delete(prevId)
- state.scopeToMessage.set(scopeKey(msg), id)
- state.byMessage.set(id, [])
- out.push(msg)
- break
- }
- case 'content_block_delta': {
- if (msg.event.delta.type !== 'text_delta') {
- out.push(msg)
- break
- }
- const messageId = state.scopeToMessage.get(scopeKey(msg))
- const blocks = messageId ? state.byMessage.get(messageId) : undefined
- if (!blocks) {
- // Delta without a preceding message_start (reconnect mid-stream,
- // or message_start was in a prior buffer that got dropped). Pass
- // through raw — can't produce a full-so-far snapshot without the
- // prior chunks anyway.
- out.push(msg)
- break
- }
- const chunks = (blocks[msg.event.index] ??= [])
- chunks.push(msg.event.delta.text)
- const existing = touched.get(chunks)
- if (existing) {
- existing.event.delta.text = chunks.join('')
- break
- }
- const snapshot: CoalescedStreamEvent = {
- type: 'stream_event',
- uuid: msg.uuid,
- session_id: msg.session_id,
- parent_tool_use_id: msg.parent_tool_use_id,
- event: {
- type: 'content_block_delta',
- index: msg.event.index,
- delta: { type: 'text_delta', text: chunks.join('') },
- },
- }
- touched.set(chunks, snapshot)
- out.push(snapshot)
- break
- }
- default:
- out.push(msg)
- }
- }
- return out
- }
- /**
- * Clear accumulator entries for a completed assistant message. Called from
- * writeEvent when the SDKAssistantMessage arrives — the reliable end-of-stream
- * signal that fires even when abort/interrupt/error skip SSE stop events.
- */
- export function clearStreamAccumulatorForMessage(
- state: StreamAccumulatorState,
- assistant: {
- session_id: string
- parent_tool_use_id: string | null
- message: { id: string }
- },
- ): void {
- state.byMessage.delete(assistant.message.id)
- const scope = scopeKey(assistant)
- if (state.scopeToMessage.get(scope) === assistant.message.id) {
- state.scopeToMessage.delete(scope)
- }
- }
- type RequestResult = { ok: true } | { ok: false; retryAfterMs?: number }
- type WorkerEvent = {
- payload: EventPayload
- is_compaction?: boolean
- agent_id?: string
- }
- export type InternalEvent = {
- event_id: string
- event_type: string
- payload: Record<string, unknown>
- event_metadata?: Record<string, unknown> | null
- is_compaction: boolean
- created_at: string
- agent_id?: string
- }
- type ListInternalEventsResponse = {
- data: InternalEvent[]
- next_cursor?: string
- }
- type WorkerStateResponse = {
- worker?: {
- external_metadata?: Record<string, unknown>
- }
- }
- /**
- * Manages the worker lifecycle protocol with CCR v2:
- * - Epoch management: reads worker_epoch from CLAUDE_CODE_WORKER_EPOCH env var
- * - Runtime state reporting: PUT /sessions/{id}/worker
- * - Heartbeat: POST /sessions/{id}/worker/heartbeat for liveness detection
- *
- * All writes go through this.request().
- */
- export class CCRClient {
- private workerEpoch = 0
- private readonly heartbeatIntervalMs: number
- private readonly heartbeatJitterFraction: number
- private heartbeatTimer: NodeJS.Timeout | null = null
- private heartbeatInFlight = false
- private closed = false
- private consecutiveAuthFailures = 0
- private currentState: SessionState | null = null
- private readonly sessionBaseUrl: string
- private readonly sessionId: string
- private readonly http = createAxiosInstance({ keepAlive: true })
- // stream_event delay buffer — accumulates content deltas for up to
- // STREAM_EVENT_FLUSH_INTERVAL_MS before enqueueing (reduces POST count
- // and enables text_delta coalescing). Mirrors HybridTransport's pattern.
- private streamEventBuffer: SDKPartialAssistantMessage[] = []
- private streamEventTimer: ReturnType<typeof setTimeout> | null = null
- // Full-so-far text accumulator. Persists across flushes so each emitted
- // text_delta event carries the complete text from the start of the block —
- // mid-stream reconnects see a self-contained snapshot. Keyed by API message
- // ID; cleared in writeEvent when the complete assistant message arrives.
- private streamTextAccumulator = createStreamAccumulator()
- private readonly workerState: WorkerStateUploader
- private readonly eventUploader: SerialBatchEventUploader<ClientEvent>
- private readonly internalEventUploader: SerialBatchEventUploader<WorkerEvent>
- private readonly deliveryUploader: SerialBatchEventUploader<{
- eventId: string
- status: 'received' | 'processing' | 'processed'
- }>
- /**
- * Called when the server returns 409 (a newer worker epoch superseded ours).
- * Default: process.exit(1) — correct for spawn-mode children where the
- * parent bridge re-spawns. In-process callers (replBridge) MUST override
- * this to close gracefully instead; exit would kill the user's REPL.
- */
- private readonly onEpochMismatch: () => never
- /**
- * Auth header source. Defaults to the process-wide session-ingress token
- * (CLAUDE_CODE_SESSION_ACCESS_TOKEN env var). Callers managing multiple
- * concurrent sessions with distinct JWTs MUST inject this — the env-var
- * path is a process global and would stomp across sessions.
- */
- private readonly getAuthHeaders: () => Record<string, string>
- constructor(
- transport: SSETransport,
- sessionUrl: URL,
- opts?: {
- onEpochMismatch?: () => never
- heartbeatIntervalMs?: number
- heartbeatJitterFraction?: number
- /**
- * Per-instance auth header source. Omit to read the process-wide
- * CLAUDE_CODE_SESSION_ACCESS_TOKEN (single-session callers — REPL,
- * daemon). Required for concurrent multi-session callers.
- */
- getAuthHeaders?: () => Record<string, string>
- },
- ) {
- this.onEpochMismatch =
- opts?.onEpochMismatch ??
- (() => {
- // eslint-disable-next-line custom-rules/no-process-exit
- process.exit(1)
- })
- this.heartbeatIntervalMs =
- opts?.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_INTERVAL_MS
- this.heartbeatJitterFraction = opts?.heartbeatJitterFraction ?? 0
- this.getAuthHeaders = opts?.getAuthHeaders ?? getSessionIngressAuthHeaders
- // Session URL: https://host/v1/code/sessions/{id}
- if (sessionUrl.protocol !== 'http:' && sessionUrl.protocol !== 'https:') {
- throw new Error(
- `CCRClient: Expected http(s) URL, got ${sessionUrl.protocol}`,
- )
- }
- const pathname = sessionUrl.pathname.replace(/\/$/, '')
- this.sessionBaseUrl = `${sessionUrl.protocol}//${sessionUrl.host}${pathname}`
- // Extract session ID from the URL path (last segment)
- this.sessionId = pathname.split('/').pop() || ''
- this.workerState = new WorkerStateUploader({
- send: body =>
- this.request(
- 'put',
- '/worker',
- { worker_epoch: this.workerEpoch, ...body },
- 'PUT worker',
- ).then(r => r.ok),
- baseDelayMs: 500,
- maxDelayMs: 30_000,
- jitterMs: 500,
- })
- this.eventUploader = new SerialBatchEventUploader<ClientEvent>({
- maxBatchSize: 100,
- maxBatchBytes: 10 * 1024 * 1024,
- // flushStreamEventBuffer() enqueues a full 100ms window of accumulated
- // stream_events in one call. A burst of mixed delta types that don't
- // fold into a single snapshot could exceed the old cap (50) and deadlock
- // on the SerialBatchEventUploader backpressure check. Match
- // HybridTransport's bound — high enough to be memory-only.
- maxQueueSize: 100_000,
- send: async batch => {
- const result = await this.request(
- 'post',
- '/worker/events',
- { worker_epoch: this.workerEpoch, events: batch },
- 'client events',
- )
- if (!result.ok) {
- throw new RetryableError(
- 'client event POST failed',
- (result as any).retryAfterMs,
- )
- }
- },
- baseDelayMs: 500,
- maxDelayMs: 30_000,
- jitterMs: 500,
- })
- this.internalEventUploader = new SerialBatchEventUploader<WorkerEvent>({
- maxBatchSize: 100,
- maxBatchBytes: 10 * 1024 * 1024,
- maxQueueSize: 200,
- send: async batch => {
- const result = await this.request(
- 'post',
- '/worker/internal-events',
- { worker_epoch: this.workerEpoch, events: batch },
- 'internal events',
- )
- if (!result.ok) {
- throw new RetryableError(
- 'internal event POST failed',
- (result as any).retryAfterMs,
- )
- }
- },
- baseDelayMs: 500,
- maxDelayMs: 30_000,
- jitterMs: 500,
- })
- this.deliveryUploader = new SerialBatchEventUploader<{
- eventId: string
- status: 'received' | 'processing' | 'processed'
- }>({
- maxBatchSize: 64,
- maxQueueSize: 64,
- send: async batch => {
- const result = await this.request(
- 'post',
- '/worker/events/delivery',
- {
- worker_epoch: this.workerEpoch,
- updates: batch.map(d => ({
- event_id: d.eventId,
- status: d.status,
- })),
- },
- 'delivery batch',
- )
- if (!result.ok) {
- throw new RetryableError('delivery POST failed', (result as any).retryAfterMs)
- }
- },
- baseDelayMs: 500,
- maxDelayMs: 30_000,
- jitterMs: 500,
- })
- // Ack each received client_event so CCR can track delivery status.
- // Wired here (not in initialize()) so the callback is registered the
- // moment new CCRClient() returns — remoteIO must be free to call
- // transport.connect() immediately after without racing the first
- // SSE catch-up frame against an unwired onEventCallback.
- transport.setOnEvent((event: StreamClientEvent) => {
- this.reportDelivery(event.event_id, 'received')
- })
- }
- /**
- * Initialize the session worker:
- * 1. Take worker_epoch from the argument, or fall back to
- * CLAUDE_CODE_WORKER_EPOCH (set by env-manager / bridge spawner)
- * 2. Report state as 'idle'
- * 3. Start heartbeat timer
- *
- * In-process callers (replBridge) pass the epoch directly — they
- * registered the worker themselves and there is no parent process
- * setting env vars.
- */
- async initialize(epoch?: number): Promise<Record<string, unknown> | null> {
- const startMs = Date.now()
- if (Object.keys(this.getAuthHeaders()).length === 0) {
- throw new CCRInitError('no_auth_headers')
- }
- if (epoch === undefined) {
- const rawEpoch = process.env.CLAUDE_CODE_WORKER_EPOCH
- epoch = rawEpoch ? parseInt(rawEpoch, 10) : NaN
- }
- if (isNaN(epoch)) {
- throw new CCRInitError('missing_epoch')
- }
- this.workerEpoch = epoch
- // Concurrent with the init PUT — neither depends on the other.
- const restoredPromise = this.getWorkerState()
- const result = await this.request(
- 'put',
- '/worker',
- {
- worker_status: 'idle',
- worker_epoch: this.workerEpoch,
- // Clear stale pending_action/task_summary left by a prior
- // worker crash — the in-session clears don't survive process restart.
- external_metadata: {
- pending_action: null,
- task_summary: null,
- },
- },
- 'PUT worker (init)',
- )
- if (!result.ok) {
- // 409 → onEpochMismatch may throw, but request() catches it and returns
- // false. Without this check we'd continue to startHeartbeat(), leaking a
- // 20s timer against a dead epoch. Throw so connect()'s rejection handler
- // fires instead of the success path.
- throw new CCRInitError('worker_register_failed')
- }
- this.currentState = 'idle'
- this.startHeartbeat()
- // sessionActivity's refcount-gated timer fires while an API call or tool
- // is in-flight; without a write the container lease can expire mid-wait.
- // v1 wires this in WebSocketTransport per-connection.
- registerSessionActivityCallback(() => {
- void this.writeEvent({ type: 'keep_alive' })
- })
- logForDebugging(`CCRClient: initialized, epoch=${this.workerEpoch}`)
- logForDiagnosticsNoPII('info', 'cli_worker_lifecycle_initialized', {
- epoch: this.workerEpoch,
- duration_ms: Date.now() - startMs,
- })
- // Await the concurrent GET and log state_restored here, after the PUT
- // has succeeded — logging inside getWorkerState() raced: if the GET
- // resolved before the PUT failed, diagnostics showed both init_failed
- // and state_restored for the same session.
- const { metadata, durationMs } = await restoredPromise
- if (!this.closed) {
- logForDiagnosticsNoPII('info', 'cli_worker_state_restored', {
- duration_ms: durationMs,
- had_state: metadata !== null,
- })
- }
- return metadata
- }
- // Control_requests are marked processed and not re-delivered on
- // restart, so read back what the prior worker wrote.
- private async getWorkerState(): Promise<{
- metadata: Record<string, unknown> | null
- durationMs: number
- }> {
- const startMs = Date.now()
- const authHeaders = this.getAuthHeaders()
- if (Object.keys(authHeaders).length === 0) {
- return { metadata: null, durationMs: 0 }
- }
- const data = await this.getWithRetry<WorkerStateResponse>(
- `${this.sessionBaseUrl}/worker`,
- authHeaders,
- 'worker_state',
- )
- return {
- metadata: data?.worker?.external_metadata ?? null,
- durationMs: Date.now() - startMs,
- }
- }
- /**
- * Send an authenticated HTTP request to CCR. Handles auth headers,
- * 409 epoch mismatch, and error logging. Returns { ok: true } on 2xx.
- * On 429, reads Retry-After (integer seconds) so the uploader can honor
- * the server's backoff hint instead of blindly exponentiating.
- */
- private async request(
- method: 'post' | 'put',
- path: string,
- body: unknown,
- label: string,
- { timeout = 10_000 }: { timeout?: number } = {},
- ): Promise<RequestResult> {
- const authHeaders = this.getAuthHeaders()
- if (Object.keys(authHeaders).length === 0) return { ok: false }
- try {
- const response = await this.http[method](
- `${this.sessionBaseUrl}${path}`,
- body,
- {
- headers: {
- ...authHeaders,
- 'Content-Type': 'application/json',
- 'anthropic-version': '2023-06-01',
- 'User-Agent': getClaudeCodeUserAgent(),
- },
- validateStatus: alwaysValidStatus,
- timeout,
- },
- )
- if (response.status >= 200 && response.status < 300) {
- this.consecutiveAuthFailures = 0
- return { ok: true }
- }
- if (response.status === 409) {
- this.handleEpochMismatch()
- }
- if (response.status === 401 || response.status === 403) {
- // A 401 with an expired JWT is deterministic — no retry will
- // ever succeed. Check the token's own exp before burning
- // wall-clock on the threshold loop.
- const tok = getSessionIngressAuthToken()
- const exp = tok ? decodeJwtExpiry(tok) : null
- if (exp !== null && exp * 1000 < Date.now()) {
- logForDebugging(
- `CCRClient: session_token expired (exp=${new Date(exp * 1000).toISOString()}) — no refresh was delivered, exiting`,
- { level: 'error' },
- )
- logForDiagnosticsNoPII('error', 'cli_worker_token_expired_no_refresh')
- this.onEpochMismatch()
- }
- // Token looks valid but server says 401 — possible server-side
- // blip (userauth down, KMS hiccup). Count toward threshold.
- this.consecutiveAuthFailures++
- if (this.consecutiveAuthFailures >= MAX_CONSECUTIVE_AUTH_FAILURES) {
- logForDebugging(
- `CCRClient: ${this.consecutiveAuthFailures} consecutive auth failures with a valid-looking token — server-side auth unrecoverable, exiting`,
- { level: 'error' },
- )
- logForDiagnosticsNoPII('error', 'cli_worker_auth_failures_exhausted')
- this.onEpochMismatch()
- }
- }
- logForDebugging(`CCRClient: ${label} returned ${response.status}`, {
- level: 'warn',
- })
- logForDiagnosticsNoPII('warn', 'cli_worker_request_failed', {
- method,
- path,
- status: response.status,
- })
- if (response.status === 429) {
- const raw = response.headers?.['retry-after']
- const seconds = typeof raw === 'string' ? parseInt(raw, 10) : NaN
- if (!isNaN(seconds) && seconds >= 0) {
- return { ok: false, retryAfterMs: seconds * 1000 }
- }
- }
- return { ok: false }
- } catch (error) {
- logForDebugging(`CCRClient: ${label} failed: ${errorMessage(error)}`, {
- level: 'warn',
- })
- logForDiagnosticsNoPII('warn', 'cli_worker_request_error', {
- method,
- path,
- error_code: getErrnoCode(error),
- })
- return { ok: false }
- }
- }
- /** Report worker state to CCR via PUT /sessions/{id}/worker. */
- reportState(state: SessionState, details?: RequiresActionDetails): void {
- if (state === this.currentState && !details) return
- this.currentState = state
- this.workerState.enqueue({
- worker_status: state,
- requires_action_details: details
- ? {
- tool_name: details.tool_name,
- action_description: details.action_description,
- request_id: details.request_id,
- }
- : null,
- })
- }
- /** Report external metadata to CCR via PUT /worker. */
- reportMetadata(metadata: Record<string, unknown>): void {
- this.workerState.enqueue({ external_metadata: metadata })
- }
- /**
- * Handle epoch mismatch (409 Conflict). A newer CC instance has replaced
- * this one — exit immediately.
- */
- private handleEpochMismatch(): never {
- logForDebugging('CCRClient: Epoch mismatch (409), shutting down', {
- level: 'error',
- })
- logForDiagnosticsNoPII('error', 'cli_worker_epoch_mismatch')
- this.onEpochMismatch()
- }
- /** Start periodic heartbeat. */
- private startHeartbeat(): void {
- this.stopHeartbeat()
- const schedule = (): void => {
- const jitter =
- this.heartbeatIntervalMs *
- this.heartbeatJitterFraction *
- (2 * Math.random() - 1)
- this.heartbeatTimer = setTimeout(tick, this.heartbeatIntervalMs + jitter)
- }
- const tick = (): void => {
- void this.sendHeartbeat()
- // stopHeartbeat nulls the timer; check after the fire-and-forget send
- // but before rescheduling so close() during sendHeartbeat is honored.
- if (this.heartbeatTimer === null) return
- schedule()
- }
- schedule()
- }
- /** Stop heartbeat timer. */
- private stopHeartbeat(): void {
- if (this.heartbeatTimer) {
- clearTimeout(this.heartbeatTimer)
- this.heartbeatTimer = null
- }
- }
- /** Send a heartbeat via POST /sessions/{id}/worker/heartbeat. */
- private async sendHeartbeat(): Promise<void> {
- if (this.heartbeatInFlight) return
- this.heartbeatInFlight = true
- try {
- const result = await this.request(
- 'post',
- '/worker/heartbeat',
- { session_id: this.sessionId, worker_epoch: this.workerEpoch },
- 'Heartbeat',
- { timeout: 5_000 },
- )
- if (result.ok) {
- logForDebugging('CCRClient: Heartbeat sent')
- }
- } finally {
- this.heartbeatInFlight = false
- }
- }
- /**
- * Write a StdoutMessage as a client event via POST /sessions/{id}/worker/events.
- * These events are visible to frontend clients via the SSE stream.
- * Injects a UUID if missing to ensure server-side idempotency on retry.
- *
- * stream_event messages are held in a 100ms delay buffer and accumulated
- * (text_deltas for the same content block emit a full-so-far snapshot per
- * flush). A non-stream_event write flushes the buffer first so downstream
- * ordering is preserved.
- */
- async writeEvent(message: StdoutMessage): Promise<void> {
- if (message.type === 'stream_event') {
- this.streamEventBuffer.push(message)
- if (!this.streamEventTimer) {
- this.streamEventTimer = setTimeout(
- () => void this.flushStreamEventBuffer(),
- STREAM_EVENT_FLUSH_INTERVAL_MS,
- )
- }
- return
- }
- await this.flushStreamEventBuffer()
- if (message.type === 'assistant') {
- clearStreamAccumulatorForMessage(this.streamTextAccumulator, message)
- }
- await this.eventUploader.enqueue(this.toClientEvent(message))
- }
- /** Wrap a StdoutMessage as a ClientEvent, injecting a UUID if missing. */
- private toClientEvent(message: StdoutMessage): ClientEvent {
- const msg = message as unknown as Record<string, unknown>
- return {
- payload: {
- ...msg,
- uuid: typeof msg.uuid === 'string' ? msg.uuid : randomUUID(),
- } as EventPayload,
- }
- }
- /**
- * Drain the stream_event delay buffer: accumulate text_deltas into
- * full-so-far snapshots, clear the timer, enqueue the resulting events.
- * Called from the timer, from writeEvent on a non-stream message, and from
- * flush(). close() drops the buffer — call flush() first if you need
- * delivery.
- */
- private async flushStreamEventBuffer(): Promise<void> {
- if (this.streamEventTimer) {
- clearTimeout(this.streamEventTimer)
- this.streamEventTimer = null
- }
- if (this.streamEventBuffer.length === 0) return
- const buffered = this.streamEventBuffer
- this.streamEventBuffer = []
- const payloads = accumulateStreamEvents(
- buffered,
- this.streamTextAccumulator,
- )
- await this.eventUploader.enqueue(
- payloads.map(payload => ({ payload, ephemeral: true })),
- )
- }
- /**
- * Write an internal worker event via POST /sessions/{id}/worker/internal-events.
- * These events are NOT visible to frontend clients — they store worker-internal
- * state (transcript messages, compaction markers) needed for session resume.
- */
- async writeInternalEvent(
- eventType: string,
- payload: Record<string, unknown>,
- {
- isCompaction = false,
- agentId,
- }: {
- isCompaction?: boolean
- agentId?: string
- } = {},
- ): Promise<void> {
- const event: WorkerEvent = {
- payload: {
- type: eventType,
- ...payload,
- uuid: typeof payload.uuid === 'string' ? payload.uuid : randomUUID(),
- } as EventPayload,
- ...(isCompaction && { is_compaction: true }),
- ...(agentId && { agent_id: agentId }),
- }
- await this.internalEventUploader.enqueue(event)
- }
- /**
- * Flush pending internal events. Call between turns and on shutdown
- * to ensure transcript entries are persisted.
- */
- flushInternalEvents(): Promise<void> {
- return this.internalEventUploader.flush()
- }
- /**
- * Flush pending client events (writeEvent queue). Call before close()
- * when the caller needs delivery confirmation — close() abandons the
- * queue. Resolves once the uploader drains or rejects; returns
- * regardless of whether individual POSTs succeeded (check server state
- * separately if that matters).
- */
- async flush(): Promise<void> {
- await this.flushStreamEventBuffer()
- return this.eventUploader.flush()
- }
- /**
- * Read foreground agent internal events from
- * GET /sessions/{id}/worker/internal-events.
- * Returns transcript entries from the last compaction boundary, or null on failure.
- * Used for session resume.
- */
- async readInternalEvents(): Promise<InternalEvent[] | null> {
- return this.paginatedGet('/worker/internal-events', {}, 'internal_events')
- }
- /**
- * Read all subagent internal events from
- * GET /sessions/{id}/worker/internal-events?subagents=true.
- * Returns a merged stream across all non-foreground agents, each from its
- * compaction point. Used for session resume.
- */
- async readSubagentInternalEvents(): Promise<InternalEvent[] | null> {
- return this.paginatedGet(
- '/worker/internal-events',
- { subagents: 'true' },
- 'subagent_events',
- )
- }
- /**
- * Paginated GET with retry. Fetches all pages from a list endpoint,
- * retrying each page on failure with exponential backoff + jitter.
- */
- private async paginatedGet(
- path: string,
- params: Record<string, string>,
- context: string,
- ): Promise<InternalEvent[] | null> {
- const authHeaders = this.getAuthHeaders()
- if (Object.keys(authHeaders).length === 0) return null
- const allEvents: InternalEvent[] = []
- let cursor: string | undefined
- do {
- const url = new URL(`${this.sessionBaseUrl}${path}`)
- for (const [k, v] of Object.entries(params)) {
- url.searchParams.set(k, v)
- }
- if (cursor) {
- url.searchParams.set('cursor', cursor)
- }
- const page = await this.getWithRetry<ListInternalEventsResponse>(
- url.toString(),
- authHeaders,
- context,
- )
- if (!page) return null
- allEvents.push(...(page.data ?? []))
- cursor = page.next_cursor
- } while (cursor)
- logForDebugging(
- `CCRClient: Read ${allEvents.length} internal events from ${path}${params.subagents ? ' (subagents)' : ''}`,
- )
- return allEvents
- }
- /**
- * Single GET request with retry. Returns the parsed response body
- * on success, null if all retries are exhausted.
- */
- private async getWithRetry<T>(
- url: string,
- authHeaders: Record<string, string>,
- context: string,
- ): Promise<T | null> {
- for (let attempt = 1; attempt <= 10; attempt++) {
- let response
- try {
- response = await this.http.get<T>(url, {
- headers: {
- ...authHeaders,
- 'anthropic-version': '2023-06-01',
- 'User-Agent': getClaudeCodeUserAgent(),
- },
- validateStatus: alwaysValidStatus,
- timeout: 30_000,
- })
- } catch (error) {
- logForDebugging(
- `CCRClient: GET ${url} failed (attempt ${attempt}/10): ${errorMessage(error)}`,
- { level: 'warn' },
- )
- if (attempt < 10) {
- const delay =
- Math.min(500 * 2 ** (attempt - 1), 30_000) + Math.random() * 500
- await sleep(delay)
- }
- continue
- }
- if (response.status >= 200 && response.status < 300) {
- return response.data
- }
- if (response.status === 409) {
- this.handleEpochMismatch()
- }
- logForDebugging(
- `CCRClient: GET ${url} returned ${response.status} (attempt ${attempt}/10)`,
- { level: 'warn' },
- )
- if (attempt < 10) {
- const delay =
- Math.min(500 * 2 ** (attempt - 1), 30_000) + Math.random() * 500
- await sleep(delay)
- }
- }
- logForDebugging('CCRClient: GET retries exhausted', { level: 'error' })
- logForDiagnosticsNoPII('error', 'cli_worker_get_retries_exhausted', {
- context,
- })
- return null
- }
- /**
- * Report delivery status for a client-to-worker event.
- * POST /v1/code/sessions/{id}/worker/events/delivery (batch endpoint)
- */
- reportDelivery(
- eventId: string,
- status: 'received' | 'processing' | 'processed',
- ): void {
- void this.deliveryUploader.enqueue({ eventId, status })
- }
- /** Get the current epoch (for external use). */
- getWorkerEpoch(): number {
- return this.workerEpoch
- }
- /** Internal-event queue depth — shutdown-snapshot backpressure signal. */
- get internalEventsPending(): number {
- return this.internalEventUploader.pendingCount
- }
- /** Clean up uploaders and timers. */
- close(): void {
- this.closed = true
- this.stopHeartbeat()
- unregisterSessionActivityCallback()
- if (this.streamEventTimer) {
- clearTimeout(this.streamEventTimer)
- this.streamEventTimer = null
- }
- this.streamEventBuffer = []
- this.streamTextAccumulator.byMessage.clear()
- this.streamTextAccumulator.scopeToMessage.clear()
- this.workerState.close()
- this.eventUploader.close()
- this.internalEventUploader.close()
- this.deliveryUploader.close()
- }
- }
|