| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008 |
- // biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered
- /**
- * Env-less Remote Control bridge core.
- *
- * "Env-less" = no Environments API layer. Distinct from "CCR v2" (the
- * /worker/* transport protocol) — the env-based path (replBridge.ts) can also
- * use CCR v2 transport via CLAUDE_CODE_USE_CCR_V2. This file is about removing
- * the poll/dispatch layer, not about which transport protocol is underneath.
- *
- * Unlike initBridgeCore (env-based, ~2400 lines), this connects directly
- * to the session-ingress layer without the Environments API work-dispatch
- * layer:
- *
- * 1. POST /v1/code/sessions (OAuth, no env_id) → session.id
- * 2. POST /v1/code/sessions/{id}/bridge (OAuth) → {worker_jwt, expires_in, api_base_url, worker_epoch}
- * Each /bridge call bumps epoch — it IS the register. No separate /worker/register.
- * 3. createV2ReplTransport(worker_jwt, worker_epoch) → SSE + CCRClient
- * 4. createTokenRefreshScheduler → proactive /bridge re-call (new JWT + new epoch)
- * 5. 401 on SSE → rebuild transport with fresh /bridge credentials (same seq-num)
- *
- * No register/poll/ack/stop/heartbeat/deregister environment lifecycle.
- * The Environments API historically existed because CCR's /worker/*
- * endpoints required a session_id+role=worker JWT that only the work-dispatch
- * layer could mint. Server PR #292605 (renamed in #293280) adds the /bridge endpoint as a direct
- * OAuth→worker_jwt exchange, making the env layer optional for REPL sessions.
- *
- * Gated by `tengu_bridge_repl_v2` GrowthBook flag in initReplBridge.ts.
- * REPL-only — daemon/print stay on env-based.
- */
- import { feature } from 'bun:bundle'
- import axios from 'axios'
- import {
- createV2ReplTransport,
- type ReplBridgeTransport,
- } from './replBridgeTransport.js'
- import { buildCCRv2SdkUrl } from './workSecret.js'
- import { toCompatSessionId } from './sessionIdCompat.js'
- import { FlushGate } from './flushGate.js'
- import { createTokenRefreshScheduler } from './jwtUtils.js'
- import { getTrustedDeviceToken } from './trustedDevice.js'
- import {
- getEnvLessBridgeConfig,
- type EnvLessBridgeConfig,
- } from './envLessBridgeConfig.js'
- import {
- handleIngressMessage,
- handleServerControlRequest,
- makeResultMessage,
- isEligibleBridgeMessage,
- extractTitleText,
- BoundedUUIDSet,
- } from './bridgeMessaging.js'
- import { logBridgeSkip } from './debugUtils.js'
- import { logForDebugging } from '../utils/debug.js'
- import { logForDiagnosticsNoPII } from '../utils/diagLogs.js'
- import { isInProtectedNamespace } from '../utils/envUtils.js'
- import { errorMessage } from '../utils/errors.js'
- import { sleep } from '../utils/sleep.js'
- import { registerCleanup } from '../utils/cleanupRegistry.js'
- import {
- type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- logEvent,
- } from '../services/analytics/index.js'
- import type { ReplBridgeHandle, BridgeState } from './replBridge.js'
- import type { Message } from '../types/message.js'
- import type { SDKMessage } from '../entrypoints/agentSdkTypes.js'
- import type {
- SDKControlRequest,
- SDKControlResponse,
- } from '../entrypoints/sdk/controlTypes.js'
- import type { PermissionMode } from '../utils/permissions/PermissionMode.js'
/** Value sent in the `anthropic-version` header by {@link oauthHeaders}. */
const ANTHROPIC_VERSION = '2023-06-01'

/**
 * Telemetry discriminator describing why a transport connect happened.
 * 'initial' is the starting value of the per-bridge `connectCause` variable;
 * rebuildTransport only accepts the other two members (its parameter is typed
 * `Exclude<ConnectCause, 'initial'>`).
 */
type ConnectCause = 'initial' | 'proactive_refresh' | 'auth_401_recovery'

/**
 * Builds the header set for an OAuth-authenticated JSON request.
 *
 * @param accessToken OAuth access token, sent as a Bearer credential.
 * @returns A fresh object holding the Authorization, Content-Type and
 *   anthropic-version headers.
 */
function oauthHeaders(accessToken: string): Record<string, string> {
  const headers: Record<string, string> = {}
  headers.Authorization = `Bearer ${accessToken}`
  headers['Content-Type'] = 'application/json'
  headers['anthropic-version'] = ANTHROPIC_VERSION
  return headers
}
/**
 * Inputs for initEnvLessBridgeCore. Everything the bridge needs from its host
 * (auth, message conversion, UI callbacks) is injected through this object.
 */
