remoteBridgeCore.ts 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008
  1. // biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered
  2. /**
  3. * Env-less Remote Control bridge core.
  4. *
  5. * "Env-less" = no Environments API layer. Distinct from "CCR v2" (the
  6. * /worker/* transport protocol) — the env-based path (replBridge.ts) can also
  7. * use CCR v2 transport via CLAUDE_CODE_USE_CCR_V2. This file is about removing
  8. * the poll/dispatch layer, not about which transport protocol is underneath.
  9. *
  10. * Unlike initBridgeCore (env-based, ~2400 lines), this connects directly
  11. * to the session-ingress layer without the Environments API work-dispatch
  12. * layer:
  13. *
  14. * 1. POST /v1/code/sessions (OAuth, no env_id) → session.id
  15. * 2. POST /v1/code/sessions/{id}/bridge (OAuth) → {worker_jwt, expires_in, api_base_url, worker_epoch}
  16. * Each /bridge call bumps epoch — it IS the register. No separate /worker/register.
  17. * 3. createV2ReplTransport(worker_jwt, worker_epoch) → SSE + CCRClient
  18. * 4. createTokenRefreshScheduler → proactive /bridge re-call (new JWT + new epoch)
  19. * 5. 401 on SSE → rebuild transport with fresh /bridge credentials (same seq-num)
  20. *
  21. * No register/poll/ack/stop/heartbeat/deregister environment lifecycle.
  22. * The Environments API historically existed because CCR's /worker/*
  23. * endpoints required a session_id+role=worker JWT that only the work-dispatch
  24. * layer could mint. Server PR #292605 (renamed in #293280) adds the /bridge endpoint as a direct
  25. * OAuth→worker_jwt exchange, making the env layer optional for REPL sessions.
  26. *
  27. * Gated by `tengu_bridge_repl_v2` GrowthBook flag in initReplBridge.ts.
  28. * REPL-only — daemon/print stay on env-based.
  29. */
  30. import { feature } from 'bun:bundle'
  31. import axios from 'axios'
  32. import {
  33. createV2ReplTransport,
  34. type ReplBridgeTransport,
  35. } from './replBridgeTransport.js'
  36. import { buildCCRv2SdkUrl } from './workSecret.js'
  37. import { toCompatSessionId } from './sessionIdCompat.js'
  38. import { FlushGate } from './flushGate.js'
  39. import { createTokenRefreshScheduler } from './jwtUtils.js'
  40. import { getTrustedDeviceToken } from './trustedDevice.js'
  41. import {
  42. getEnvLessBridgeConfig,
  43. type EnvLessBridgeConfig,
  44. } from './envLessBridgeConfig.js'
  45. import {
  46. handleIngressMessage,
  47. handleServerControlRequest,
  48. makeResultMessage,
  49. isEligibleBridgeMessage,
  50. extractTitleText,
  51. BoundedUUIDSet,
  52. } from './bridgeMessaging.js'
  53. import { logBridgeSkip } from './debugUtils.js'
  54. import { logForDebugging } from '../utils/debug.js'
  55. import { logForDiagnosticsNoPII } from '../utils/diagLogs.js'
  56. import { isInProtectedNamespace } from '../utils/envUtils.js'
  57. import { errorMessage } from '../utils/errors.js'
  58. import { sleep } from '../utils/sleep.js'
  59. import { registerCleanup } from '../utils/cleanupRegistry.js'
  60. import {
  61. type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
  62. logEvent,
  63. } from '../services/analytics/index.js'
  64. import type { ReplBridgeHandle, BridgeState } from './replBridge.js'
  65. import type { Message } from '../types/message.js'
  66. import type { SDKMessage } from '../entrypoints/agentSdkTypes.js'
  67. import type {
  68. SDKControlRequest,
  69. SDKControlResponse,
  70. } from '../entrypoints/sdk/controlTypes.js'
  71. import type { PermissionMode } from '../utils/permissions/PermissionMode.js'
  /** Value sent in the `anthropic-version` header by oauthHeaders(). */
  const ANTHROPIC_VERSION = '2023-06-01'

  // Telemetry discriminator for ws_connected. 'initial' is the default and
  // never passed to rebuildTransport (which can only be called post-init);
  // Exclude<> makes that constraint explicit at both signatures.
  type ConnectCause = 'initial' | 'proactive_refresh' | 'auth_401_recovery'

  /**
   * Builds the header set for an OAuth-authenticated JSON request: a Bearer
   * Authorization header, a JSON content type, and the pinned
   * `anthropic-version`.
   *
   * @param accessToken OAuth access token placed in the Authorization header.
   * @returns A new header record on every call.
   */
  function oauthHeaders(accessToken: string): Record<string, string> {
    const headers: Record<string, string> = {}
    headers['Authorization'] = `Bearer ${accessToken}`
    headers['Content-Type'] = 'application/json'
    headers['anthropic-version'] = ANTHROPIC_VERSION
    return headers
  }
  /**
   * Inputs for initEnvLessBridgeCore: endpoint/auth wiring, optional history
   * to flush on first connect, and the callbacks the bridge invokes for
   * inbound traffic, server control requests, and state transitions.
   */
  export type EnvLessBridgeParams = {
    // ── Endpoint, identity, auth ──────────────────────────────────────────

    /** Base URL handed to the session-create, /bridge, and archive calls. */
    baseUrl: string
    /** Organization UUID; in the code visible here it is forwarded to archiveSession. */
    orgUUID: string
    /** Title passed to createCodeSession for the newly created session. */
    title: string
    /** Free-form tags for session categorization (e.g. ['ccr-mirror']). */
    tags?: string[]
    /**
     * Returns the current OAuth access token, or undefined when there is
     * none. It can return an expired token as a non-empty string, which is
     * why the refresh paths call onAuth401 instead of trusting truthiness.
     */
    getAccessToken: () => string | undefined
    /**
     * OAuth refresh hook. Receives the token that was current before the
     * refresh attempt; the bridge awaits it and then re-reads
     * getAccessToken().
     */
    onAuth401?: (staleAccessToken: string) => Promise<boolean>

    // ── History + message conversion ──────────────────────────────────────

    /**
     * Converts internal Message[] → SDKMessage[] for writeMessages() and the
     * initial-flush/drain paths. Injected rather than imported — mappers.ts
     * transitively pulls in src/commands.ts (entire command registry + React
     * tree) which would bloat bundles that don't already have it.
     */
    toSDKMessages: (messages: Array<Message>) => Array<SDKMessage>
    /**
     * Upper bound on the number of eligible history messages sent in the
     * initial flush (the most recent ones are kept). Values <= 0 disable the
     * cap.
     */
    initialHistoryCap: number
    /** History to flush once the transport first connects. */
    initialMessages?: Array<Message>

    // ── Message callbacks ─────────────────────────────────────────────────

    /** Handed to handleIngressMessage as the handler for inbound messages. */
    onInboundMessage?: (msg: SDKMessage) => Promise<void> | void
    /**
     * Fired on each title-worthy user message seen in writeMessages() until
     * the callback returns true (done). Mirrors replBridge.ts's onUserMessage —
     * caller derives a title and PATCHes /v1/sessions/{id} so auto-started
     * sessions don't stay at the generic fallback. The caller owns the
     * derive-at-count-1-and-3 policy; the transport just keeps calling until
     * told to stop. sessionId is the raw cse_* — updateBridgeSessionTitle
     * retags internally.
     */
    onUserMessage?: (text: string, sessionId: string) => boolean
    /**
     * Called with a remote permission response. The bridge reports state
     * 'running' on the transport immediately before invoking it.
     */
    onPermissionResponse?: (response: SDKControlResponse) => void

    // ── Server control requests (forwarded to handleServerControlRequest) ─

    onInterrupt?: () => void
    onSetModel?: (model: string | undefined) => void
    onSetMaxThinkingTokens?: (maxTokens: number | null) => void
    onSetPermissionMode?: (
      mode: PermissionMode,
    ) => { ok: true } | { ok: false; error: string }

    // ── Lifecycle ─────────────────────────────────────────────────────────

    /**
     * Bridge state notifications. The code in this file emits 'ready',
     * 'connected', 'reconnecting', and 'failed' (the latter two with a
     * human-readable detail string).
     */
    onStateChange?: (state: BridgeState, detail?: string) => void
    /**
     * When true, skip opening the SSE read stream — only the CCRClient write
     * path is activated. Threaded to createV2ReplTransport and
     * handleServerControlRequest.
     */
    outboundOnly?: boolean
  }
  127. /**
  128. * Create a session, fetch a worker JWT, connect the v2 transport.
  129. *
  130. * Returns null on any pre-flight failure (session create failed, /bridge
  131. * failed, transport setup failed). Caller (initReplBridge) surfaces this
  132. * as a generic "initialization failed" state.
  133. */
  134. export async function initEnvLessBridgeCore(
  135. params: EnvLessBridgeParams,
  136. ): Promise<ReplBridgeHandle | null> {
  137. const {
  138. baseUrl,
  139. orgUUID,
  140. title,
  141. getAccessToken,
  142. onAuth401,
  143. toSDKMessages,
  144. initialHistoryCap,
  145. initialMessages,
  146. onInboundMessage,
  147. onUserMessage,
  148. onPermissionResponse,
  149. onInterrupt,
  150. onSetModel,
  151. onSetMaxThinkingTokens,
  152. onSetPermissionMode,
  153. onStateChange,
  154. outboundOnly,
  155. tags,
  156. } = params
  157. const cfg = await getEnvLessBridgeConfig()
  158. // ── 1. Create session (POST /v1/code/sessions, no env_id) ───────────────
  159. const accessToken = getAccessToken()
  160. if (!accessToken) {
  161. logForDebugging('[remote-bridge] No OAuth token')
  162. return null
  163. }
  164. const createdSessionId = await withRetry(
  165. () =>
  166. createCodeSession(baseUrl, accessToken, title, cfg.http_timeout_ms, tags),
  167. 'createCodeSession',
  168. cfg,
  169. )
  170. if (!createdSessionId) {
  171. onStateChange?.('failed', 'Session creation failed — see debug log')
  172. logBridgeSkip('v2_session_create_failed', undefined, true)
  173. return null
  174. }
  175. const sessionId: string = createdSessionId
  176. logForDebugging(`[remote-bridge] Created session ${sessionId}`)
  177. logForDiagnosticsNoPII('info', 'bridge_repl_v2_session_created')
  178. // ── 2. Fetch bridge credentials (POST /bridge → worker_jwt, expires_in, api_base_url) ──
  179. const credentials = await withRetry(
  180. () =>
  181. fetchRemoteCredentials(
  182. sessionId,
  183. baseUrl,
  184. accessToken,
  185. cfg.http_timeout_ms,
  186. ),
  187. 'fetchRemoteCredentials',
  188. cfg,
  189. )
  190. if (!credentials) {
  191. onStateChange?.('failed', 'Remote credentials fetch failed — see debug log')
  192. logBridgeSkip('v2_remote_creds_failed', undefined, true)
  193. void archiveSession(
  194. sessionId,
  195. baseUrl,
  196. accessToken,
  197. orgUUID,
  198. cfg.http_timeout_ms,
  199. )
  200. return null
  201. }
  202. logForDebugging(
  203. `[remote-bridge] Fetched bridge credentials (expires_in=${credentials.expires_in}s)`,
  204. )
  205. // ── 3. Build v2 transport (SSETransport + CCRClient) ────────────────────
  206. const sessionUrl = buildCCRv2SdkUrl(credentials.api_base_url, sessionId)
  207. logForDebugging(`[remote-bridge] v2 session URL: ${sessionUrl}`)
  208. let transport: ReplBridgeTransport
  209. try {
  210. transport = await createV2ReplTransport({
  211. sessionUrl,
  212. ingressToken: credentials.worker_jwt,
  213. sessionId,
  214. epoch: credentials.worker_epoch,
  215. heartbeatIntervalMs: cfg.heartbeat_interval_ms,
  216. heartbeatJitterFraction: cfg.heartbeat_jitter_fraction,
  217. // Per-instance closure — keeps the worker JWT out of
  218. // process.env.CLAUDE_CODE_SESSION_ACCESS_TOKEN, which mcp/client.ts
  219. // reads ungatedly and would otherwise send to user-configured ws/http
  220. // MCP servers. Frozen-at-construction is correct: transport is fully
  221. // rebuilt on refresh (rebuildTransport below).
  222. getAuthToken: () => credentials.worker_jwt,
  223. outboundOnly,
  224. })
  225. } catch (err) {
  226. logForDebugging(
  227. `[remote-bridge] v2 transport setup failed: ${errorMessage(err)}`,
  228. { level: 'error' },
  229. )
  230. onStateChange?.('failed', `Transport setup failed: ${errorMessage(err)}`)
  231. logBridgeSkip('v2_transport_setup_failed', undefined, true)
  232. void archiveSession(
  233. sessionId,
  234. baseUrl,
  235. accessToken,
  236. orgUUID,
  237. cfg.http_timeout_ms,
  238. )
  239. return null
  240. }
  241. logForDebugging(
  242. `[remote-bridge] v2 transport created (epoch=${credentials.worker_epoch})`,
  243. )
  244. onStateChange?.('ready')
  245. // ── 4. State ────────────────────────────────────────────────────────────
  246. // Echo dedup: messages we POST come back on the read stream. Seeded with
  247. // initial message UUIDs so server echoes of flushed history are recognized.
  248. // Both sets cover initial UUIDs — recentPostedUUIDs is a 2000-cap ring buffer
  249. // and could evict them after enough live writes; initialMessageUUIDs is the
  250. // unbounded fallback. Defense-in-depth; mirrors replBridge.ts.
  251. const recentPostedUUIDs = new BoundedUUIDSet(cfg.uuid_dedup_buffer_size)
  252. const initialMessageUUIDs = new Set<string>()
  253. if (initialMessages) {
  254. for (const msg of initialMessages) {
  255. initialMessageUUIDs.add(msg.uuid)
  256. recentPostedUUIDs.add(msg.uuid)
  257. }
  258. }
  259. // Defensive dedup for re-delivered inbound prompts (seq-num negotiation
  260. // edge cases, server history replay after transport swap).
  261. const recentInboundUUIDs = new BoundedUUIDSet(cfg.uuid_dedup_buffer_size)
  262. // FlushGate: queue live writes while the history flush POST is in flight,
  263. // so the server receives [history..., live...] in order.
  264. const flushGate = new FlushGate<Message>()
  265. let initialFlushDone = false
  266. let tornDown = false
  267. let authRecoveryInFlight = false
  268. // Latch for onUserMessage — flips true when the callback returns true
  269. // (policy says "done deriving"). sessionId is const (no re-create path —
  270. // rebuildTransport swaps JWT/epoch, same session), so no reset needed.
  271. let userMessageCallbackDone = !onUserMessage
  272. // Telemetry: why did onConnect fire? Set by rebuildTransport before
  273. // wireTransportCallbacks; read asynchronously by onConnect. Race-safe
  274. // because authRecoveryInFlight serializes rebuild callers, and a fresh
  275. // initEnvLessBridgeCore() call gets a fresh closure defaulting to 'initial'.
  276. let connectCause: ConnectCause = 'initial'
  277. // Deadline for onConnect after transport.connect(). Cleared by onConnect
  278. // (connected) and onClose (got a close — not silent). If neither fires
  279. // before cfg.connect_timeout_ms, onConnectTimeout emits — the only
  280. // signal for the `started → (silence)` gap.
  281. let connectDeadline: ReturnType<typeof setTimeout> | undefined
  /**
   * Timer callback armed after transport.connect(). Emits the
   * connect-timeout telemetry event unless the bridge has already been torn
   * down; it does not change bridge state or touch the transport.
   *
   * @param cause The connect cause captured when the timer was armed.
   */
  function onConnectTimeout(cause: ConnectCause): void {
    if (!tornDown) {
      logEvent('tengu_bridge_repl_connect_timeout', {
        v2: true,
        // The timer is armed with cfg.connect_timeout_ms, so that is the
        // elapsed time by the time this runs.
        elapsed_ms: cfg.connect_timeout_ms,
        cause:
          cause as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
      })
    }
  }
  291. // ── 5. JWT refresh scheduler ────────────────────────────────────────────
  292. // Schedule a callback 5min before expiry (per response.expires_in). On fire,
  293. // re-fetch /bridge with OAuth → rebuild transport with fresh credentials.
  294. // Each /bridge call bumps epoch server-side, so a JWT-only swap would leave
  295. // the old CCRClient heartbeating with a stale epoch → 409 within 20s.
  296. // JWT is opaque — do not decode.
  297. const refresh = createTokenRefreshScheduler({
  298. refreshBufferMs: cfg.token_refresh_buffer_ms,
  299. getAccessToken: async () => {
  300. // Unconditionally refresh OAuth before calling /bridge — getAccessToken()
  301. // returns expired tokens as non-null strings (doesn't check expiresAt),
  302. // so truthiness doesn't mean valid. Pass the stale token to onAuth401
  303. // so handleOAuth401Error's keychain-comparison can detect parallel refresh.
  304. const stale = getAccessToken()
  305. if (onAuth401) await onAuth401(stale ?? '')
  306. return getAccessToken() ?? stale
  307. },
  308. onRefresh: (sid, oauthToken) => {
  309. void (async () => {
  310. // Laptop wake: overdue proactive timer + SSE 401 fire ~simultaneously.
  311. // Claim the flag BEFORE the /bridge fetch so the other path skips
  312. // entirely — prevents double epoch bump (each /bridge call bumps; if
  313. // both fetch, the first rebuild gets a stale epoch and 409s).
  314. if (authRecoveryInFlight || tornDown) {
  315. logForDebugging(
  316. '[remote-bridge] Recovery already in flight, skipping proactive refresh',
  317. )
  318. return
  319. }
  320. authRecoveryInFlight = true
  321. try {
  322. const fresh = await withRetry(
  323. () =>
  324. fetchRemoteCredentials(
  325. sid,
  326. baseUrl,
  327. oauthToken,
  328. cfg.http_timeout_ms,
  329. ),
  330. 'fetchRemoteCredentials (proactive)',
  331. cfg,
  332. )
  333. if (!fresh || tornDown) return
  334. await rebuildTransport(fresh, 'proactive_refresh')
  335. logForDebugging(
  336. '[remote-bridge] Transport rebuilt (proactive refresh)',
  337. )
  338. } catch (err) {
  339. logForDebugging(
  340. `[remote-bridge] Proactive refresh rebuild failed: ${errorMessage(err)}`,
  341. { level: 'error' },
  342. )
  343. logForDiagnosticsNoPII(
  344. 'error',
  345. 'bridge_repl_v2_proactive_refresh_failed',
  346. )
  347. if (!tornDown) {
  348. onStateChange?.('failed', `Refresh failed: ${errorMessage(err)}`)
  349. }
  350. } finally {
  351. authRecoveryInFlight = false
  352. }
  353. })()
  354. },
  355. label: 'remote',
  356. })
  357. refresh.scheduleFromExpiresIn(sessionId, credentials.expires_in)
  358. // ── 6. Wire callbacks (extracted so transport-rebuild can re-wire) ──────
  /**
   * Attaches the connect / data / close handlers to whatever `transport`
   * currently refers to. Invoked once for the initial transport and again by
   * rebuildTransport after each swap, so the handlers read shared state
   * (flags, dedup sets, flushGate) from the enclosing closure.
   */
  function wireTransportCallbacks(): void {
    transport.setOnConnect(() => {
      clearTimeout(connectDeadline)
      logForDebugging('[remote-bridge] v2 transport connected')
      logForDiagnosticsNoPII('info', 'bridge_repl_v2_transport_connected')
      logEvent('tengu_bridge_repl_ws_connected', {
        v2: true,
        cause:
          connectCause as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
      })
      if (!initialFlushDone && initialMessages && initialMessages.length > 0) {
        initialFlushDone = true
        // Capture current transport — if 401/teardown happens mid-flush,
        // the stale .finally() must not drain the gate or signal connected.
        // (Same guard pattern as replBridge.ts:1119.)
        const flushTransport = transport
        void flushHistory(initialMessages)
          .catch((e: unknown) =>
            // errorMessage() + error level, matching every other failure
            // path in this file (raw `${e}` prints "[object Object]" for
            // non-Error rejections).
            logForDebugging(
              `[remote-bridge] flushHistory failed: ${errorMessage(e)}`,
              { level: 'error' },
            ),
          )
          .finally(() => {
            // authRecoveryInFlight catches the v1-vs-v2 asymmetry: v1 nulls
            // transport synchronously in setOnClose (replBridge.ts:1175), so
            // transport !== flushTransport trips immediately. v2 doesn't
            // null — `transport` is reassigned only inside rebuildTransport,
            // after awaits. authRecoveryInFlight, by contrast, is set
            // synchronously by rebuildTransport's callers (onRefresh /
            // recoverFromAuthFailure) before their first await.
            if (
              transport !== flushTransport ||
              tornDown ||
              authRecoveryInFlight
            ) {
              return
            }
            drainFlushGate()
            onStateChange?.('connected')
          })
      } else if (!flushGate.active) {
        onStateChange?.('connected')
      }
    })
    transport.setOnData((data: string) => {
      handleIngressMessage(
        data,
        recentPostedUUIDs,
        recentInboundUUIDs,
        onInboundMessage,
        // Remote client answered the permission prompt — the turn resumes.
        // Without this the server stays on requires_action until the next
        // user message or turn-end result.
        onPermissionResponse
          ? res => {
              transport.reportState('running')
              onPermissionResponse(res)
            }
          : undefined,
        req =>
          handleServerControlRequest(req, {
            transport,
            sessionId,
            onInterrupt,
            onSetModel,
            onSetMaxThinkingTokens,
            onSetPermissionMode,
            outboundOnly,
          }),
      )
    })
    transport.setOnClose((code?: number) => {
      clearTimeout(connectDeadline)
      if (tornDown) return
      logForDebugging(`[remote-bridge] v2 transport closed (code=${code})`)
      logEvent('tengu_bridge_repl_ws_closed', { code, v2: true })
      // onClose fires only for TERMINAL failures: 401 (JWT invalid),
      // 4090 (CCR epoch mismatch), 4091 (CCR init failed), or SSE 10-min
      // reconnect budget exhausted. Transient disconnects are handled
      // transparently inside SSETransport. 401 we can recover from (fetch
      // fresh JWT, rebuild transport); all other codes are dead-ends.
      if (code === 401 && !authRecoveryInFlight) {
        void recoverFromAuthFailure()
        return
      }
      // NOTE(review): a 401 that arrives while a recovery/refresh is already
      // in flight also lands here and reports 'failed' even though a rebuild
      // may be about to succeed — confirm that this is intended.
      onStateChange?.('failed', `Transport closed (code ${code})`)
    })
  }
  443. // ── 7. Transport rebuild (shared by proactive refresh + 401 recovery) ──
  444. // Every /bridge call bumps epoch server-side. Both refresh paths must
  445. // rebuild the transport with the new epoch — a JWT-only swap leaves the
  446. // old CCRClient heartbeating stale epoch → 409. SSE resumes from the old
  447. // transport's high-water-mark seq-num so no server-side replay.
  448. // Caller MUST set authRecoveryInFlight = true before calling (synchronously,
  449. // before any await) and clear it in a finally. This function doesn't manage
  450. // the flag — moving it here would be too late to prevent a double /bridge
  451. // fetch, and each fetch bumps epoch.
  /**
   * Replaces the live transport with one built from freshly fetched /bridge
   * credentials, resuming the read stream from the old transport's last
   * sequence number.
   *
   * Writes are gated through flushGate for the duration of the swap: queued
   * writes are drained into the new transport on success and dropped on any
   * failure path.
   *
   * Caller contract: authRecoveryInFlight must already be true (set
   * synchronously, before any await) and is cleared by the caller — this
   * function does not manage the flag.
   *
   * @param fresh Credentials returned by the most recent /bridge call.
   * @param cause Why the rebuild is happening; stored in connectCause for
   *   the next ws_connected / connect_timeout telemetry event.
   * @throws Propagates errors from createV2ReplTransport (there is no catch
   *   here, only the gate-releasing finally).
   */
  async function rebuildTransport(
    fresh: RemoteCredentials,
    cause: Exclude<ConnectCause, 'initial'>,
  ): Promise<void> {
    connectCause = cause
    // Queue writes during rebuild — once /bridge returns, the old transport's
    // epoch is stale and its next write/heartbeat 409s. Without this gate,
    // writeMessages adds UUIDs to recentPostedUUIDs then writeBatch silently
    // no-ops (closed uploader after 409) → permanent silent message loss.
    flushGate.start()
    try {
      const seq = transport.getLastSequenceNum()
      transport.close()
      transport = await createV2ReplTransport({
        sessionUrl: buildCCRv2SdkUrl(fresh.api_base_url, sessionId),
        ingressToken: fresh.worker_jwt,
        sessionId,
        epoch: fresh.worker_epoch,
        heartbeatIntervalMs: cfg.heartbeat_interval_ms,
        heartbeatJitterFraction: cfg.heartbeat_jitter_fraction,
        initialSequenceNum: seq,
        getAuthToken: () => fresh.worker_jwt,
        outboundOnly,
      })
      if (tornDown) {
        // Teardown fired during the async createV2ReplTransport window.
        // Don't wire/connect/schedule — we'd re-arm timers after cancelAll()
        // and fire onInboundMessage into a torn-down bridge.
        transport.close()
        return
      }
      wireTransportCallbacks()
      transport.connect()
      // Cancel a deadline still pending for the replaced transport before
      // re-arming. Overwriting the handle would orphan that timer: nothing
      // could clear it any more, and it would later report a connect timeout
      // (with the old cause) for a transport that no longer exists.
      clearTimeout(connectDeadline)
      connectDeadline = setTimeout(
        onConnectTimeout,
        cfg.connect_timeout_ms,
        connectCause,
      )
      refresh.scheduleFromExpiresIn(sessionId, fresh.expires_in)
      // Drain queued writes into the new uploader. Runs before
      // ccr.initialize() resolves (transport.connect() is fire-and-forget),
      // but the uploader serializes behind the initial PUT /worker. If
      // init fails (4091), events drop — but only recentPostedUUIDs
      // (per-instance) is populated, so re-enabling the bridge re-flushes.
      drainFlushGate()
    } finally {
      // End the gate on failure paths too — drainFlushGate already ended
      // it on success. Queued messages are dropped (transport still dead).
      flushGate.drop()
    }
  }
  503. // ── 8. 401 recovery (OAuth refresh + rebuild) ───────────────────────────
  /**
   * Recovery path for a 401 close on the transport: refresh OAuth, fetch new
   * /bridge credentials, and rebuild the transport. Reports 'reconnecting'
   * up front and 'failed' on every unsuccessful exit, except when the bridge
   * was torn down in the meantime (then it exits silently).
   *
   * A no-op when another recovery/refresh already holds
   * authRecoveryInFlight; otherwise it claims the flag synchronously and
   * always releases it on exit.
   */
  async function recoverFromAuthFailure(): Promise<void> {
    // setOnClose checks the flag too, but check-and-set has to be atomic with
    // respect to onRefresh, so claim it here before the first await. Laptop
    // wake fires both paths at nearly the same time.
    if (authRecoveryInFlight) return
    authRecoveryInFlight = true

    onStateChange?.('reconnecting', 'JWT expired — refreshing')
    logForDebugging('[remote-bridge] 401 on SSE — attempting JWT refresh')

    try {
      // Always attempt the OAuth refresh: getAccessToken() hands back expired
      // tokens as ordinary strings, so a truthiness check proves nothing. The
      // stale token is passed along so handleOAuth401Error's keychain
      // comparison can tell whether another tab already refreshed.
      const staleToken = getAccessToken()
      if (onAuth401) {
        await onAuth401(staleToken ?? '')
      }
      const oauthToken = getAccessToken() ?? staleToken

      if (tornDown) return
      if (!oauthToken) {
        onStateChange?.('failed', 'JWT refresh failed: no OAuth token')
        return
      }

      const fresh = await withRetry(
        () =>
          fetchRemoteCredentials(
            sessionId,
            baseUrl,
            oauthToken,
            cfg.http_timeout_ms,
          ),
        'fetchRemoteCredentials (recovery)',
        cfg,
      )

      if (tornDown) return
      if (!fresh) {
        onStateChange?.('failed', 'JWT refresh failed after 401')
        return
      }

      // If 401 interrupted the initial flush, writeBatch may have silently
      // no-op'd on the closed uploader (ccr.close() ran in the SSE wrapper
      // before our setOnClose callback). Reset so the new onConnect re-flushes.
      // (v1 scopes initialFlushDone inside the per-transport closure at
      // replBridge.ts:1027 so it resets naturally; v2 has it at outer scope.)
      initialFlushDone = false
      await rebuildTransport(fresh, 'auth_401_recovery')
      logForDebugging('[remote-bridge] Transport rebuilt after 401')
    } catch (err) {
      logForDebugging(
        `[remote-bridge] 401 recovery failed: ${errorMessage(err)}`,
        { level: 'error' },
      )
      logForDiagnosticsNoPII('error', 'bridge_repl_v2_jwt_refresh_failed')
      if (!tornDown) {
        onStateChange?.('failed', `JWT refresh failed: ${errorMessage(err)}`)
      }
    } finally {
      authRecoveryInFlight = false
    }
  }
  564. wireTransportCallbacks()
  565. // Start flushGate BEFORE connect so writeMessages() during handshake
  566. // queues instead of racing the history POST.
  567. if (initialMessages && initialMessages.length > 0) {
  568. flushGate.start()
  569. }
  570. transport.connect()
  571. connectDeadline = setTimeout(
  572. onConnectTimeout,
  573. cfg.connect_timeout_ms,
  574. connectCause,
  575. )
  576. // ── 8. History flush + drain helpers ────────────────────────────────────
  /**
   * Ends the flush gate and posts whatever was queued behind it to the
   * current transport as one batch. Does nothing beyond ending the gate when
   * the queue is empty.
   *
   * Queued UUIDs are recorded in recentPostedUUIDs before the write so their
   * server echoes are recognized; the write itself is fire-and-forget.
   */
  function drainFlushGate(): void {
    const queued = flushGate.end()
    if (queued.length === 0) {
      return
    }

    queued.forEach(queuedMsg => {
      recentPostedUUIDs.add(queuedMsg.uuid)
    })

    const events = toSDKMessages(queued).map(sdkMsg => ({
      ...sdkMsg,
      session_id: sessionId,
    }))

    // Any queued user-type message means a turn is underway.
    const sawUserMessage = queued.some(queuedMsg => queuedMsg.type === 'user')
    if (sawUserMessage) {
      transport.reportState('running')
    }

    logForDebugging(
      `[remote-bridge] Drained ${queued.length} queued message(s) after flush`,
    )
    void transport.writeBatch(events)
  }
  /**
   * Posts the initial conversation history to the current transport as a
   * single batch and resolves when that write resolves.
   *
   * Messages are filtered with isEligibleBridgeMessage and, when
   * initialHistoryCap is positive, truncated to the most recent
   * initialHistoryCap entries. Nothing is written when no events remain
   * after conversion.
   *
   * @param msgs History to flush.
   */
  async function flushHistory(msgs: Message[]): Promise<void> {
    // v2 always creates a fresh server session (unconditional createCodeSession
    // above) — no session reuse, no double-post risk. Unlike v1, we do NOT
    // filter by previouslyFlushedUUIDs: that set persists across REPL enable/
    // disable cycles (useRef), so it would wrongly suppress history on re-enable.
    const eligible = msgs.filter(isEligibleBridgeMessage)

    let capped = eligible
    if (initialHistoryCap > 0 && eligible.length > initialHistoryCap) {
      capped = eligible.slice(-initialHistoryCap)
      logForDebugging(
        `[remote-bridge] Capped initial flush: ${eligible.length} -> ${capped.length} (cap=${initialHistoryCap})`,
      )
    }

    const events = toSDKMessages(capped).map(sdkMsg => ({
      ...sdkMsg,
      session_id: sessionId,
    }))
    if (events.length === 0) {
      return
    }

    // Mid-turn init: if Remote Control is enabled while a query is running,
    // the last eligible message is a user prompt or tool_result (both 'user'
    // type). Without this the init PUT's 'idle' sticks until the next user-
    // type message forwards via writeMessages — which for a pure-text turn
    // is never (only assistant chunks stream post-init). Check eligible (pre-
    // cap), not capped: the cap may truncate to a user message even when the
    // actual trailing message is assistant.
    const trailing = eligible.at(-1)
    if (trailing?.type === 'user') {
      transport.reportState('running')
    }

    logForDebugging(`[remote-bridge] Flushing ${events.length} history events`)
    await transport.writeBatch(events)
  }
  // ── 9. Teardown ───────────────────────────────────────────────────────────
  // On SIGINT, SIGTERM, or /exit, gracefulShutdown races runCleanupFunctions()
  // against a 2s cap before forceExit kills the process. Budget accordingly:
  // - archive: teardown_archive_timeout_ms (default 1500, cap 2000)
  // - result write: fire-and-forget, archive latency covers the drain
  // - 401 retry: only if first archive 401s, shares the same budget
  /**
   * Shuts the bridge down once: cancels timers, drops gated writes, sends the
   * final result message, archives the session (with a single OAuth-refresh
   * retry on 401), closes the transport, and emits teardown telemetry.
   * Subsequent calls return immediately.
   */
  async function teardown(): Promise<void> {
    if (tornDown) return
    tornDown = true

    refresh.cancelAll()
    clearTimeout(connectDeadline)
    flushGate.drop()

    // Fire the result message before archive — transport.write() only awaits
    // enqueue (SerialBatchEventUploader resolves once buffered, drain is
    // async). Archiving before close() gives the uploader's drain loop a
    // window (typical archive ≈ 100-500ms) to POST the result without an
    // explicit sleep. close() sets closed=true which interrupts drain at the
    // next while-check, so close-before-archive drops the result.
    transport.reportState('idle')
    void transport.write(makeResultMessage(sessionId))

    // One archive attempt with the given OAuth token, on the teardown budget.
    const archiveWith = (oauthToken: string | undefined) =>
      archiveSession(
        sessionId,
        baseUrl,
        oauthToken,
        orgUUID,
        cfg.teardown_archive_timeout_ms,
      )

    let token = getAccessToken()
    let status = await archiveWith(token)

    // Token is usually fresh (refresh scheduler runs 5min before expiry) but
    // laptop-wake past the refresh window leaves getAccessToken() returning a
    // stale string. Retry once on 401 — onAuth401 (= handleOAuth401Error)
    // clears keychain cache + force-refreshes. No proactive refresh on the
    // happy path: handleOAuth401Error force-refreshes even valid tokens,
    // which would waste budget 99% of the time. try/catch mirrors
    // recoverFromAuthFailure: keychain reads can throw (macOS locked after
    // wake); an uncaught throw here would skip transport.close + telemetry.
    if (status === 401 && onAuth401) {
      try {
        await onAuth401(token ?? '')
        token = getAccessToken()
        status = await archiveWith(token)
      } catch (err) {
        logForDebugging(
          `[remote-bridge] Teardown 401 retry threw: ${errorMessage(err)}`,
          { level: 'error' },
        )
      }
    }

    transport.close()

    // Collapse the archive outcome into the telemetry bucket.
    let archiveStatus: ArchiveTelemetryStatus
    if (status === 'no_token') {
      archiveStatus = 'skipped_no_token'
    } else if (status === 'timeout' || status === 'error') {
      archiveStatus = 'network_error'
    } else if (status >= 500) {
      archiveStatus = 'server_5xx'
    } else if (status >= 400) {
      archiveStatus = 'server_4xx'
    } else {
      archiveStatus = 'ok'
    }

    // Numeric HTTP status when the archive call got a response, else undefined.
    const httpStatus = typeof status === 'number' ? status : undefined

    logForDebugging(`[remote-bridge] Torn down (archive=${status})`)
    logForDiagnosticsNoPII('info', 'bridge_repl_v2_teardown')
    logEvent(
      feature('CCR_MIRROR') && outboundOnly
        ? 'tengu_ccr_mirror_teardown'
        : 'tengu_bridge_repl_teardown',
      {
        v2: true,
        archive_status:
          archiveStatus as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
        archive_ok: httpStatus !== undefined && httpStatus < 400,
        archive_http_status: httpStatus,
        archive_timeout: status === 'timeout',
        archive_no_token: status === 'no_token',
      },
    )
  }
  // Register teardown as a cleanup function; the returned `unregister` is
  // called by the handle's own teardown() before it runs teardown directly.
  const unregister = registerCleanup(teardown)

  // One "started" analytics event per bridge, named after the mode in use.
  if (feature('CCR_MIRROR') && outboundOnly) {
    logEvent('tengu_ccr_mirror_started', {
      v2: true,
      expires_in_s: credentials.expires_in,
    })
  } else {
    const hasInitialMessages = (initialMessages?.length ?? 0) > 0
    logEvent('tengu_bridge_repl_started', {
      has_initial_messages: hasInitialMessages,
      v2: true,
      expires_in_s: credentials.expires_in,
      inProtectedNamespace: isInProtectedNamespace(),
    })
  }
  722. // ── 10. Handle ──────────────────────────────────────────────────────────
  723. return {
  724. bridgeSessionId: sessionId,
  725. environmentId: '',
  726. sessionIngressUrl: credentials.api_base_url,
  writeMessages(messages) {
    // Keep only bridge-eligible messages whose UUID is in neither
    // initialMessageUUIDs nor recentPostedUUIDs.
    const outgoing = messages.filter(
      msg =>
        isEligibleBridgeMessage(msg) &&
        !(initialMessageUUIDs.has(msg.uuid) || recentPostedUUIDs.has(msg.uuid)),
    )
    if (outgoing.length === 0) return

    // Title derivation hook. This runs ahead of the flushGate check because a
    // prompt is title-worthy even when it ends up queued. The callback keeps
    // being offered every title-worthy message until it returns true; the
    // caller owns the policy (derive at 1st and 3rd, skip if explicit).
    if (
      !userMessageCallbackDone &&
      outgoing.some(msg => {
        const titleText = extractTitleText(msg)
        return titleText !== undefined && onUserMessage?.(titleText, sessionId)
      })
    ) {
      userMessageCallbackDone = true
    }

    // While the flush gate is accepting, it takes ownership of the batch.
    const queued = flushGate.enqueue(...outgoing)
    if (queued) {
      logForDebugging(
        `[remote-bridge] Queued ${outgoing.length} message(s) during flush`,
      )
      return
    }

    outgoing.forEach(msg => {
      recentPostedUUIDs.add(msg.uuid)
    })
    const events = toSDKMessages(outgoing).map(sdkMessage => ({
      ...sdkMessage,
      session_id: sessionId,
    }))

    // v2 does not derive worker_status from events server-side (unlike v1
    // session-ingress session_status_updater.go), so it is pushed from here;
    // otherwise the CCR web session list stays on Idle instead of Running. A
    // user message in the batch marks the start of a turn.
    // CCRClient.reportState dedupes consecutive same-state pushes.
    const startsTurn = outgoing.some(msg => msg.type === 'user')
    if (startsTurn) {
      transport.reportState('running')
    }

    logForDebugging(`[remote-bridge] Sending ${outgoing.length} message(s)`)
    void transport.writeBatch(events)
  },
  writeSdkMessages(messages: SDKMessage[]) {
    // Messages without a uuid always pass; those with one pass only if that
    // uuid has not been posted recently.
    const fresh = messages.filter(
      msg => !(msg.uuid && recentPostedUUIDs.has(msg.uuid)),
    )
    if (fresh.length === 0) return

    // Remember what is about to be sent so a later call skips it.
    fresh.forEach(msg => {
      if (msg.uuid) recentPostedUUIDs.add(msg.uuid)
    })

    // Fire-and-forget: the batch is stamped with this session's id.
    void transport.writeBatch(
      fresh.map(msg => ({ ...msg, session_id: sessionId })),
    )
  },
  sendControlRequest(request: SDKControlRequest) {
    const requestId = request.request_id

    // Control traffic is dropped (and logged) while 401 recovery is running.
    if (authRecoveryInFlight) {
      logForDebugging(
        `[remote-bridge] Dropping control_request during 401 recovery: ${requestId}`,
      )
      return
    }

    // A permission prompt means the session is now waiting on the user.
    const isPermissionPrompt = request.request.subtype === 'can_use_tool'
    if (isPermissionPrompt) {
      transport.reportState('requires_action')
    }

    void transport.write({ ...request, session_id: sessionId })
    logForDebugging(
      `[remote-bridge] Sent control_request request_id=${requestId}`,
    )
  },
  sendControlResponse(response: SDKControlResponse) {
    // Control traffic is dropped (and logged) while 401 recovery is running.
    if (authRecoveryInFlight) {
      logForDebugging(
        '[remote-bridge] Dropping control_response during 401 recovery',
      )
      return
    }

    // Report 'running' first, then enqueue the response stamped with this
    // session's id (fire-and-forget).
    transport.reportState('running')
    void transport.write({ ...response, session_id: sessionId })
    logForDebugging('[remote-bridge] Sent control_response')
  },
  sendControlCancelRequest(requestId: string) {
    // Control traffic is dropped (and logged) while 401 recovery is running.
    if (authRecoveryInFlight) {
      logForDebugging(
        `[remote-bridge] Dropping control_cancel_request during 401 recovery: ${requestId}`,
      )
      return
    }

    // The permission was resolved locally (hook / classifier / channel /
    // recheck). On those paths interactiveHandler only calls cancelRequest —
    // never sendResponse — so unless 'running' is reported here the server
    // stays on requires_action.
    transport.reportState('running')
    void transport.write({
      type: 'control_cancel_request' as const,
      request_id: requestId,
      session_id: sessionId,
    })
    logForDebugging(
      `[remote-bridge] Sent control_cancel_request request_id=${requestId}`,
    )
  },
  sendResult() {
    // Nothing is sent while 401 recovery is running; the drop is logged.
    if (authRecoveryInFlight) {
      logForDebugging('[remote-bridge] Dropping result during 401 recovery')
      return
    }

    // Report 'idle', then enqueue the result message (fire-and-forget).
    transport.reportState('idle')
    const resultMessage = makeResultMessage(sessionId)
    void transport.write(resultMessage)
    logForDebugging(`[remote-bridge] Sent result`)
  },
  // Explicit teardown requested through the handle. `unregister` is the value
  // returned by registerCleanup(teardown); it is called first — presumably so
  // the shutdown cleanup path does not run teardown a second time (teardown is
  // also guarded by its own tornDown latch). The inner `teardown()` call
  // resolves to the enclosing closure's function, not to this method: a method
  // shorthand does not bind its own name in scope.
  async teardown() {
    unregister()
    await teardown()
  },
  843. }
  844. }
  845. // ─── Session API (v2 /code/sessions, no env) ─────────────────────────────────
  /**
   * Runs an async init call until it yields a non-null value or the attempt
   * budget is spent, waiting between attempts with exponential backoff plus
   * jitter.
   *
   * @param fn    Operation to attempt; a `null` result means "failed, retry".
   * @param label Name of the operation, used only in the retry log line.
   * @param cfg   Source of the retry knobs: `init_retry_max_attempts`,
   *              `init_retry_base_delay_ms`, `init_retry_jitter_fraction` and
   *              `init_retry_max_delay_ms`.
   * @returns The first non-null result, or `null` once every attempt failed
   *          (including the case where the attempt budget is below 1).
   */
  async function withRetry<T>(
    fn: () => Promise<T | null>,
    label: string,
    cfg: EnvLessBridgeConfig,
  ): Promise<T | null> {
    const maxAttempts = cfg.init_retry_max_attempts

    for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
      const outcome = await fn()
      if (outcome !== null) return outcome

      // No point sleeping after the final attempt.
      if (attempt >= maxAttempts) break

      // Backoff doubles per attempt; jitter is a symmetric ±fraction of it.
      // The cap is applied after jitter, so the wait never exceeds
      // init_retry_max_delay_ms.
      const backoff = cfg.init_retry_base_delay_ms * Math.pow(2, attempt - 1)
      const spread = backoff * cfg.init_retry_jitter_fraction
      const jitter = spread * (2 * Math.random() - 1)
      const delay = Math.min(backoff + jitter, cfg.init_retry_max_delay_ms)

      logForDebugging(
        `[remote-bridge] ${label} failed (attempt ${attempt}/${maxAttempts}), retrying in ${Math.round(delay)}ms`,
      )
      await sleep(delay)
    }

    return null
  }
  869. // Moved to codeSessionApi.ts so the SDK /bridge subpath can bundle them
  870. // without pulling in this file's heavy CLI tree (analytics, transport).
  871. export {
  872. createCodeSession,
  873. type RemoteCredentials,
  874. } from './codeSessionApi.js'
  875. import {
  876. createCodeSession,
  877. fetchRemoteCredentials as fetchRemoteCredentialsRaw,
  878. type RemoteCredentials,
  879. } from './codeSessionApi.js'
  880. import { getBridgeBaseUrlOverride } from './bridgeConfig.js'
  /**
   * CLI-side wrapper around the raw credentials fetch from codeSessionApi.
   * It adds the two things the SDK-facing export must stay free of (both are
   * env/GrowthBook reads): the trusted-device token and the
   * CLAUDE_BRIDGE_BASE_URL dev override.
   *
   * @returns `null` when the raw fetch yields nothing; otherwise the
   *          credentials, with `api_base_url` replaced by `baseUrl` whenever a
   *          bridge base-URL override is active.
   */
  export async function fetchRemoteCredentials(
    sessionId: string,
    baseUrl: string,
    accessToken: string,
    timeoutMs: number,
  ): Promise<RemoteCredentials | null> {
    const trustedDeviceToken = getTrustedDeviceToken()
    const fetched = await fetchRemoteCredentialsRaw(
      sessionId,
      baseUrl,
      accessToken,
      timeoutMs,
      trustedDeviceToken,
    )
    if (!fetched) return null

    // With the dev override active, point the credentials at the caller's
    // base URL instead of the one the server returned.
    if (getBridgeBaseUrlOverride()) {
      return { ...fetched, api_base_url: baseUrl }
    }
    return fetched
  }
  /**
   * Raw outcome of one archiveSession() call: the HTTP status code when a
   * response came back (any code — non-2xx responses are returned, not
   * thrown), or a sentinel when there was none: 'no_token' (no access token,
   * request never sent), 'timeout' (axios ECONNABORTED) or 'error' (any other
   * failure).
   */
  type ArchiveStatus = number | 'timeout' | 'error' | 'no_token'
  // Single categorical for BQ `GROUP BY archive_status`. The booleans on
  // _teardown predate this and are redundant with it (except archive_timeout,
  // which distinguishes ECONNABORTED from other network errors — both map to
  // 'network_error' here since the dominant cause in a 1.5s window is timeout).
  type ArchiveTelemetryStatus =
    | 'ok'
    | 'skipped_no_token'
    | 'network_error'
    | 'server_4xx'
    | 'server_5xx'
  /**
   * POSTs an archive request for the session and reports what happened
   * instead of throwing on request failure.
   *
   * @param sessionId   Session id; retagged via toCompatSessionId for the URL.
   * @param baseUrl     API origin the archive path is appended to.
   * @param accessToken OAuth token; when falsy no request is made.
   * @param orgUUID     Sent as the `x-organization-uuid` header.
   * @param timeoutMs   Axios request timeout.
   * @returns The HTTP status of the response (every status is accepted), or
   *          'no_token' / 'timeout' / 'error' when there is no response.
   */
  async function archiveSession(
    sessionId: string,
    baseUrl: string,
    accessToken: string | undefined,
    orgUUID: string,
    timeoutMs: number,
  ): Promise<ArchiveStatus> {
    if (!accessToken) return 'no_token'

    // Archive lives at the compat layer (/v1/sessions/*, not
    // /v1/code/sessions). compat.parseSessionID only accepts TagSession
    // (session_*), so cse_* ids are retagged. anthropic-beta and
    // x-organization-uuid are both required — without them the compat gateway
    // 404s before the handler is reached.
    //
    // bridgeMain.ts caches compatId in sessionCompatIds so its in-memory
    // titledSessions/logger keys stay consistent across a mid-session gate
    // flip. Here the compatId is only a URL path segment with no in-memory
    // state, so it is computed fresh and matches whatever the server currently
    // validates: if the gate is OFF, the server has been updated to accept
    // cse_* and that is what gets sent.
    const compatId = toCompatSessionId(sessionId)

    try {
      const requestConfig = {
        headers: {
          ...oauthHeaders(accessToken),
          'anthropic-beta': 'ccr-byoc-2025-07-29',
          'x-organization-uuid': orgUUID,
        },
        timeout: timeoutMs,
        // Never reject on status: the caller wants the raw code.
        validateStatus: () => true,
      }
      const { status: httpStatus } = await axios.post(
        `${baseUrl}/v1/sessions/${compatId}/archive`,
        {},
        requestConfig,
      )
      logForDebugging(
        `[remote-bridge] Archive ${compatId} status=${httpStatus}`,
      )
      return httpStatus
    } catch (err) {
      logForDebugging(`[remote-bridge] Archive failed: ${errorMessage(err)}`)
      // Axios reports a request timeout as ECONNABORTED.
      const timedOut = axios.isAxiosError(err) && err.code === 'ECONNABORTED'
      return timedOut ? 'timeout' : 'error'
    }
  }