export type EnvLessBridgeParams = {
  /** Base URL handed to the session-create, /bridge and archive requests. */
  baseUrl: string
  /** Organization UUID; forwarded to the session archive call. */
  orgUUID: string
  /** Title given to the session when it is created. */
  title: string
  /**
   * Returns the current OAuth access token, or undefined when there is none.
   * May return an expired token as a non-empty string, so callers refresh via
   * onAuth401 rather than trusting truthiness.
   */
  getAccessToken: () => string | undefined
  /**
   * Asked to refresh OAuth credentials. Receives the token that is believed
   * to be stale (empty string when none was available). The bridge calls it
   * before re-fetching /bridge credentials and when a teardown archive
   * returns 401; the resolved boolean is not inspected by the bridge core.
   */
  onAuth401?: (staleAccessToken: string) => Promise<boolean>
  /**
   * Converts internal Message[] → SDKMessage[] for writeMessages() and the
   * initial-flush/drain paths. Injected rather than imported — mappers.ts
   * transitively pulls in src/commands.ts (entire command registry + React
   * tree) which would bloat bundles that don't already have it.
   */
  toSDKMessages: (messages: Message[]) => SDKMessage[]
  /**
   * Upper bound on the number of eligible initial messages flushed on first
   * connect. When positive and exceeded, only the most recent N are sent;
   * a value <= 0 disables the cap.
   */
  initialHistoryCap: number
  /**
   * Existing conversation to flush once the transport connects. Their UUIDs
   * also seed the echo-dedup sets.
   */
  initialMessages?: Message[]
  /** Receives inbound messages read from the transport. */
  onInboundMessage?: (msg: SDKMessage) => void | Promise<void>
  /**
   * Fired on each title-worthy user message seen in writeMessages() until
   * the callback returns true (done). Mirrors replBridge.ts's onUserMessage —
   * caller derives a title and PATCHes /v1/sessions/{id} so auto-started
   * sessions don't stay at the generic fallback. The caller owns the
   * derive-at-count-1-and-3 policy; the transport just keeps calling until
   * told to stop. sessionId is the raw cse_* — updateBridgeSessionTitle
   * retags internally.
   */
  onUserMessage?: (text: string, sessionId: string) => boolean
  /**
   * Called when the remote client answers a permission prompt. The bridge
   * reports the 'running' state to the transport before invoking it.
   */
  onPermissionResponse?: (response: SDKControlResponse) => void
  /** Forwarded to handleServerControlRequest for server-initiated interrupts. */
  onInterrupt?: () => void
  /** Forwarded to handleServerControlRequest for model changes. */
  onSetModel?: (model: string | undefined) => void
  /** Forwarded to handleServerControlRequest for thinking-token budget changes. */
  onSetMaxThinkingTokens?: (maxTokens: number | null) => void
  /**
   * Forwarded to handleServerControlRequest for permission-mode changes.
   * Returns `{ ok: true }` on success or `{ ok: false, error }` on rejection.
   */
  onSetPermissionMode?: (
    mode: PermissionMode,
  ) => { ok: true } | { ok: false; error: string }
  /**
   * Notified of bridge lifecycle transitions. The core emits 'ready',
   * 'connected', 'reconnecting' and 'failed', with an optional
   * human-readable detail string.
   */
  onStateChange?: (state: BridgeState, detail?: string) => void
  /**
   * When true, skip opening the SSE read stream — only the CCRClient write
   * path is activated. Threaded to createV2ReplTransport and
   * handleServerControlRequest.
   */
  outboundOnly?: boolean
  /** Free-form tags for session categorization (e.g. ['ccr-mirror']). */
  tags?: string[]
}
- /**
- * Create a session, fetch a worker JWT, connect the v2 transport.
- *
- * Returns null on any pre-flight failure (session create failed, /bridge
- * failed, transport setup failed). Caller (initReplBridge) surfaces this
- * as a generic "initialization failed" state.
- */
- export async function initEnvLessBridgeCore(
- params: EnvLessBridgeParams,
- ): Promise<ReplBridgeHandle | null> {
- const {
- baseUrl,
- orgUUID,
- title,
- getAccessToken,
- onAuth401,
- toSDKMessages,
- initialHistoryCap,
- initialMessages,
- onInboundMessage,
- onUserMessage,
- onPermissionResponse,
- onInterrupt,
- onSetModel,
- onSetMaxThinkingTokens,
- onSetPermissionMode,
- onStateChange,
- outboundOnly,
- tags,
- } = params
- const cfg = await getEnvLessBridgeConfig()
- // ── 1. Create session (POST /v1/code/sessions, no env_id) ───────────────
- const accessToken = getAccessToken()
- if (!accessToken) {
- logForDebugging('[remote-bridge] No OAuth token')
- return null
- }
- const createdSessionId = await withRetry(
- () =>
- createCodeSession(baseUrl, accessToken, title, cfg.http_timeout_ms, tags),
- 'createCodeSession',
- cfg,
- )
- if (!createdSessionId) {
- onStateChange?.('failed', 'Session creation failed — see debug log')
- logBridgeSkip('v2_session_create_failed', undefined, true)
- return null
- }
- const sessionId: string = createdSessionId
- logForDebugging(`[remote-bridge] Created session ${sessionId}`)
- logForDiagnosticsNoPII('info', 'bridge_repl_v2_session_created')
- // ── 2. Fetch bridge credentials (POST /bridge → worker_jwt, expires_in, api_base_url) ──
- const credentials = await withRetry(
- () =>
- fetchRemoteCredentials(
- sessionId,
- baseUrl,
- accessToken,
- cfg.http_timeout_ms,
- ),
- 'fetchRemoteCredentials',
- cfg,
- )
- if (!credentials) {
- onStateChange?.('failed', 'Remote credentials fetch failed — see debug log')
- logBridgeSkip('v2_remote_creds_failed', undefined, true)
- void archiveSession(
- sessionId,
- baseUrl,
- accessToken,
- orgUUID,
- cfg.http_timeout_ms,
- )
- return null
- }
- logForDebugging(
- `[remote-bridge] Fetched bridge credentials (expires_in=${credentials.expires_in}s)`,
- )
- // ── 3. Build v2 transport (SSETransport + CCRClient) ────────────────────
- const sessionUrl = buildCCRv2SdkUrl(credentials.api_base_url, sessionId)
- logForDebugging(`[remote-bridge] v2 session URL: ${sessionUrl}`)
- let transport: ReplBridgeTransport
- try {
- transport = await createV2ReplTransport({
- sessionUrl,
- ingressToken: credentials.worker_jwt,
- sessionId,
- epoch: credentials.worker_epoch,
- heartbeatIntervalMs: cfg.heartbeat_interval_ms,
- heartbeatJitterFraction: cfg.heartbeat_jitter_fraction,
- // Per-instance closure — keeps the worker JWT out of
- // process.env.CLAUDE_CODE_SESSION_ACCESS_TOKEN, which mcp/client.ts
- // reads ungatedly and would otherwise send to user-configured ws/http
- // MCP servers. Frozen-at-construction is correct: transport is fully
- // rebuilt on refresh (rebuildTransport below).
- getAuthToken: () => credentials.worker_jwt,
- outboundOnly,
- })
- } catch (err) {
- logForDebugging(
- `[remote-bridge] v2 transport setup failed: ${errorMessage(err)}`,
- { level: 'error' },
- )
- onStateChange?.('failed', `Transport setup failed: ${errorMessage(err)}`)
- logBridgeSkip('v2_transport_setup_failed', undefined, true)
- void archiveSession(
- sessionId,
- baseUrl,
- accessToken,
- orgUUID,
- cfg.http_timeout_ms,
- )
- return null
- }
- logForDebugging(
- `[remote-bridge] v2 transport created (epoch=${credentials.worker_epoch})`,
- )
- onStateChange?.('ready')
- // ── 4. State ────────────────────────────────────────────────────────────
- // Echo dedup: messages we POST come back on the read stream. Seeded with
- // initial message UUIDs so server echoes of flushed history are recognized.
- // Both sets cover initial UUIDs — recentPostedUUIDs is a 2000-cap ring buffer
- // and could evict them after enough live writes; initialMessageUUIDs is the
- // unbounded fallback. Defense-in-depth; mirrors replBridge.ts.
- const recentPostedUUIDs = new BoundedUUIDSet(cfg.uuid_dedup_buffer_size)
- const initialMessageUUIDs = new Set<string>()
- if (initialMessages) {
- for (const msg of initialMessages) {
- initialMessageUUIDs.add(msg.uuid)
- recentPostedUUIDs.add(msg.uuid)
- }
- }
- // Defensive dedup for re-delivered inbound prompts (seq-num negotiation
- // edge cases, server history replay after transport swap).
- const recentInboundUUIDs = new BoundedUUIDSet(cfg.uuid_dedup_buffer_size)
- // FlushGate: queue live writes while the history flush POST is in flight,
- // so the server receives [history..., live...] in order.
- const flushGate = new FlushGate<Message>()
- let initialFlushDone = false
- let tornDown = false
- let authRecoveryInFlight = false
- // Latch for onUserMessage — flips true when the callback returns true
- // (policy says "done deriving"). sessionId is const (no re-create path —
- // rebuildTransport swaps JWT/epoch, same session), so no reset needed.
- let userMessageCallbackDone = !onUserMessage
- // Telemetry: why did onConnect fire? Set by rebuildTransport before
- // wireTransportCallbacks; read asynchronously by onConnect. Race-safe
- // because authRecoveryInFlight serializes rebuild callers, and a fresh
- // initEnvLessBridgeCore() call gets a fresh closure defaulting to 'initial'.
- let connectCause: ConnectCause = 'initial'
- // Deadline for onConnect after transport.connect(). Cleared by onConnect
- // (connected) and onClose (got a close — not silent). If neither fires
- // before cfg.connect_timeout_ms, onConnectTimeout emits — the only
- // signal for the `started → (silence)` gap.
- let connectDeadline: ReturnType<typeof setTimeout> | undefined
/**
 * Timer callback for the connect deadline: emits the connect-timeout
 * telemetry event unless the bridge has already been torn down.
 *
 * @param cause The connect cause captured when the deadline was armed.
 */
function onConnectTimeout(cause: ConnectCause): void {
  if (!tornDown) {
    logEvent('tengu_bridge_repl_connect_timeout', {
      v2: true,
      // The deadline is armed with cfg.connect_timeout_ms, so that is the
      // elapsed time by the time this fires.
      elapsed_ms: cfg.connect_timeout_ms,
      cause:
        cause as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
    })
  }
}
- // ── 5. JWT refresh scheduler ────────────────────────────────────────────
- // Schedule a callback 5min before expiry (per response.expires_in). On fire,
- // re-fetch /bridge with OAuth → rebuild transport with fresh credentials.
- // Each /bridge call bumps epoch server-side, so a JWT-only swap would leave
- // the old CCRClient heartbeating with a stale epoch → 409 within 20s.
- // JWT is opaque — do not decode.
- const refresh = createTokenRefreshScheduler({
- refreshBufferMs: cfg.token_refresh_buffer_ms,
- getAccessToken: async () => {
- // Unconditionally refresh OAuth before calling /bridge — getAccessToken()
- // returns expired tokens as non-null strings (doesn't check expiresAt),
- // so truthiness doesn't mean valid. Pass the stale token to onAuth401
- // so handleOAuth401Error's keychain-comparison can detect parallel refresh.
- const stale = getAccessToken()
- if (onAuth401) await onAuth401(stale ?? '')
- return getAccessToken() ?? stale
- },
- onRefresh: (sid, oauthToken) => {
- void (async () => {
- // Laptop wake: overdue proactive timer + SSE 401 fire ~simultaneously.
- // Claim the flag BEFORE the /bridge fetch so the other path skips
- // entirely — prevents double epoch bump (each /bridge call bumps; if
- // both fetch, the first rebuild gets a stale epoch and 409s).
- if (authRecoveryInFlight || tornDown) {
- logForDebugging(
- '[remote-bridge] Recovery already in flight, skipping proactive refresh',
- )
- return
- }
- authRecoveryInFlight = true
- try {
- const fresh = await withRetry(
- () =>
- fetchRemoteCredentials(
- sid,
- baseUrl,
- oauthToken,
- cfg.http_timeout_ms,
- ),
- 'fetchRemoteCredentials (proactive)',
- cfg,
- )
- if (!fresh || tornDown) return
- await rebuildTransport(fresh, 'proactive_refresh')
- logForDebugging(
- '[remote-bridge] Transport rebuilt (proactive refresh)',
- )
- } catch (err) {
- logForDebugging(
- `[remote-bridge] Proactive refresh rebuild failed: ${errorMessage(err)}`,
- { level: 'error' },
- )
- logForDiagnosticsNoPII(
- 'error',
- 'bridge_repl_v2_proactive_refresh_failed',
- )
- if (!tornDown) {
- onStateChange?.('failed', `Refresh failed: ${errorMessage(err)}`)
- }
- } finally {
- authRecoveryInFlight = false
- }
- })()
- },
- label: 'remote',
- })
- refresh.scheduleFromExpiresIn(sessionId, credentials.expires_in)
- // ── 6. Wire callbacks (extracted so transport-rebuild can re-wire) ──────
/**
 * Registers the connect / data / close handlers on the CURRENT `transport`.
 * Called once at init and again after every transport rebuild. The handlers
 * read `transport` at call time, not at registration time.
 */
function wireTransportCallbacks(): void {
  const handleConnect = (): void => {
    clearTimeout(connectDeadline)
    logForDebugging('[remote-bridge] v2 transport connected')
    logForDiagnosticsNoPII('info', 'bridge_repl_v2_transport_connected')
    logEvent('tengu_bridge_repl_ws_connected', {
      v2: true,
      cause:
        connectCause as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
    })

    // Nothing to flush (already flushed, or no history): report connected
    // unless a flush gate is still holding writes.
    if (initialFlushDone || !initialMessages || initialMessages.length === 0) {
      if (!flushGate.active) {
        onStateChange?.('connected')
      }
      return
    }

    initialFlushDone = true
    // Remember which transport this flush belongs to. If a 401 or teardown
    // lands mid-flush, the stale completion below must neither drain the
    // gate nor signal 'connected'.
    const flushTransport = transport
    void flushHistory(initialMessages)
      .catch(e =>
        logForDebugging(`[remote-bridge] flushHistory failed: ${e}`),
      )
      .finally(() => {
        // `transport` is only reassigned deep inside rebuildTransport, so
        // the identity check alone can miss an in-progress recovery;
        // authRecoveryInFlight is claimed synchronously and closes that gap.
        const superseded =
          transport !== flushTransport || tornDown || authRecoveryInFlight
        if (superseded) return
        drainFlushGate()
        onStateChange?.('connected')
      })
  }

  const handleData = (data: string): void => {
    handleIngressMessage(
      data,
      recentPostedUUIDs,
      recentInboundUUIDs,
      onInboundMessage,
      // A remote client answered the permission prompt, so the turn resumes.
      // Report 'running' first; otherwise the server stays on
      // requires_action until the next user message or turn-end result.
      onPermissionResponse
        ? res => {
            transport.reportState('running')
            onPermissionResponse(res)
          }
        : undefined,
      req =>
        handleServerControlRequest(req, {
          transport,
          sessionId,
          onInterrupt,
          onSetModel,
          onSetMaxThinkingTokens,
          onSetPermissionMode,
          outboundOnly,
        }),
    )
  }

  const handleClose = (code?: number): void => {
    clearTimeout(connectDeadline)
    if (tornDown) return
    logForDebugging(`[remote-bridge] v2 transport closed (code=${code})`)
    logEvent('tengu_bridge_repl_ws_closed', { code, v2: true })

    // Per the transport contract described in this file, onClose fires only
    // for terminal failures (401, 4090 epoch mismatch, 4091 init failed, or
    // exhausted SSE reconnect budget). Only a 401 is recoverable, and only
    // when no other recovery already holds the flag.
    const recoverable = code === 401 && !authRecoveryInFlight
    if (!recoverable) {
      onStateChange?.('failed', `Transport closed (code ${code})`)
      return
    }
    void recoverFromAuthFailure()
  }

  transport.setOnConnect(handleConnect)
  transport.setOnData(handleData)
  transport.setOnClose(handleClose)
}
- // ── 7. Transport rebuild (shared by proactive refresh + 401 recovery) ──
- // Every /bridge call bumps epoch server-side. Both refresh paths must
- // rebuild the transport with the new epoch — a JWT-only swap leaves the
- // old CCRClient heartbeating stale epoch → 409. SSE resumes from the old
- // transport's high-water-mark seq-num so no server-side replay.
- // Caller MUST set authRecoveryInFlight = true before calling (synchronously,
- // before any await) and clear it in a finally. This function doesn't manage
- // the flag — moving it here would be too late to prevent a double /bridge
- // fetch, and each fetch bumps epoch.
/**
 * Replaces the current transport with one built from fresh /bridge
 * credentials, resuming from the old transport's last sequence number.
 *
 * The caller must set `authRecoveryInFlight = true` synchronously before
 * calling and clear it in a `finally`; this function does not manage the flag.
 *
 * @param fresh Credentials returned by the latest /bridge call.
 * @param cause Why the rebuild is happening; recorded in `connectCause` for
 *   the connect telemetry emitted by the new transport's onConnect.
 * @throws Whatever createV2ReplTransport throws. The flush gate is still
 *   released in that case and any queued messages are dropped.
 */
async function rebuildTransport(
  fresh: RemoteCredentials,
  cause: Exclude<ConnectCause, 'initial'>,
): Promise<void> {
  connectCause = cause
  // Disarm any connect deadline still pending for the transport we are about
  // to replace. Without this, re-arming below would overwrite the handle and
  // orphan the old timer, which would later fire a spurious connect-timeout
  // event with a stale cause. clearTimeout on undefined or an already-fired
  // handle is a no-op.
  clearTimeout(connectDeadline)
  // Queue writes during rebuild — once /bridge returns, the old transport's
  // epoch is stale and its next write/heartbeat 409s. Without this gate,
  // writeMessages adds UUIDs to recentPostedUUIDs then writeBatch silently
  // no-ops (closed uploader after 409) → permanent silent message loss.
  flushGate.start()
  try {
    // Capture the high-water mark before closing so the new transport
    // resumes from it.
    const seq = transport.getLastSequenceNum()
    transport.close()
    transport = await createV2ReplTransport({
      sessionUrl: buildCCRv2SdkUrl(fresh.api_base_url, sessionId),
      ingressToken: fresh.worker_jwt,
      sessionId,
      epoch: fresh.worker_epoch,
      heartbeatIntervalMs: cfg.heartbeat_interval_ms,
      heartbeatJitterFraction: cfg.heartbeat_jitter_fraction,
      initialSequenceNum: seq,
      getAuthToken: () => fresh.worker_jwt,
      outboundOnly,
    })
    if (tornDown) {
      // Teardown fired during the async createV2ReplTransport window.
      // Don't wire/connect/schedule — we'd re-arm timers after cancelAll()
      // and fire onInboundMessage into a torn-down bridge.
      transport.close()
      return
    }
    wireTransportCallbacks()
    transport.connect()
    connectDeadline = setTimeout(
      onConnectTimeout,
      cfg.connect_timeout_ms,
      connectCause,
    )
    refresh.scheduleFromExpiresIn(sessionId, fresh.expires_in)
    // Drain queued writes into the new transport. connect() is
    // fire-and-forget, so this runs before the connection is confirmed.
    drainFlushGate()
  } finally {
    // End the gate on failure paths too — drainFlushGate already ended
    // it on success. Queued messages are dropped (transport still dead).
    flushGate.drop()
  }
}
- // ── 8. 401 recovery (OAuth refresh + rebuild) ───────────────────────────
/**
 * Recovers from a 401 transport close: refreshes OAuth, fetches fresh
 * /bridge credentials and rebuilds the transport. Reports 'reconnecting' on
 * entry and 'failed' on any unrecoverable step (unless torn down meanwhile).
 * Never rejects — errors are logged and surfaced through onStateChange.
 */
async function recoverFromAuthFailure(): Promise<void> {
  // The check and the claim must happen together, before any await, so the
  // proactive-refresh path cannot start a second /bridge fetch in between.
  if (authRecoveryInFlight) return
  authRecoveryInFlight = true

  onStateChange?.('reconnecting', 'JWT expired — refreshing')
  logForDebugging('[remote-bridge] 401 on SSE — attempting JWT refresh')

  try {
    // Always attempt an OAuth refresh: getAccessToken() can return an expired
    // token as a non-empty string. The stale token is handed to onAuth401 so
    // it can tell whether someone else already refreshed.
    const stale = getAccessToken()
    if (onAuth401) await onAuth401(stale ?? '')
    const oauthToken = getAccessToken() ?? stale

    if (tornDown) return
    if (!oauthToken) {
      onStateChange?.('failed', 'JWT refresh failed: no OAuth token')
      return
    }

    const fresh = await withRetry(
      () =>
        fetchRemoteCredentials(
          sessionId,
          baseUrl,
          oauthToken,
          cfg.http_timeout_ms,
        ),
      'fetchRemoteCredentials (recovery)',
      cfg,
    )

    if (tornDown) return
    if (!fresh) {
      onStateChange?.('failed', 'JWT refresh failed after 401')
      return
    }

    // A 401 may have interrupted the initial history flush. Clearing the
    // latch makes the rebuilt transport's onConnect flush the history again.
    initialFlushDone = false
    await rebuildTransport(fresh, 'auth_401_recovery')
    logForDebugging('[remote-bridge] Transport rebuilt after 401')
  } catch (err) {
    logForDebugging(
      `[remote-bridge] 401 recovery failed: ${errorMessage(err)}`,
      { level: 'error' },
    )
    logForDiagnosticsNoPII('error', 'bridge_repl_v2_jwt_refresh_failed')
    if (!tornDown) {
      onStateChange?.('failed', `JWT refresh failed: ${errorMessage(err)}`)
    }
  } finally {
    // Release the claim on every exit path, including the early returns.
    authRecoveryInFlight = false
  }
}
- wireTransportCallbacks()
- // Start flushGate BEFORE connect so writeMessages() during handshake
- // queues instead of racing the history POST.
- if (initialMessages && initialMessages.length > 0) {
- flushGate.start()
- }
- transport.connect()
- connectDeadline = setTimeout(
- onConnectTimeout,
- cfg.connect_timeout_ms,
- connectCause,
- )
- // ── 8. History flush + drain helpers ────────────────────────────────────
/**
 * Ends the flush gate and sends whatever messages were queued behind it to
 * the current transport as one batch. Does nothing when the queue is empty.
 */
function drainFlushGate(): void {
  const queued = flushGate.end()
  if (queued.length === 0) {
    return
  }

  // Record the UUIDs before posting so their server echoes are deduplicated.
  queued.forEach(m => {
    recentPostedUUIDs.add(m.uuid)
  })

  const events = toSDKMessages(queued).map(sdkMsg => ({
    ...sdkMsg,
    session_id: sessionId,
  }))

  // A user-type message in the batch means a turn is in progress.
  const hasUserMessage = queued.some(m => m.type === 'user')
  if (hasUserMessage) {
    transport.reportState('running')
  }

  logForDebugging(
    `[remote-bridge] Drained ${queued.length} queued message(s) after flush`,
  )
  void transport.writeBatch(events)
}
/**
 * Posts the initial conversation history to the current transport.
 *
 * Only bridge-eligible messages are sent. When `initialHistoryCap` is positive
 * and exceeded, only the most recent messages are kept. No filtering by
 * previously flushed UUIDs is done here.
 *
 * @param msgs The full initial message list.
 * @returns Resolves once transport.writeBatch resolves, or immediately when
 *   there is nothing to send.
 */
async function flushHistory(msgs: Message[]): Promise<void> {
  const eligible = msgs.filter(isEligibleBridgeMessage)

  let capped = eligible
  if (initialHistoryCap > 0 && eligible.length > initialHistoryCap) {
    capped = eligible.slice(-initialHistoryCap)
  }
  if (capped.length < eligible.length) {
    logForDebugging(
      `[remote-bridge] Capped initial flush: ${eligible.length} -> ${capped.length} (cap=${initialHistoryCap})`,
    )
  }

  const events = toSDKMessages(capped).map(sdkMsg => ({
    ...sdkMsg,
    session_id: sessionId,
  }))
  if (events.length === 0) {
    return
  }

  // Mid-turn init: when the trailing eligible message is user-type (a prompt
  // or tool_result), a query is still running, so report 'running' now. No
  // later user-type message is guaranteed to arrive and do it for us.
  const trailing = eligible.at(-1)
  if (trailing?.type === 'user') {
    transport.reportState('running')
  }

  logForDebugging(`[remote-bridge] Flushing ${events.length} history events`)
  await transport.writeBatch(events)
}
- // ── 9. Teardown ───────────────────────────────────────────────────────────
- // On SIGINT/SIGTERM//exit, gracefulShutdown races runCleanupFunctions()
- // against a 2s cap before forceExit kills the process. Budget accordingly:
- // - archive: teardown_archive_timeout_ms (default 1500, cap 2000)
- // - result write: fire-and-forget, archive latency covers the drain
- // - 401 retry: only if first archive 401s, shares the same budget
/**
 * Shuts the bridge down once (later calls are no-ops): cancels timers, drops
 * queued writes, queues the final result message, archives the session
 * (retrying once after an OAuth refresh on 401), closes the transport and
 * emits teardown telemetry.
 */
async function teardown(): Promise<void> {
  if (tornDown) return
  tornDown = true

  refresh.cancelAll()
  clearTimeout(connectDeadline)
  flushGate.drop()

  // Queue the result message BEFORE archiving and close only afterwards.
  // write() is not awaited, so the archive round-trip is the window in which
  // the queued result can still be posted; closing first would drop it.
  transport.reportState('idle')
  void transport.write(makeResultMessage(sessionId))

  // Archive helper bound to this session and the teardown time budget.
  const archiveWith = (oauthToken: string | undefined) =>
    archiveSession(
      sessionId,
      baseUrl,
      oauthToken,
      orgUUID,
      cfg.teardown_archive_timeout_ms,
    )

  let token = getAccessToken()
  let status = await archiveWith(token)

  // The token may be stale (e.g. after a laptop wake past the refresh
  // window). Retry exactly once on 401 after asking the host to refresh.
  // The try/catch keeps a throwing refresh from skipping transport.close()
  // and the telemetry below.
  if (status === 401 && onAuth401) {
    try {
      await onAuth401(token ?? '')
      token = getAccessToken()
      status = await archiveWith(token)
    } catch (err) {
      logForDebugging(
        `[remote-bridge] Teardown 401 retry threw: ${errorMessage(err)}`,
        { level: 'error' },
      )
    }
  }

  transport.close()

  // Collapse the raw archive outcome into the telemetry bucket.
  let archiveStatus: ArchiveTelemetryStatus
  if (status === 'no_token') {
    archiveStatus = 'skipped_no_token'
  } else if (status === 'timeout' || status === 'error') {
    archiveStatus = 'network_error'
  } else if (status >= 500) {
    archiveStatus = 'server_5xx'
  } else if (status >= 400) {
    archiveStatus = 'server_4xx'
  } else {
    archiveStatus = 'ok'
  }

  logForDebugging(`[remote-bridge] Torn down (archive=${status})`)
  logForDiagnosticsNoPII('info', 'bridge_repl_v2_teardown')
  logEvent(
    feature('CCR_MIRROR') && outboundOnly
      ? 'tengu_ccr_mirror_teardown'
      : 'tengu_bridge_repl_teardown',
    {
      v2: true,
      archive_status:
        archiveStatus as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
      archive_ok: typeof status === 'number' && status < 400,
      archive_http_status: typeof status === 'number' ? status : undefined,
      archive_timeout: status === 'timeout',
      archive_no_token: status === 'no_token',
    },
  )
}
- const unregister = registerCleanup(teardown)
- if (feature('CCR_MIRROR') && outboundOnly) {
- logEvent('tengu_ccr_mirror_started', {
- v2: true,
- expires_in_s: credentials.expires_in,
- })
- } else {
- logEvent('tengu_bridge_repl_started', {
- has_initial_messages: !!(initialMessages && initialMessages.length > 0),
- v2: true,
- expires_in_s: credentials.expires_in,
- inProtectedNamespace: isInProtectedNamespace(),
- })
- }
- // ── 10. Handle ──────────────────────────────────────────────────────────
- return {
- bridgeSessionId: sessionId,
- environmentId: '',
- sessionIngressUrl: credentials.api_base_url,
/**
 * Forward new transcript messages to the remote session.
 *
 * A message is skipped when it fails `isEligibleBridgeMessage`, when its UUID
 * belongs to the initial message set, or when its UUID was posted recently.
 * If the flush gate accepts the batch it is queued there and nothing is
 * written by this call.
 */
writeMessages(messages) {
  const outgoing = messages.filter(msg => {
    if (!isEligibleBridgeMessage(msg)) return false
    if (initialMessageUUIDs.has(msg.uuid)) return false
    return !recentPostedUUIDs.has(msg.uuid)
  })
  if (outgoing.length === 0) return

  // Title derivation: offer each title-worthy text to onUserMessage. This
  // runs before the flushGate check because prompts are title-worthy even if
  // they end up queued. The callback keeps being invoked on later batches
  // until it returns true; the caller owns the policy (derive at 1st and
  // 3rd, skip if explicit).
  if (!userMessageCallbackDone) {
    const accepted = outgoing.some(msg => {
      const titleText = extractTitleText(msg)
      return titleText !== undefined && onUserMessage?.(titleText, sessionId)
    })
    if (accepted) userMessageCallbackDone = true
  }

  if (flushGate.enqueue(...outgoing)) {
    logForDebugging(
      `[remote-bridge] Queued ${outgoing.length} message(s) during flush`,
    )
    return
  }

  // Remember what is about to be posted so a later call does not resend it.
  for (const { uuid } of outgoing) recentPostedUUIDs.add(uuid)
  const events = toSDKMessages(outgoing).map(sdkMessage => ({
    ...sdkMessage,
    session_id: sessionId,
  }))

  // v2 does not derive worker_status from events server-side (unlike v1
  // session-ingress session_status_updater.go). Push it from here so the
  // CCR web session list shows Running instead of stuck on Idle. A user
  // message in the batch marks turn start. CCRClient.reportState dedupes
  // consecutive same-state pushes.
  const batchStartsTurn = outgoing.some(msg => msg.type === 'user')
  if (batchStartsTurn) transport.reportState('running')

  logForDebugging(`[remote-bridge] Sending ${outgoing.length} message(s)`)
  void transport.writeBatch(events)
},
/**
 * Send messages that are already in SDK shape.
 *
 * Messages whose `uuid` was posted recently are skipped; messages without a
 * `uuid` always pass. Each outgoing event is stamped with this session's id.
 */
writeSdkMessages(messages: SDKMessage[]) {
  const unsent = messages.filter(
    msg => !(msg.uuid && recentPostedUUIDs.has(msg.uuid)),
  )
  if (unsent.length === 0) return

  // Record the UUIDs (where present) before writing so repeats are dropped.
  for (const { uuid } of unsent) {
    if (uuid) recentPostedUUIDs.add(uuid)
  }

  const events = unsent.map(msg => ({ ...msg, session_id: sessionId }))
  void transport.writeBatch(events)
},
/**
 * Write a control_request to the transport, tagged with this session's id.
 *
 * The request is dropped (and only logged) while auth recovery is in flight.
 * A `can_use_tool` request also reports the `requires_action` state before
 * the write.
 */
sendControlRequest(request: SDKControlRequest) {
  const requestId = request.request_id
  if (authRecoveryInFlight) {
    logForDebugging(
      `[remote-bridge] Dropping control_request during 401 recovery: ${requestId}`,
    )
    return
  }

  const payload = { ...request, session_id: sessionId }
  const needsUserAction = request.request.subtype === 'can_use_tool'
  if (needsUserAction) transport.reportState('requires_action')

  void transport.write(payload)
  logForDebugging(
    `[remote-bridge] Sent control_request request_id=${requestId}`,
  )
},
/**
 * Write a control_response to the transport, tagged with this session's id.
 *
 * The response is dropped (and only logged) while auth recovery is in
 * flight. Otherwise the `running` state is reported before the write.
 */
sendControlResponse(response: SDKControlResponse) {
  if (authRecoveryInFlight) {
    logForDebugging(
      '[remote-bridge] Dropping control_response during 401 recovery',
    )
    return
  }

  const payload = { ...response, session_id: sessionId }
  transport.reportState('running')
  void transport.write(payload)
  logForDebugging('[remote-bridge] Sent control_response')
},
/**
 * Write a control_cancel_request for `requestId` to the transport.
 *
 * The cancel is dropped (and only logged) while auth recovery is in flight.
 * Otherwise the `running` state is reported before the write.
 */
sendControlCancelRequest(requestId: string) {
  if (authRecoveryInFlight) {
    logForDebugging(
      `[remote-bridge] Dropping control_cancel_request during 401 recovery: ${requestId}`,
    )
    return
  }

  const cancelEvent = {
    type: 'control_cancel_request' as const,
    request_id: requestId,
    session_id: sessionId,
  }

  // Hook/classifier/channel/recheck resolved the permission locally —
  // interactiveHandler calls only cancelRequest (no sendResponse) on those
  // paths, so without this the server stays on requires_action.
  transport.reportState('running')

  void transport.write(cancelEvent)
  logForDebugging(
    `[remote-bridge] Sent control_cancel_request request_id=${requestId}`,
  )
},
/**
 * Report the `idle` state and write a result message for this session.
 *
 * Nothing is reported or written while auth recovery is in flight; the drop
 * is only logged.
 */
sendResult() {
  if (authRecoveryInFlight) {
    logForDebugging('[remote-bridge] Dropping result during 401 recovery')
    return
  }

  transport.reportState('idle')
  const resultMessage = makeResultMessage(sessionId)
  void transport.write(resultMessage)
  logForDebugging('[remote-bridge] Sent result')
},
/**
 * Tear the bridge down on request.
 *
 * `unregister` is the value returned by `registerCleanup(teardown)`, so it
 * presumably removes that cleanup registration (confirm against
 * registerCleanup). It is called first, then the enclosing function's
 * `teardown` is awaited.
 */
async teardown(): Promise<void> {
  unregister()
  // `teardown` here is the outer closure function, not this method.
  await teardown()
},
- }
- }
- // ─── Session API (v2 /code/sessions, no env) ─────────────────────────────────
/**
 * Retry an async init call with exponential backoff + jitter.
 *
 * `fn` signals failure by resolving to `null`; the first non-null value is
 * returned immediately. Rejections from `fn` are not caught here and
 * propagate to the caller.
 *
 * @param fn Operation to attempt; resolve to `null` to request a retry.
 * @param label Name of the operation, used only in the debug log line.
 * @param cfg Supplies the attempt limit and backoff parameters
 *   (`init_retry_max_attempts`, `init_retry_base_delay_ms`,
 *   `init_retry_jitter_fraction`, `init_retry_max_delay_ms`).
 * @returns The first non-null result, or `null` once every attempt failed.
 */
async function withRetry<T>(
  fn: () => Promise<T | null>,
  label: string,
  cfg: EnvLessBridgeConfig,
): Promise<T | null> {
  const maxAttempts = cfg.init_retry_max_attempts
  for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
    const outcome = await fn()
    if (outcome !== null) return outcome

    // No sleep after the final attempt — there is nothing left to wait for.
    if (attempt >= maxAttempts) break

    // The backoff doubles per attempt. Jitter shifts it by up to
    // ±jitter_fraction of the un-capped backoff, and the sum is then capped
    // at the configured maximum delay.
    const backoffMs = cfg.init_retry_base_delay_ms * 2 ** (attempt - 1)
    const jitterMs =
      backoffMs * cfg.init_retry_jitter_fraction * (2 * Math.random() - 1)
    const waitMs = Math.min(backoffMs + jitterMs, cfg.init_retry_max_delay_ms)
    logForDebugging(
      `[remote-bridge] ${label} failed (attempt ${attempt}/${maxAttempts}), retrying in ${Math.round(waitMs)}ms`,
    )
    await sleep(waitMs)
  }
  return null
}
- // Moved to codeSessionApi.ts so the SDK /bridge subpath can bundle them
- // without pulling in this file's heavy CLI tree (analytics, transport).
- export {
- createCodeSession,
- type RemoteCredentials,
- } from './codeSessionApi.js'
- import {
- createCodeSession,
- fetchRemoteCredentials as fetchRemoteCredentialsRaw,
- type RemoteCredentials,
- } from './codeSessionApi.js'
- import { getBridgeBaseUrlOverride } from './bridgeConfig.js'
/**
 * CLI-side wrapper that applies the CLAUDE_BRIDGE_BASE_URL dev override and
 * injects the trusted-device token (both are env/GrowthBook reads that the
 * SDK-facing codeSessionApi.ts export must stay free of).
 *
 * @param sessionId Session whose credentials are requested.
 * @param baseUrl Base URL passed through to the raw fetch; it also replaces
 *   `api_base_url` in the result when a base-URL override is active.
 * @param accessToken Access token passed through to the raw fetch.
 * @param timeoutMs Timeout passed through to the raw fetch.
 * @returns The fetched credentials (with `api_base_url` rewritten under an
 *   override), or `null` when the raw fetch yields nothing.
 */
export async function fetchRemoteCredentials(
  sessionId: string,
  baseUrl: string,
  accessToken: string,
  timeoutMs: number,
): Promise<RemoteCredentials | null> {
  const trustedDeviceToken = getTrustedDeviceToken()
  const rawCreds = await fetchRemoteCredentialsRaw(
    sessionId,
    baseUrl,
    accessToken,
    timeoutMs,
    trustedDeviceToken,
  )
  if (!rawCreds) return null
  if (!getBridgeBaseUrlOverride()) return rawCreds
  // Override active: point the credentials at the caller-supplied base URL.
  return { ...rawCreds, api_base_url: baseUrl }
}
/**
 * Outcome of an archive attempt, as returned by archiveSession:
 * the HTTP status code when a response was received, `'no_token'` when no
 * access token was available, `'timeout'` for an axios ECONNABORTED error,
 * and `'error'` for any other failure.
 */
type ArchiveStatus =
  | number
  | 'timeout'
  | 'error'
  | 'no_token'
/**
 * Single categorical for BQ `GROUP BY archive_status`.
 *
 * The booleans on _teardown predate this and are redundant with it, except
 * archive_timeout, which distinguishes ECONNABORTED from other network
 * errors. Both map to 'network_error' here since the dominant cause in a
 * 1.5s window is timeout.
 */
type ArchiveTelemetryStatus =
  | 'ok'
  | 'skipped_no_token'
  | 'network_error'
  | 'server_4xx'
  | 'server_5xx'
/**
 * POST an archive request for a session and report how it went.
 *
 * Request failures never throw: they are logged and folded into the
 * returned status. Any HTTP status counts as a response
 * (`validateStatus` always passes), so 4xx/5xx codes are returned as
 * numbers rather than raised.
 *
 * @param sessionId Session to archive; converted with `toCompatSessionId`
 *   before being placed in the URL.
 * @param baseUrl Origin that the `/v1/sessions/...` path is appended to.
 * @param accessToken OAuth access token; when falsy no request is made.
 * @param orgUUID Sent as the `x-organization-uuid` header.
 * @param timeoutMs Request timeout handed to axios.
 * @returns The HTTP status code, `'no_token'` without an access token,
 *   `'timeout'` for an axios ECONNABORTED error, otherwise `'error'`.
 */
async function archiveSession(
  sessionId: string,
  baseUrl: string,
  accessToken: string | undefined,
  orgUUID: string,
  timeoutMs: number,
): Promise<ArchiveStatus> {
  if (!accessToken) return 'no_token'

  // Archive lives at the compat layer (/v1/sessions/*, not /v1/code/sessions).
  // compat.parseSessionID only accepts TagSession (session_*), so retag cse_*.
  // anthropic-beta + x-organization-uuid are required — without them the
  // compat gateway 404s before reaching the handler.
  //
  // Unlike bridgeMain.ts (which caches compatId in sessionCompatIds to keep
  // in-memory titledSessions/logger keys consistent across a mid-session
  // gate flip), this compatId is only a server URL path segment — no
  // in-memory state. Fresh compute matches whatever the server currently
  // validates: if the gate is OFF, the server has been updated to accept
  // cse_* and we correctly send it.
  const compatId = toCompatSessionId(sessionId)

  try {
    // Built inside the try so a failure while assembling headers is reported
    // as 'error' instead of escaping to the caller.
    const archiveUrl = `${baseUrl}/v1/sessions/${compatId}/archive`
    const requestConfig = {
      headers: {
        ...oauthHeaders(accessToken),
        'anthropic-beta': 'ccr-byoc-2025-07-29',
        'x-organization-uuid': orgUUID,
      },
      timeout: timeoutMs,
      validateStatus: () => true,
    }
    const { status } = await axios.post(archiveUrl, {}, requestConfig)
    logForDebugging(`[remote-bridge] Archive ${compatId} status=${status}`)
    return status
  } catch (err) {
    logForDebugging(`[remote-bridge] Archive failed: ${errorMessage(err)}`)
    const timedOut = axios.isAxiosError(err) && err.code === 'ECONNABORTED'
    return timedOut ? 'timeout' : 'error'
  }
}
|