replBridge.ts 98 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406
  1. // biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered
  2. import { randomUUID } from 'crypto'
  3. import {
  4. createBridgeApiClient,
  5. BridgeFatalError,
  6. isExpiredErrorType,
  7. isSuppressible403,
  8. } from './bridgeApi.js'
  9. import type { BridgeConfig, BridgeApiClient } from './types.js'
  10. import { logForDebugging } from '../utils/debug.js'
  11. import { logForDiagnosticsNoPII } from '../utils/diagLogs.js'
  12. import {
  13. type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
  14. logEvent,
  15. } from '../services/analytics/index.js'
  16. import { registerCleanup } from '../utils/cleanupRegistry.js'
  17. import {
  18. handleIngressMessage,
  19. handleServerControlRequest,
  20. makeResultMessage,
  21. isEligibleBridgeMessage,
  22. extractTitleText,
  23. BoundedUUIDSet,
  24. } from './bridgeMessaging.js'
  25. import {
  26. decodeWorkSecret,
  27. buildSdkUrl,
  28. buildCCRv2SdkUrl,
  29. sameSessionId,
  30. } from './workSecret.js'
  31. import { toCompatSessionId, toInfraSessionId } from './sessionIdCompat.js'
  32. import { updateSessionBridgeId } from '../utils/concurrentSessions.js'
  33. import { getTrustedDeviceToken } from './trustedDevice.js'
  34. import { HybridTransport } from '../cli/transports/HybridTransport.js'
  35. import {
  36. type ReplBridgeTransport,
  37. createV1ReplTransport,
  38. createV2ReplTransport,
  39. } from './replBridgeTransport.js'
  40. import { updateSessionIngressAuthToken } from '../utils/sessionIngressAuth.js'
  41. import { isEnvTruthy, isInProtectedNamespace } from '../utils/envUtils.js'
  42. import { validateBridgeId } from './bridgeApi.js'
  43. import {
  44. describeAxiosError,
  45. extractHttpStatus,
  46. logBridgeSkip,
  47. } from './debugUtils.js'
  48. import type { Message } from '../types/message.js'
  49. import type { SDKMessage } from '../entrypoints/agentSdkTypes.js'
  50. import type { PermissionMode } from '../utils/permissions/PermissionMode.js'
  51. import type {
  52. SDKControlRequest,
  53. SDKControlResponse,
  54. } from '../entrypoints/sdk/controlTypes.js'
  55. import { createCapacityWake, type CapacitySignal } from './capacityWake.js'
  56. import { FlushGate } from './flushGate.js'
  57. import {
  58. DEFAULT_POLL_CONFIG,
  59. type PollIntervalConfig,
  60. } from './pollConfigDefaults.js'
  61. import { errorMessage } from '../utils/errors.js'
  62. import { sleep } from '../utils/sleep.js'
  63. import {
  64. wrapApiForFaultInjection,
  65. registerBridgeDebugHandle,
  66. clearBridgeDebugHandle,
  67. injectBridgeFault,
  68. } from './bridgeDebug.js'
/**
 * Handle returned to the caller once the bridge is live. The write/send
 * methods push outbound traffic toward the remote (claude.ai) side;
 * teardown() shuts the bridge down — it stops the current work item
 * (currentWorkId is tracked for exactly this) and, in non-perpetual mode,
 * clears the crash-recovery bridge pointer.
 */
export type ReplBridgeHandle = {
  // ID of the session created (or, in perpetual mode, reused) on the bridge.
  bridgeSessionId: string
  // Environment ID returned by registerBridgeEnvironment.
  environmentId: string
  // Session-Ingress base URL this bridge was configured with.
  sessionIngressUrl: string
  // Sends internal Message[]; requires params.toSDKMessages for conversion.
  writeMessages(messages: Message[]): void
  // Sends pre-converted SDKMessage[] directly (daemon path; no converter needed).
  writeSdkMessages(messages: SDKMessage[]): void
  // Forwards an SDK control request to the remote side.
  sendControlRequest(request: SDKControlRequest): void
  // Forwards an SDK control response (e.g. a permission decision) outbound.
  sendControlResponse(response: SDKControlResponse): void
  // Cancels an in-flight control request by its request ID.
  sendControlCancelRequest(requestId: string): void
  // Emits a result message (see makeResultMessage) — presumably marks
  // end-of-turn to the remote UI; confirm at the call site.
  sendResult(): void
  // Graceful shutdown; resolves once cleanup completes.
  teardown(): Promise<void>
}
/**
 * Connection lifecycle states reported through BridgeCoreParams.onStateChange.
 * 'failed' is emitted with a detail string on environment-registration or
 * session-creation failure (visible in initBridgeCore). NOTE(review): the
 * 'ready'/'connected'/'reconnecting' transitions fire later in the
 * connect/poll flow — confirm exact triggers at the onStateChange call sites.
 */
export type BridgeState = 'ready' | 'connected' | 'reconnecting' | 'failed'
/**
 * Explicit-param input to initBridgeCore. Everything initReplBridge reads
 * from bootstrap state (cwd, session ID, git, OAuth) becomes a field here.
 * A daemon caller (Agent SDK, PR 4) that never runs main.tsx fills these
 * in itself.
 */
export type BridgeCoreParams = {
  // Working directory: keys the crash-recovery bridge pointer
  // (read/write/clearBridgePointer) and is sent in the registration payload.
  dir: string
  // Display name for this machine, sent in the environment registration.
  machineName: string
  // Current git branch; sent at registration and forwarded to createSession.
  branch: string
  // Git remote URL, or null when unavailable; sent at registration and
  // forwarded to createSession.
  gitRepoUrl: string | null
  // Initial session title passed to createSession; also the default return
  // of getCurrentTitle.
  title: string
  // API base URL for the bridge API client; also sent as apiBaseUrl in the
  // registration payload.
  baseUrl: string
  // Session-Ingress base URL, forwarded in the registration payload and
  // exposed on the returned handle.
  sessionIngressUrl: string
  /**
   * Opaque string sent as metadata.worker_type. Use BridgeWorkerType for
   * the two CLI-originated values; daemon callers may send any string the
   * backend recognizes (it's just a filter key on the web side).
   */
  workerType: string
  // Returns the current OAuth access token (or undefined); used by the
  // bridge API client for request auth.
  getAccessToken: () => string | undefined
  /**
   * POST /v1/sessions. Injected because `createSession.ts` lazy-loads
   * `auth.ts`/`model.ts`/`oauth/client.ts` and `bun --outfile` inlines
   * dynamic imports — the lazy-load doesn't help, the whole REPL tree ends
   * up in the Agent SDK bundle.
   *
   * REPL wrapper passes `createBridgeSession` from `createSession.ts`.
   * Daemon wrapper passes `createBridgeSessionLean` from `sessionApi.ts`
   * (HTTP-only, orgUUID+model supplied by the daemon caller).
   *
   * Receives `gitRepoUrl`+`branch` so the REPL wrapper can build the git
   * source/outcome for claude.ai's session card. Daemon ignores them.
   *
   * Called with a 15s AbortSignal.timeout; a null return is treated as
   * failure (environment is deregistered and initBridgeCore returns null).
   */
  createSession: (opts: {
    environmentId: string
    title: string
    gitRepoUrl: string | null
    branch: string
    signal: AbortSignal
  }) => Promise<string | null>
  /**
   * POST /v1/sessions/{id}/archive. Same injection rationale. Best-effort;
   * the callback MUST NOT throw.
   */
  archiveSession: (sessionId: string) => Promise<void>
  /**
   * Invoked on reconnect-after-env-lost to refresh the title. REPL wrapper
   * reads session storage (picks up /rename); daemon returns the static
   * title. Defaults to () => title.
   */
  getCurrentTitle?: () => string
  /**
   * Converts internal Message[] → SDKMessage[] for writeMessages() and the
   * initial-flush/drain paths. REPL wrapper passes the real toSDKMessages
   * from utils/messages/mappers.ts. Daemon callers that only use
   * writeSdkMessages() and pass no initialMessages can omit this — those
   * code paths are unreachable. (The default implementation throws with an
   * explanatory message if one of those paths is hit anyway.)
   *
   * Injected rather than imported because mappers.ts transitively pulls in
   * src/commands.ts via messages.ts → api.ts → prompts.ts, dragging the
   * entire command registry + React tree into the Agent SDK bundle.
   */
  toSDKMessages?: (messages: Message[]) => SDKMessage[]
  /**
   * OAuth 401 refresh handler passed to createBridgeApiClient. REPL wrapper
   * passes handleOAuth401Error; daemon passes its AuthManager's handler.
   * Injected because utils/auth.ts transitively pulls in the command
   * registry via config.ts → file.ts → permissions/filesystem.ts →
   * sessionStorage.ts → commands.ts.
   */
  onAuth401?: (staleAccessToken: string) => Promise<boolean>
  /**
   * Poll interval config getter for the work-poll heartbeat loop. REPL
   * wrapper passes the GrowthBook-backed getPollIntervalConfig (allows ops
   * to live-tune poll rates fleet-wide). Daemon passes a static config
   * with a 60s heartbeat (5× headroom under the 300s work-lease TTL).
   * Injected because growthbook.ts transitively pulls in the command
   * registry via the same config.ts chain. Defaults to DEFAULT_POLL_CONFIG.
   */
  getPollIntervalConfig?: () => PollIntervalConfig
  /**
   * Max initial messages to replay on connect. REPL wrapper reads from the
   * tengu_bridge_initial_history_cap GrowthBook flag. Daemon passes no
   * initialMessages so this is never read. Default 200 matches the flag
   * default.
   */
  initialHistoryCap?: number
  // Same REPL-flush machinery as InitBridgeOptions — daemon omits these.
  //
  // initialMessages: conversation history replayed over the ingress
  // WebSocket once it connects (NOT sent as session-creation events —
  // those use STREAM_ONLY persistence and are lost before the CCR UI
  // subscribes). Their UUIDs also seed the echo/dedup sets.
  initialMessages?: Message[]
  // previouslyFlushedUUIDs: UUIDs already sent in a prior flush. Mutated
  // here: when a perpetual session is reused, every initialMessages UUID is
  // added so the initial flush skips them (duplicate UUIDs cause the server
  // to kill the WebSocket).
  previouslyFlushedUUIDs?: Set<string>
  // Callbacks below are wired into the transport later in initBridgeCore.
  // NOTE(review): semantics inferred from names where the handler bodies
  // are past this chunk — confirm at the handleIngressMessage /
  // handleServerControlRequest call sites.
  onInboundMessage?: (msg: SDKMessage) => void
  onPermissionResponse?: (response: SDKControlResponse) => void
  onInterrupt?: () => void
  onSetModel?: (model: string | undefined) => void
  onSetMaxThinkingTokens?: (maxTokens: number | null) => void
  /**
   * Returns a policy verdict so this module can emit an error control_response
   * without importing the policy checks itself (bootstrap-isolation constraint).
   * The callback must guard `auto` (isAutoModeGateEnabled) and
   * `bypassPermissions` (isBypassPermissionsModeDisabled AND
   * isBypassPermissionsModeAvailable) BEFORE calling transitionPermissionMode —
   * that function's internal auto-gate check is a defensive throw, not a
   * graceful guard, and its side-effect order is setAutoModeActive(true) then
   * throw, which corrupts the 3-way invariant documented in src/CLAUDE.md if
   * the callback lets the throw escape here.
   */
  onSetPermissionMode?: (
    mode: PermissionMode,
  ) => { ok: true } | { ok: false; error: string }
  // Lifecycle notifications; 'failed' fires with a detail message on
  // registration or session-creation failure.
  onStateChange?: (state: BridgeState, detail?: string) => void
  /**
   * Fires on each real user message to flow through writeMessages() until
   * the callback returns true (done). Mirrors remoteBridgeCore.ts's
   * onUserMessage so the REPL bridge can derive a session title from early
   * prompts when none was set at init time (e.g. user runs /remote-control
   * on an empty conversation, then types). Tool-result wrappers, meta
   * messages, and display-tag-only messages are skipped. Receives
   * currentSessionId so the wrapper can PATCH the title without a closure
   * dance to reach the not-yet-returned handle. The caller owns the
   * derive-at-count-1-and-3 policy; the transport just keeps calling until
   * told to stop. Not fired for the writeSdkMessages daemon path (daemon
   * sets its own title at init). Distinct from SessionSpawnOpts's
   * onFirstUserMessage (spawn-bridge, PR #21250), which stays fire-once.
   */
  onUserMessage?: (text: string, sessionId: string) => boolean
  /** See InitBridgeOptions.perpetual. */
  perpetual?: boolean
  /**
   * Seeds lastTransportSequenceNum — the SSE event-stream high-water mark
   * that's carried across transport swaps within one process. Daemon callers
   * pass the value they persisted at shutdown so the FIRST SSE connect of a
   * fresh process sends from_sequence_num and the server doesn't replay full
   * history. REPL callers omit (fresh session each run → 0 is correct).
   * Only applied when the prior session is actually reconnected; a fresh
   * session resets the mark to 0.
   */
  initialSSESequenceNum?: number
}
/**
 * Superset of ReplBridgeHandle. Adds getSSESequenceNum for daemon callers
 * that persist the SSE seq-num across process restarts and pass it back as
 * initialSSESequenceNum on the next start. REPL callers can ignore the
 * extra method and treat this as a plain ReplBridgeHandle.
 */
export type BridgeCoreHandle = ReplBridgeHandle & {
  /**
   * Current SSE sequence-number high-water mark (lastTransportSequenceNum).
   * Updates as transports swap. Daemon callers persist this on shutdown
   * and pass it back as initialSSESequenceNum on next start.
   */
  getSSESequenceNum(): number
}
  232. /**
  233. * Poll error recovery constants. When the work poll starts failing (e.g.
  234. * server 500s), we use exponential backoff and give up after this timeout.
  235. * This is deliberately long — the server is the authority on when a session
  236. * is truly dead. As long as the server accepts our poll, we keep waiting
  237. * for it to re-dispatch the work item.
  238. */
  239. const POLL_ERROR_INITIAL_DELAY_MS = 2_000
  240. const POLL_ERROR_MAX_DELAY_MS = 60_000
  241. const POLL_ERROR_GIVE_UP_MS = 15 * 60 * 1000
// Monotonically increasing counter for distinguishing init calls in logs.
// Each initBridgeCore invocation takes `++initSequence` as its `#N` log tag;
// being module-level, it survives across calls within one process.
let initSequence = 0
  244. /**
  245. * Bootstrap-free core: env registration → session creation → poll loop →
  246. * ingress WS → teardown. Reads nothing from bootstrap/state or
  247. * sessionStorage — all context comes from params. Caller (initReplBridge
  248. * below, or a daemon in PR 4) has already passed entitlement gates and
  249. * gathered git/auth/title.
  250. *
  251. * Returns null on registration or session-creation failure.
  252. */
  253. export async function initBridgeCore(
  254. params: BridgeCoreParams,
  255. ): Promise<BridgeCoreHandle | null> {
  256. const {
  257. dir,
  258. machineName,
  259. branch,
  260. gitRepoUrl,
  261. title,
  262. baseUrl,
  263. sessionIngressUrl,
  264. workerType,
  265. getAccessToken,
  266. createSession,
  267. archiveSession,
  268. getCurrentTitle = () => title,
  269. toSDKMessages = () => {
  270. throw new Error(
  271. 'BridgeCoreParams.toSDKMessages not provided. Pass it if you use writeMessages() or initialMessages — daemon callers that only use writeSdkMessages() never hit this path.',
  272. )
  273. },
  274. onAuth401,
  275. getPollIntervalConfig = () => DEFAULT_POLL_CONFIG,
  276. initialHistoryCap = 200,
  277. initialMessages,
  278. previouslyFlushedUUIDs,
  279. onInboundMessage,
  280. onPermissionResponse,
  281. onInterrupt,
  282. onSetModel,
  283. onSetMaxThinkingTokens,
  284. onSetPermissionMode,
  285. onStateChange,
  286. onUserMessage,
  287. perpetual,
  288. initialSSESequenceNum = 0,
  289. } = params
  290. const seq = ++initSequence
  291. // bridgePointer import hoisted: perpetual mode reads it before register;
  292. // non-perpetual writes it after session create; both use clear at teardown.
  293. const { writeBridgePointer, clearBridgePointer, readBridgePointer } =
  294. await import('./bridgePointer.js')
  295. // Perpetual mode: read the crash-recovery pointer and treat it as prior
  296. // state. The pointer is written unconditionally after session create
  297. // (crash-recovery for all sessions); perpetual mode just skips the
  298. // teardown clear so it survives clean exits too. Only reuse 'repl'
  299. // pointers — a crashed standalone bridge (`claude remote-control`)
  300. // writes source:'standalone' with a different workerType.
  301. const rawPrior = perpetual ? await readBridgePointer(dir) : null
  302. const prior = rawPrior?.source === 'repl' ? rawPrior : null
  303. logForDebugging(
  304. `[bridge:repl] initBridgeCore #${seq} starting (initialMessages=${initialMessages?.length ?? 0}${prior ? ` perpetual prior=env:${prior.environmentId}` : ''})`,
  305. )
  306. // 5. Register bridge environment
  307. const rawApi = createBridgeApiClient({
  308. baseUrl,
  309. getAccessToken,
  310. runnerVersion: MACRO.VERSION,
  311. onDebug: logForDebugging,
  312. onAuth401,
  313. getTrustedDeviceToken,
  314. })
  315. // Ant-only: interpose so /bridge-kick can inject poll/register/heartbeat
  316. // failures. Zero cost in external builds (rawApi passes through unchanged).
  317. const api =
  318. process.env.USER_TYPE === 'ant' ? wrapApiForFaultInjection(rawApi) : rawApi
  319. const bridgeConfig: BridgeConfig = {
  320. dir,
  321. machineName,
  322. branch,
  323. gitRepoUrl,
  324. maxSessions: 1,
  325. spawnMode: 'single-session',
  326. verbose: false,
  327. sandbox: false,
  328. bridgeId: randomUUID(),
  329. workerType,
  330. environmentId: randomUUID(),
  331. reuseEnvironmentId: prior?.environmentId,
  332. apiBaseUrl: baseUrl,
  333. sessionIngressUrl,
  334. }
  335. let environmentId: string
  336. let environmentSecret: string
  337. try {
  338. const reg = await api.registerBridgeEnvironment(bridgeConfig)
  339. environmentId = reg.environment_id
  340. environmentSecret = reg.environment_secret
  341. } catch (err) {
  342. logBridgeSkip(
  343. 'registration_failed',
  344. `[bridge:repl] Environment registration failed: ${errorMessage(err)}`,
  345. )
  346. // Stale pointer may be the cause (expired/deleted env) — clear it so
  347. // the next start doesn't retry the same dead ID.
  348. if (prior) {
  349. await clearBridgePointer(dir)
  350. }
  351. onStateChange?.('failed', errorMessage(err))
  352. return null
  353. }
  354. logForDebugging(`[bridge:repl] Environment registered: ${environmentId}`)
  355. logForDiagnosticsNoPII('info', 'bridge_repl_env_registered')
  356. logEvent('tengu_bridge_repl_env_registered', {})
  357. /**
  358. * Reconnect-in-place: if the just-registered environmentId matches what
  359. * was requested, call reconnectSession to force-stop stale workers and
  360. * re-queue the session. Used at init (perpetual mode — env is alive but
  361. * idle after clean teardown) and in doReconnect() Strategy 1 (env lost
  362. * then resurrected). Returns true on success; caller falls back to
  363. * fresh session creation on false.
  364. */
  365. async function tryReconnectInPlace(
  366. requestedEnvId: string,
  367. sessionId: string,
  368. ): Promise<boolean> {
  369. if (environmentId !== requestedEnvId) {
  370. logForDebugging(
  371. `[bridge:repl] Env mismatch (requested ${requestedEnvId}, got ${environmentId}) — cannot reconnect in place`,
  372. )
  373. return false
  374. }
  375. // The pointer stores what createBridgeSession returned (session_*,
  376. // compat/convert.go:41). /bridge/reconnect is an environments-layer
  377. // endpoint — once the server's ccr_v2_compat_enabled gate is on it
  378. // looks sessions up by their infra tag (cse_*) and returns "Session
  379. // not found" for the session_* costume. We don't know the gate state
  380. // pre-poll, so try both; the re-tag is a no-op if the ID is already
  381. // cse_* (doReconnect Strategy 1 path — currentSessionId never mutates
  382. // to cse_* but future-proof the check).
  383. const infraId = toInfraSessionId(sessionId)
  384. const candidates =
  385. infraId === sessionId ? [sessionId] : [sessionId, infraId]
  386. for (const id of candidates) {
  387. try {
  388. await api.reconnectSession(environmentId, id)
  389. logForDebugging(
  390. `[bridge:repl] Reconnected session ${id} in place on env ${environmentId}`,
  391. )
  392. return true
  393. } catch (err) {
  394. logForDebugging(
  395. `[bridge:repl] reconnectSession(${id}) failed: ${errorMessage(err)}`,
  396. )
  397. }
  398. }
  399. logForDebugging(
  400. '[bridge:repl] reconnectSession exhausted — falling through to fresh session',
  401. )
  402. return false
  403. }
  404. // Perpetual init: env is alive but has no queued work after clean
  405. // teardown. reconnectSession re-queues it. doReconnect() has the same
  406. // call but only fires on poll 404 (env dead);
  407. // here the env is alive but idle.
  408. const reusedPriorSession = prior
  409. ? await tryReconnectInPlace(prior.environmentId, prior.sessionId)
  410. : false
  411. if (prior && !reusedPriorSession) {
  412. await clearBridgePointer(dir)
  413. }
  414. // 6. Create session on the bridge. Initial messages are NOT included as
  415. // session creation events because those use STREAM_ONLY persistence and
  416. // are published before the CCR UI subscribes, so they get lost. Instead,
  417. // initial messages are flushed via the ingress WebSocket once it connects.
  418. // Mutable session ID — updated when the environment+session pair is
  419. // re-created after a connection loss.
  420. let currentSessionId: string
  421. if (reusedPriorSession && prior) {
  422. currentSessionId = prior.sessionId
  423. logForDebugging(
  424. `[bridge:repl] Perpetual session reused: ${currentSessionId}`,
  425. )
  426. // Server already has all initialMessages from the prior CLI run. Mark
  427. // them as previously-flushed so the initial flush filter excludes them
  428. // (previouslyFlushedUUIDs is a fresh Set on every CLI start). Duplicate
  429. // UUIDs cause the server to kill the WebSocket.
  430. if (initialMessages && previouslyFlushedUUIDs) {
  431. for (const msg of initialMessages) {
  432. previouslyFlushedUUIDs.add(msg.uuid)
  433. }
  434. }
  435. } else {
  436. const createdSessionId = await createSession({
  437. environmentId,
  438. title,
  439. gitRepoUrl,
  440. branch,
  441. signal: AbortSignal.timeout(15_000),
  442. })
  443. if (!createdSessionId) {
  444. logForDebugging(
  445. '[bridge:repl] Session creation failed, deregistering environment',
  446. )
  447. logEvent('tengu_bridge_repl_session_failed', {})
  448. await api.deregisterEnvironment(environmentId).catch(() => {})
  449. onStateChange?.('failed', 'Session creation failed')
  450. return null
  451. }
  452. currentSessionId = createdSessionId
  453. logForDebugging(`[bridge:repl] Session created: ${currentSessionId}`)
  454. }
  455. // Crash-recovery pointer: written now so a kill -9 at any point after
  456. // this leaves a recoverable trail. Cleared in teardown (non-perpetual)
  457. // or left alone (perpetual mode — pointer survives clean exit too).
  458. // `claude remote-control --continue` from the same directory will detect
  459. // it and offer to resume.
  460. await writeBridgePointer(dir, {
  461. sessionId: currentSessionId,
  462. environmentId,
  463. source: 'repl',
  464. })
  465. logForDiagnosticsNoPII('info', 'bridge_repl_session_created')
  466. logEvent('tengu_bridge_repl_started', {
  467. has_initial_messages: !!(initialMessages && initialMessages.length > 0),
  468. inProtectedNamespace: isInProtectedNamespace(),
  469. })
  470. // UUIDs of initial messages. Used for dedup in writeMessages to avoid
  471. // re-sending messages that were already flushed on WebSocket open.
  472. const initialMessageUUIDs = new Set<string>()
  473. if (initialMessages) {
  474. for (const msg of initialMessages) {
  475. initialMessageUUIDs.add(msg.uuid)
  476. }
  477. }
  478. // Bounded ring buffer of UUIDs for messages we've already sent to the
  479. // server via the ingress WebSocket. Serves two purposes:
  480. // 1. Echo filtering — ignore our own messages bouncing back on the WS.
  481. // 2. Secondary dedup in writeMessages — catch race conditions where
  482. // the hook's index-based tracking isn't sufficient.
  483. //
  484. // Seeded with initialMessageUUIDs so that when the server echoes back
  485. // the initial conversation context over the ingress WebSocket, those
  486. // messages are recognized as echoes and not re-injected into the REPL.
  487. //
  488. // Capacity of 2000 covers well over any realistic echo window (echoes
  489. // arrive within milliseconds) and any messages that might be re-encountered
  490. // after compaction. The hook's lastWrittenIndexRef is the primary dedup;
  491. // this is a safety net.
  492. const recentPostedUUIDs = new BoundedUUIDSet(2000)
  493. for (const uuid of initialMessageUUIDs) {
  494. recentPostedUUIDs.add(uuid)
  495. }
  496. // Bounded set of INBOUND prompt UUIDs we've already forwarded to the REPL.
  497. // Defensive dedup for when the server re-delivers prompts (seq-num
  498. // negotiation failure, server edge cases, transport swap races). The
  499. // seq-num carryover below is the primary fix; this is the safety net.
  500. const recentInboundUUIDs = new BoundedUUIDSet(2000)
  501. // 7. Start poll loop for work items — this is what makes the session
  502. // "live" on claude.ai. When a user types there, the backend dispatches
  503. // a work item to our environment. We poll for it, get the ingress token,
  504. // and connect the ingress WebSocket.
  505. //
  506. // The poll loop keeps running: when work arrives it connects the ingress
  507. // WebSocket, and if the WebSocket drops unexpectedly (code != 1000) it
  508. // resumes polling to get a fresh ingress token and reconnect.
  509. const pollController = new AbortController()
  510. // Adapter over either HybridTransport (v1: WS reads + POST writes to
  511. // Session-Ingress) or SSETransport+CCRClient (v2: SSE reads + POST
  512. // writes to CCR /worker/*). The v1/v2 choice is made in onWorkReceived:
  513. // server-driven via secret.use_code_sessions, with CLAUDE_BRIDGE_USE_CCR_V2
  514. // as an ant-dev override.
  515. let transport: ReplBridgeTransport | null = null
  516. // Bumped on every onWorkReceived. Captured in createV2ReplTransport's .then()
  517. // closure to detect stale resolutions: if two calls race while transport is
  518. // null, both registerWorker() (bumping server epoch), and whichever resolves
  519. // SECOND is the correct one — but the transport !== null check gets this
  520. // backwards (first-to-resolve installs, second discards). The generation
  521. // counter catches it independent of transport state.
  522. let v2Generation = 0
  523. // SSE sequence-number high-water mark carried across transport swaps.
  524. // Without this, each new SSETransport starts at 0, sends no
  525. // from_sequence_num / Last-Event-ID on its first connect, and the server
  526. // replays the entire session event history — every prompt ever sent
  527. // re-delivered as fresh inbound messages on every onWorkReceived.
  528. //
  529. // Seed only when we actually reconnected the prior session. If
  530. // `reusedPriorSession` is false we fell through to `createSession()` —
  531. // the caller's persisted seq-num belongs to a dead session and applying
  532. // it to the fresh stream (starting at 1) silently drops events. Same
  533. // hazard as doReconnect Strategy 2; same fix as the reset there.
  534. let lastTransportSequenceNum = reusedPriorSession ? initialSSESequenceNum : 0
  535. // Track the current work ID so teardown can call stopWork
  536. let currentWorkId: string | null = null
  537. // Session ingress JWT for the current work item — used for heartbeat auth.
  538. let currentIngressToken: string | null = null
  539. // Signal to wake the at-capacity sleep early when the transport is lost,
  540. // so the poll loop immediately switches back to fast polling for new work.
  541. const capacityWake = createCapacityWake(pollController.signal)
  542. const wakePollLoop = capacityWake.wake
  543. const capacitySignal = capacityWake.signal
  544. // Gates message writes during the initial flush to prevent ordering
  545. // races where new messages arrive at the server interleaved with history.
  546. const flushGate = new FlushGate<Message>()
  547. // Latch for onUserMessage — flips true when the callback returns true
  548. // (policy says "done deriving"). If no callback, skip scanning entirely
  549. // (daemon path — no title derivation needed).
  550. let userMessageCallbackDone = !onUserMessage
  551. // Shared counter for environment re-creations, used by both
  552. // onEnvironmentLost and the abnormal-close handler.
  553. const MAX_ENVIRONMENT_RECREATIONS = 3
  554. let environmentRecreations = 0
  555. let reconnectPromise: Promise<boolean> | null = null
  556. /**
  557. * Recover from onEnvironmentLost (poll returned 404 — env was reaped
  558. * server-side). Tries two strategies in order:
  559. *
  560. * 1. Reconnect-in-place: idempotent re-register with reuseEnvironmentId
  561. * → if the backend returns the same env ID, call reconnectSession()
  562. * to re-queue the existing session. currentSessionId stays the same;
  563. * the URL on the user's phone stays valid; previouslyFlushedUUIDs is
  564. * preserved so history isn't re-sent.
  565. *
  566. * 2. Fresh session fallback: if the backend returns a different env ID
  567. * (original TTL-expired, e.g. laptop slept >4h) or reconnectSession()
  568. * throws, archive the old session and create a new one on the
  569. * now-registered env. Old behavior before #20460 primitives landed.
  570. *
  571. * Uses a promise-based reentrancy guard so concurrent callers share the
  572. * same reconnection attempt.
  573. */
  574. async function reconnectEnvironmentWithSession(): Promise<boolean> {
  575. if (reconnectPromise) {
  576. return reconnectPromise
  577. }
  578. reconnectPromise = doReconnect()
  579. try {
  580. return await reconnectPromise
  581. } finally {
  582. reconnectPromise = null
  583. }
  584. }
/**
 * Single reconnection attempt after the environment was lost. Returns true
 * when the bridge is recovered — either in place on the same session
 * (Strategy 1) or on a fresh session (Strategy 2) — and false when the
 * caller should give up. All visible failure paths resolve false rather
 * than throwing (the close handler below relies on a false resolution);
 * NOTE(review): archiveSession/createSession are assumed non-throwing —
 * confirm against their definitions.
 *
 * Only invoked through reconnectEnvironmentWithSession(), which serializes
 * concurrent callers onto one shared attempt. Statement order here is
 * load-bearing: several awaits have explicit post-await race checks.
 */
async function doReconnect(): Promise<boolean> {
// Count the attempt up front; the budget check below enforces the limit.
environmentRecreations++
// Invalidate any in-flight v2 handshake — the environment is being
// recreated, so a stale transport arriving post-reconnect would be
// pointed at a dead session.
v2Generation++
logForDebugging(
`[bridge:repl] Reconnecting after env lost (attempt ${environmentRecreations}/${MAX_ENVIRONMENT_RECREATIONS})`,
)
if (environmentRecreations > MAX_ENVIRONMENT_RECREATIONS) {
logForDebugging(
`[bridge:repl] Environment reconnect limit reached (${MAX_ENVIRONMENT_RECREATIONS}), giving up`,
)
return false
}
// Close the stale transport. Capture seq BEFORE close — if Strategy 1
// (tryReconnectInPlace) succeeds we keep the SAME session, and the
// next transport must resume where this one left off, not replay from
// the last transport-swap checkpoint.
if (transport) {
const seq = transport.getLastSequenceNum()
if (seq > lastTransportSequenceNum) {
lastTransportSequenceNum = seq
}
transport.close()
transport = null
}
// Transport is gone — wake the poll loop out of its at-capacity
// heartbeat sleep so it can fast-poll for re-dispatched work.
wakePollLoop()
// Reset flush gate so writeMessages() hits the !transport guard
// instead of silently queuing into a dead buffer.
flushGate.drop()
// Release the current work item (force=false — we may want the session
// back). Best-effort: the env is probably gone, so this likely 404s.
if (currentWorkId) {
const workIdBeingCleared = currentWorkId
await api
.stopWork(environmentId, workIdBeingCleared, false)
.catch(() => {})
// When doReconnect runs concurrently with the poll loop (ws_closed
// handler case — void-called, unlike the awaited onEnvironmentLost
// path), onWorkReceived can fire during the stopWork await and set
// a fresh currentWorkId. If it did, the poll loop has already
// recovered on its own — defer to it rather than proceeding to
// archiveSession, which would destroy the session its new
// transport is connected to.
if (currentWorkId !== workIdBeingCleared) {
logForDebugging(
'[bridge:repl] Poll loop recovered during stopWork await — deferring to it',
)
environmentRecreations = 0
return true
}
currentWorkId = null
currentIngressToken = null
}
// Bail out if teardown started while we were awaiting
if (pollController.signal.aborted) {
logForDebugging('[bridge:repl] Reconnect aborted by teardown')
return false
}
// Strategy 1: idempotent re-register with the server-issued env ID.
// If the backend resurrects the same env (fresh secret), we can
// reconnect the existing session. If it hands back a different ID, the
// original env is truly gone and we fall through to a fresh session.
const requestedEnvId = environmentId
bridgeConfig.reuseEnvironmentId = requestedEnvId
try {
const reg = await api.registerBridgeEnvironment(bridgeConfig)
environmentId = reg.environment_id
environmentSecret = reg.environment_secret
} catch (err) {
// Clear the hint on failure too, so a later registration isn't poisoned.
bridgeConfig.reuseEnvironmentId = undefined
logForDebugging(
`[bridge:repl] Environment re-registration failed: ${errorMessage(err)}`,
)
return false
}
// Clear before any await — a stale value would poison the next fresh
// registration if doReconnect runs again.
bridgeConfig.reuseEnvironmentId = undefined
logForDebugging(
`[bridge:repl] Re-registered: requested=${requestedEnvId} got=${environmentId}`,
)
// Bail out if teardown started while we were registering
if (pollController.signal.aborted) {
logForDebugging(
'[bridge:repl] Reconnect aborted after env registration, cleaning up',
)
await api.deregisterEnvironment(environmentId).catch(() => {})
return false
}
// Same race as above, narrower window: poll loop may have set up a
// transport during the registerBridgeEnvironment await. Bail before
// tryReconnectInPlace/archiveSession kill it server-side.
if (transport !== null) {
logForDebugging(
'[bridge:repl] Poll loop recovered during registerBridgeEnvironment await — deferring to it',
)
environmentRecreations = 0
return true
}
// Strategy 1: same helper as perpetual init. currentSessionId stays
// the same on success; URL on mobile/web stays valid;
// previouslyFlushedUUIDs preserved (no re-flush).
if (await tryReconnectInPlace(requestedEnvId, currentSessionId)) {
logEvent('tengu_bridge_repl_reconnected_in_place', {})
environmentRecreations = 0
return true
}
// Env differs → TTL-expired/reaped; or reconnect failed.
// Don't deregister — we have a fresh secret for this env either way.
if (environmentId !== requestedEnvId) {
logEvent('tengu_bridge_repl_env_expired_fresh_session', {})
}
// Strategy 2: fresh session on the now-registered environment.
// Archive the old session first — it's orphaned (bound to a dead env,
// or reconnectSession rejected it). Don't deregister the env — we just
// got a fresh secret for it and are about to use it.
await archiveSession(currentSessionId)
// Bail out if teardown started while we were archiving
if (pollController.signal.aborted) {
logForDebugging(
'[bridge:repl] Reconnect aborted after archive, cleaning up',
)
await api.deregisterEnvironment(environmentId).catch(() => {})
return false
}
// Re-read the current title in case the user renamed the session.
// REPL wrapper reads session storage; daemon wrapper returns the
// original title (nothing to refresh).
const currentTitle = getCurrentTitle()
// Create a new session on the now-registered environment
const newSessionId = await createSession({
environmentId,
title: currentTitle,
gitRepoUrl,
branch,
signal: AbortSignal.timeout(15_000),
})
if (!newSessionId) {
logForDebugging(
'[bridge:repl] Session creation failed during reconnection',
)
return false
}
// Bail out if teardown started during session creation (up to 15s)
if (pollController.signal.aborted) {
logForDebugging(
'[bridge:repl] Reconnect aborted after session creation, cleaning up',
)
await archiveSession(newSessionId)
return false
}
currentSessionId = newSessionId
// Re-publish to the PID file so peer dedup (peerRegistry.ts) picks up the
// new ID — setReplBridgeHandle only fires at init/teardown, not reconnect.
void updateSessionBridgeId(toCompatSessionId(newSessionId)).catch(() => {})
// Reset per-session transport state IMMEDIATELY after the session swap,
// before any await. If this runs after `await writeBridgePointer` below,
// there's a window where handle.bridgeSessionId already returns session B
// but getSSESequenceNum() still returns session A's seq — a daemon
// persistState() in that window writes {bridgeSessionId: B, seq: OLD_A},
// which PASSES the session-ID validation check and defeats it entirely.
//
// The SSE seq-num is scoped to the session's event stream — carrying it
// over leaves the transport's lastSequenceNum stuck high (seq only
// advances when received > last), and its next internal reconnect would
// send from_sequence_num=OLD_SEQ against a stream starting at 1 → all
// events in the gap silently dropped. Inbound UUID dedup is also
// session-scoped.
lastTransportSequenceNum = 0
recentInboundUUIDs.clear()
// Title derivation is session-scoped too: if the user typed during the
// createSession await above, the callback fired against the OLD archived
// session ID (PATCH lost) and the new session got `currentTitle` captured
// BEFORE they typed. Reset so the next prompt can re-derive. Self-
// correcting: if the caller's policy is already done (explicit title or
// count ≥ 3), it returns true on the first post-reset call and re-latches.
userMessageCallbackDone = !onUserMessage
logForDebugging(`[bridge:repl] Re-created session: ${currentSessionId}`)
// Rewrite the crash-recovery pointer with the new IDs so a crash after
// this point resumes the right session. (The reconnect-in-place path
// above doesn't touch the pointer — same session, same env.)
await writeBridgePointer(dir, {
sessionId: currentSessionId,
environmentId,
source: 'repl',
})
// Clear flushed UUIDs so initial messages are re-sent to the new session.
// UUIDs are scoped per-session on the server, so re-flushing is safe.
previouslyFlushedUUIDs?.clear()
// Reset the counter so independent reconnections hours apart don't
// exhaust the limit — it guards against rapid consecutive failures,
// not lifetime total.
environmentRecreations = 0
return true
}
  784. // Helper: get the current OAuth access token for session ingress auth.
  785. // Unlike the JWT path, OAuth tokens are refreshed by the standard OAuth
  786. // flow — no proactive scheduler needed.
  787. function getOAuthToken(): string | undefined {
  788. return getAccessToken()
  789. }
  790. // Drain any messages that were queued during the initial flush.
  791. // Called after writeBatch completes (or fails) so queued messages
  792. // are sent in order after the historical messages.
  793. function drainFlushGate(): void {
  794. const msgs = flushGate.end()
  795. if (msgs.length === 0) return
  796. if (!transport) {
  797. logForDebugging(
  798. `[bridge:repl] Cannot drain ${msgs.length} pending message(s): no transport`,
  799. )
  800. return
  801. }
  802. for (const msg of msgs) {
  803. recentPostedUUIDs.add(msg.uuid)
  804. }
  805. const sdkMessages = toSDKMessages(msgs)
  806. const events = sdkMessages.map(sdkMsg => ({
  807. ...sdkMsg,
  808. session_id: currentSessionId,
  809. }))
  810. logForDebugging(
  811. `[bridge:repl] Drained ${msgs.length} pending message(s) after flush`,
  812. )
  813. void transport.writeBatch(events)
  814. }
  815. // Teardown reference — set after definition below. All callers are async
  816. // callbacks that run after assignment, so the reference is always valid.
  817. let doTeardownImpl: (() => Promise<void>) | null = null
  818. function triggerTeardown(): void {
  819. void doTeardownImpl?.()
  820. }
/**
 * Body of the transport's setOnClose callback, hoisted to initBridgeCore
 * scope so /bridge-kick can fire it directly. setOnClose wraps this with
 * a stale-transport guard; debugFireClose calls it bare.
 *
 * With autoReconnect:true, this only fires on: clean close (1000),
 * permanent server rejection (4001/1002/4003), or 10-min budget
 * exhaustion. Transient drops are retried internally by the transport.
 *
 * @param closeCode WebSocket close code from the transport; may be
 *   undefined (per the signature) — presumably when no code was delivered,
 *   TODO confirm against the transport implementation.
 */
function handleTransportPermanentClose(closeCode: number | undefined): void {
logForDebugging(
`[bridge:repl] Transport permanently closed: code=${closeCode}`,
)
logEvent('tengu_bridge_repl_ws_closed', {
code: closeCode,
})
// Capture SSE seq high-water mark before nulling. When called from
// setOnClose the guard guarantees transport !== null; when fired from
// /bridge-kick it may already be null (e.g. fired twice) — skip.
if (transport) {
const closedSeq = transport.getLastSequenceNum()
if (closedSeq > lastTransportSequenceNum) {
lastTransportSequenceNum = closedSeq
}
transport = null
}
// Transport is gone — wake the poll loop out of its at-capacity
// heartbeat sleep so it's fast-polling by the time the reconnect
// below completes and the server re-queues work.
wakePollLoop()
// Reset flush state so writeMessages() hits the !transport guard
// (with a warning log) instead of silently queuing into a buffer
// that will never be drained. Unlike onWorkReceived (which
// preserves pending messages for the new transport), onClose is
// a permanent close — no new transport will drain these.
const dropped = flushGate.drop()
if (dropped > 0) {
logForDebugging(
`[bridge:repl] Dropping ${dropped} pending message(s) on transport close (code=${closeCode})`,
{ level: 'warn' },
)
}
if (closeCode === 1000) {
// Clean close — session ended normally. Tear down the bridge.
onStateChange?.('failed', 'session ended')
pollController.abort()
triggerTeardown()
return
}
// Transport reconnect budget exhausted or permanent server
// rejection. By this point the env has usually been reaped
// server-side (BQ 2026-03-12: ~98% of ws_closed never recover
// via poll alone). stopWork(force=false) can't re-dispatch work
// from an archived env; reconnectEnvironmentWithSession can
// re-activate it via POST /bridge/reconnect, or fall through
// to a fresh session if the env is truly gone. The poll loop
// (already woken above) picks up the re-queued work once
// doReconnect completes.
onStateChange?.(
'reconnecting',
`Remote Control connection lost (code ${closeCode})`,
)
logForDebugging(
`[bridge:repl] Transport reconnect budget exhausted (code=${closeCode}), attempting env reconnect`,
)
// Fire-and-forget: failure handling lives in the .then below rather than
// awaiting — this callback must not block the transport's close path.
void reconnectEnvironmentWithSession().then(success => {
if (success) return
// doReconnect has four abort-check return-false sites for
// teardown-in-progress. Don't pollute the BQ failure signal
// or double-teardown when the user just quit.
if (pollController.signal.aborted) return
// doReconnect returns false (never throws) on genuine failure.
// The dangerous case: registerBridgeEnvironment succeeded (so
// environmentId now points at a fresh valid env) but
// createSession failed — poll loop would poll a sessionless
// env getting null work with no errors, never hitting any
// give-up path. Tear down explicitly.
logForDebugging(
'[bridge:repl] reconnectEnvironmentWithSession resolved false — tearing down',
)
logEvent('tengu_bridge_repl_reconnect_failed', {
close_code: closeCode,
})
onStateChange?.('failed', 'reconnection failed')
triggerTeardown()
})
}
  908. // Ant-only: SIGUSR2 → force doReconnect() for manual testing. Skips the
  909. // ~30s poll wait — fire-and-observe in the debug log immediately.
  910. // Windows has no USR signals; `process.on` would throw there.
  911. let sigusr2Handler: (() => void) | undefined
  912. if (process.env.USER_TYPE === 'ant' && process.platform !== 'win32') {
  913. sigusr2Handler = () => {
  914. logForDebugging(
  915. '[bridge:repl] SIGUSR2 received — forcing doReconnect() for testing',
  916. )
  917. void reconnectEnvironmentWithSession()
  918. }
  919. process.on('SIGUSR2', sigusr2Handler)
  920. }
  921. // Ant-only: /bridge-kick fault injection. handleTransportPermanentClose
  922. // is defined below and assigned into this slot so the slash command can
  923. // invoke it directly — the real setOnClose callback is buried inside
  924. // wireTransport which is itself inside onWorkReceived.
  925. let debugFireClose: ((code: number) => void) | null = null
  926. if (process.env.USER_TYPE === 'ant') {
  927. registerBridgeDebugHandle({
  928. fireClose: code => {
  929. if (!debugFireClose) {
  930. logForDebugging('[bridge:debug] fireClose: no transport wired yet')
  931. return
  932. }
  933. logForDebugging(`[bridge:debug] fireClose(${code}) — injecting`)
  934. debugFireClose(code)
  935. },
  936. forceReconnect: () => {
  937. logForDebugging('[bridge:debug] forceReconnect — injecting')
  938. void reconnectEnvironmentWithSession()
  939. },
  940. injectFault: injectBridgeFault,
  941. wakePollLoop,
  942. describe: () =>
  943. `env=${environmentId} session=${currentSessionId} transport=${transport?.getStateLabel() ?? 'null'} workId=${currentWorkId ?? 'null'}`,
  944. })
  945. }
  946. const pollOpts = {
  947. api,
  948. getCredentials: () => ({ environmentId, environmentSecret }),
  949. signal: pollController.signal,
  950. getPollIntervalConfig,
  951. onStateChange,
  952. getWsState: () => transport?.getStateLabel() ?? 'null',
  953. // REPL bridge is single-session: having any transport == at capacity.
  954. // No need to check isConnectedStatus() — even while the transport is
  955. // auto-reconnecting internally (up to 10 min), poll is heartbeat-only.
  956. isAtCapacity: () => transport !== null,
  957. capacitySignal,
  958. onFatalError: triggerTeardown,
  959. getHeartbeatInfo: () => {
  960. if (!currentWorkId || !currentIngressToken) {
  961. return null
  962. }
  963. return {
  964. environmentId,
  965. workId: currentWorkId,
  966. sessionToken: currentIngressToken,
  967. }
  968. },
  969. // Work-item JWT expired (or work gone). The transport is useless —
  970. // SSE reconnects and CCR writes use the same stale token. Without
  971. // this callback the poll loop would do a 10-min at-capacity backoff,
  972. // during which the work lease (300s TTL) expires and the server stops
  973. // forwarding prompts → ~25-min dead window observed in daemon logs.
  974. // Kill the transport + work state so isAtCapacity()=false; the loop
  975. // fast-polls and picks up the server's re-dispatched work in seconds.
  976. onHeartbeatFatal: (err: BridgeFatalError) => {
  977. logForDebugging(
  978. `[bridge:repl] heartbeatWork fatal (status=${err.status}) — tearing down work item for fast re-dispatch`,
  979. )
  980. if (transport) {
  981. const seq = transport.getLastSequenceNum()
  982. if (seq > lastTransportSequenceNum) {
  983. lastTransportSequenceNum = seq
  984. }
  985. transport.close()
  986. transport = null
  987. }
  988. flushGate.drop()
  989. // force=false → server re-queues. Likely already expired, but
  990. // idempotent and makes re-dispatch immediate if not.
  991. if (currentWorkId) {
  992. void api
  993. .stopWork(environmentId, currentWorkId, false)
  994. .catch((e: unknown) => {
  995. logForDebugging(
  996. `[bridge:repl] stopWork after heartbeat fatal: ${errorMessage(e)}`,
  997. )
  998. })
  999. }
  1000. currentWorkId = null
  1001. currentIngressToken = null
  1002. wakePollLoop()
  1003. onStateChange?.(
  1004. 'reconnecting',
  1005. 'Work item lease expired, fetching fresh token',
  1006. )
  1007. },
  1008. async onEnvironmentLost() {
  1009. const success = await reconnectEnvironmentWithSession()
  1010. if (!success) {
  1011. return null
  1012. }
  1013. return { environmentId, environmentSecret }
  1014. },
  1015. onWorkReceived: (
  1016. workSessionId: string,
  1017. ingressToken: string,
  1018. workId: string,
  1019. serverUseCcrV2: boolean,
  1020. ) => {
  1021. // When new work arrives while a transport is already open, the
  1022. // server has decided to re-dispatch (e.g. token rotation, server
  1023. // restart). Close the existing transport and reconnect — discarding
  1024. // the work causes a stuck 'reconnecting' state if the old WS dies
  1025. // shortly after (the server won't re-dispatch a work item it
  1026. // already delivered).
  1027. // ingressToken (JWT) is stored for heartbeat auth (both v1 and v2).
  1028. // Transport auth diverges — see the v1/v2 split below.
  1029. if (transport?.isConnectedStatus()) {
  1030. logForDebugging(
  1031. `[bridge:repl] Work received while transport connected, replacing with fresh token (workId=${workId})`,
  1032. )
  1033. }
  1034. logForDebugging(
  1035. `[bridge:repl] Work received: workId=${workId} workSessionId=${workSessionId} currentSessionId=${currentSessionId} match=${sameSessionId(workSessionId, currentSessionId)}`,
  1036. )
  1037. // Refresh the crash-recovery pointer's mtime. Staleness checks file
  1038. // mtime (not embedded timestamp) so this re-write bumps the clock —
  1039. // a 5h+ session that crashes still has a fresh pointer. Fires once
  1040. // per work dispatch (infrequent — bounded by user message rate).
  1041. void writeBridgePointer(dir, {
  1042. sessionId: currentSessionId,
  1043. environmentId,
  1044. source: 'repl',
  1045. })
  1046. // Reject foreign session IDs — the server shouldn't assign sessions
  1047. // from other environments. Since we create env+session as a pair,
  1048. // a mismatch indicates an unexpected server-side reassignment.
  1049. //
  1050. // Compare by underlying UUID, not by tagged-ID prefix. When CCR
  1051. // v2's compat layer serves the session, createBridgeSession gets
  1052. // session_* from the v1-facing API (compat/convert.go:41) but the
  1053. // infrastructure layer delivers cse_* in the work queue
  1054. // (container_manager.go:129). Same UUID, different tag.
  1055. if (!sameSessionId(workSessionId, currentSessionId)) {
  1056. logForDebugging(
  1057. `[bridge:repl] Rejecting foreign session: expected=${currentSessionId} got=${workSessionId}`,
  1058. )
  1059. return
  1060. }
  1061. currentWorkId = workId
  1062. currentIngressToken = ingressToken
  1063. // Server decides per-session (secret.use_code_sessions from the work
  1064. // secret, threaded through runWorkPollLoop). The env var is an ant-dev
  1065. // override for forcing v2 before the server flag is on for your user —
  1066. // requires ccr_v2_compat_enabled server-side or registerWorker 404s.
  1067. //
  1068. // Kept separate from CLAUDE_CODE_USE_CCR_V2 (the child-SDK transport
  1069. // selector set by sessionRunner/environment-manager) to avoid the
  1070. // inheritance hazard in spawn mode where the parent's orchestrator
  1071. // var would leak into a v1 child.
  1072. const useCcrV2 =
  1073. serverUseCcrV2 || isEnvTruthy(process.env.CLAUDE_BRIDGE_USE_CCR_V2)
  1074. // Auth is the one place v1 and v2 diverge hard:
  1075. //
  1076. // - v1 (Session-Ingress): accepts OAuth OR JWT. We prefer OAuth
  1077. // because the standard OAuth refresh flow handles expiry — no
  1078. // separate JWT refresh scheduler needed.
  1079. //
  1080. // - v2 (CCR /worker/*): REQUIRES the JWT. register_worker.go:32
  1081. // validates the session_id claim, which OAuth tokens don't carry.
  1082. // The JWT from the work secret has both that claim and the worker
  1083. // role (environment_auth.py:856). JWT refresh: when it expires the
  1084. // server re-dispatches work with a fresh one, and onWorkReceived
  1085. // fires again. createV2ReplTransport stores it via
  1086. // updateSessionIngressAuthToken() before touching the network.
  1087. let v1OauthToken: string | undefined
  1088. if (!useCcrV2) {
  1089. v1OauthToken = getOAuthToken()
  1090. if (!v1OauthToken) {
  1091. logForDebugging(
  1092. '[bridge:repl] No OAuth token available for session ingress, skipping work',
  1093. )
  1094. return
  1095. }
  1096. updateSessionIngressAuthToken(v1OauthToken)
  1097. }
  1098. logEvent('tengu_bridge_repl_work_received', {})
  1099. // Close the previous transport. Nullify BEFORE calling close() so
  1100. // the close callback doesn't treat the programmatic close as
  1101. // "session ended normally" and trigger a full teardown.
  1102. if (transport) {
  1103. const oldTransport = transport
  1104. transport = null
  1105. // Capture the SSE sequence high-water mark so the next transport
  1106. // resumes the stream instead of replaying from seq 0. Use max() —
  1107. // a transport that died early (never received any frames) would
  1108. // otherwise reset a non-zero mark back to 0.
  1109. const oldSeq = oldTransport.getLastSequenceNum()
  1110. if (oldSeq > lastTransportSequenceNum) {
  1111. lastTransportSequenceNum = oldSeq
  1112. }
  1113. oldTransport.close()
  1114. }
  1115. // Reset flush state — the old flush (if any) is no longer relevant.
  1116. // Preserve pending messages so they're drained after the new
  1117. // transport's flush completes (the hook has already advanced its
  1118. // lastWrittenIndex and won't re-send them).
  1119. flushGate.deactivate()
  1120. // Closure adapter over the shared handleServerControlRequest —
  1121. // captures transport/currentSessionId so the transport.setOnData
  1122. // callback below doesn't need to thread them through.
  1123. const onServerControlRequest = (request: SDKControlRequest): void =>
  1124. handleServerControlRequest(request, {
  1125. transport,
  1126. sessionId: currentSessionId,
  1127. onInterrupt,
  1128. onSetModel,
  1129. onSetMaxThinkingTokens,
  1130. onSetPermissionMode,
  1131. })
  1132. let initialFlushDone = false
  1133. // Wire callbacks onto a freshly constructed transport and connect.
  1134. // Extracted so the (sync) v1 and (async) v2 construction paths can
  1135. // share the identical callback + flush machinery.
  1136. const wireTransport = (newTransport: ReplBridgeTransport): void => {
  1137. transport = newTransport
  1138. newTransport.setOnConnect(() => {
  1139. // Guard: if transport was replaced by a newer onWorkReceived call
  1140. // while the WS was connecting, ignore this stale callback.
  1141. if (transport !== newTransport) return
  1142. logForDebugging('[bridge:repl] Ingress transport connected')
  1143. logEvent('tengu_bridge_repl_ws_connected', {})
  1144. // Update the env var with the latest OAuth token so POST writes
  1145. // (which read via getSessionIngressAuthToken()) use a fresh token.
  1146. // v2 skips this — createV2ReplTransport already stored the JWT,
  1147. // and overwriting it with OAuth would break subsequent /worker/*
  1148. // requests (session_id claim check).
  1149. if (!useCcrV2) {
  1150. const freshToken = getOAuthToken()
  1151. if (freshToken) {
  1152. updateSessionIngressAuthToken(freshToken)
  1153. }
  1154. }
  1155. // Reset teardownStarted so future teardowns are not blocked.
  1156. teardownStarted = false
  1157. // Flush initial messages only on first connect, not on every
  1158. // WS reconnection. Re-flushing would cause duplicate messages.
  1159. // IMPORTANT: onStateChange('connected') is deferred until the
  1160. // flush completes. This prevents writeMessages() from sending
  1161. // new messages that could arrive at the server interleaved with
  1162. // the historical messages, and delays the web UI from showing
  1163. // the session as active until history is persisted.
  1164. if (
  1165. !initialFlushDone &&
  1166. initialMessages &&
  1167. initialMessages.length > 0
  1168. ) {
  1169. initialFlushDone = true
  1170. // Cap the initial flush to the most recent N messages. The full
  1171. // history is UI-only (model doesn't see it) and large replays cause
  1172. // slow session-ingress persistence (each event is a threadstore write)
  1173. // plus elevated Firestore pressure. A 0 or negative cap disables it.
  1174. const historyCap = initialHistoryCap
  1175. const eligibleMessages = initialMessages.filter(
  1176. m =>
  1177. isEligibleBridgeMessage(m) &&
  1178. !previouslyFlushedUUIDs?.has(m.uuid),
  1179. )
  1180. const cappedMessages =
  1181. historyCap > 0 && eligibleMessages.length > historyCap
  1182. ? eligibleMessages.slice(-historyCap)
  1183. : eligibleMessages
  1184. if (cappedMessages.length < eligibleMessages.length) {
  1185. logForDebugging(
  1186. `[bridge:repl] Capped initial flush: ${eligibleMessages.length} -> ${cappedMessages.length} (cap=${historyCap})`,
  1187. )
  1188. logEvent('tengu_bridge_repl_history_capped', {
  1189. eligible_count: eligibleMessages.length,
  1190. capped_count: cappedMessages.length,
  1191. })
  1192. }
  1193. const sdkMessages = toSDKMessages(cappedMessages)
  1194. if (sdkMessages.length > 0) {
  1195. logForDebugging(
  1196. `[bridge:repl] Flushing ${sdkMessages.length} initial message(s) via transport`,
  1197. )
  1198. const events = sdkMessages.map(sdkMsg => ({
  1199. ...sdkMsg,
  1200. session_id: currentSessionId,
  1201. }))
  1202. const dropsBefore = newTransport.droppedBatchCount
  1203. void newTransport
  1204. .writeBatch(events)
  1205. .then(() => {
  1206. // If any batch was dropped during this flush (SI down for
  1207. // maxConsecutiveFailures attempts), flush() still resolved
  1208. // normally but the events were NOT delivered. Don't mark
  1209. // UUIDs as flushed — keep them eligible for re-send on the
  1210. // next onWorkReceived (JWT refresh re-dispatch, line ~1144).
  1211. if (newTransport.droppedBatchCount > dropsBefore) {
  1212. logForDebugging(
  1213. `[bridge:repl] Initial flush dropped ${newTransport.droppedBatchCount - dropsBefore} batch(es) — not marking ${sdkMessages.length} UUID(s) as flushed`,
  1214. )
  1215. return
  1216. }
  1217. if (previouslyFlushedUUIDs) {
  1218. for (const sdkMsg of sdkMessages) {
  1219. if (sdkMsg.uuid) {
  1220. previouslyFlushedUUIDs.add(sdkMsg.uuid)
  1221. }
  1222. }
  1223. }
  1224. })
  1225. .catch(e =>
  1226. logForDebugging(`[bridge:repl] Initial flush failed: ${e}`),
  1227. )
  1228. .finally(() => {
  1229. // Guard: if transport was replaced during the flush,
  1230. // don't signal connected or drain — the new transport
  1231. // owns the lifecycle now.
  1232. if (transport !== newTransport) return
  1233. drainFlushGate()
  1234. onStateChange?.('connected')
  1235. })
  1236. } else {
  1237. // All initial messages were already flushed (filtered by
  1238. // previouslyFlushedUUIDs). No flush POST needed — clear
  1239. // the flag and signal connected immediately. This is the
  1240. // first connect for this transport (inside !initialFlushDone),
  1241. // so no flush POST is in-flight — the flag was set before
  1242. // connect() and must be cleared here.
  1243. drainFlushGate()
  1244. onStateChange?.('connected')
  1245. }
  1246. } else if (!flushGate.active) {
  1247. // No initial messages or already flushed on first connect.
  1248. // WS auto-reconnect path — only signal connected if no flush
  1249. // POST is in-flight. If one is, .finally() owns the lifecycle.
  1250. onStateChange?.('connected')
  1251. }
  1252. })
  1253. newTransport.setOnData(data => {
  1254. handleIngressMessage(
  1255. data,
  1256. recentPostedUUIDs,
  1257. recentInboundUUIDs,
  1258. onInboundMessage,
  1259. onPermissionResponse,
  1260. onServerControlRequest,
  1261. )
  1262. })
  1263. // Body lives at initBridgeCore scope so /bridge-kick can call it
  1264. // directly via debugFireClose. All referenced closures (transport,
  1265. // wakePollLoop, flushGate, reconnectEnvironmentWithSession, etc.)
  1266. // are already at that scope. The only lexical dependency on
  1267. // wireTransport was `newTransport.getLastSequenceNum()` — but after
  1268. // the guard below passes we know transport === newTransport.
  1269. debugFireClose = handleTransportPermanentClose
  1270. newTransport.setOnClose(closeCode => {
  1271. // Guard: if transport was replaced, ignore stale close.
  1272. if (transport !== newTransport) return
  1273. handleTransportPermanentClose(closeCode)
  1274. })
  1275. // Start the flush gate before connect() to cover the WS handshake
  1276. // window. Between transport assignment and setOnConnect firing,
  1277. // writeMessages() could send messages via HTTP POST before the
  1278. // initial flush starts. Starting the gate here ensures those
  1279. // calls are queued. If there are no initial messages, the gate
  1280. // stays inactive.
  1281. if (
  1282. !initialFlushDone &&
  1283. initialMessages &&
  1284. initialMessages.length > 0
  1285. ) {
  1286. flushGate.start()
  1287. }
  1288. newTransport.connect()
  1289. } // end wireTransport
  1290. // Bump unconditionally — ANY new transport (v1 or v2) invalidates an
  1291. // in-flight v2 handshake. Also bumped in doReconnect().
  1292. v2Generation++
  1293. if (useCcrV2) {
  1294. // workSessionId is the cse_* form (infrastructure-layer ID from the
  1295. // work queue), which is what /v1/code/sessions/{id}/worker/* wants.
  1296. // The session_* form (currentSessionId) is NOT usable here —
  1297. // handler/convert.go:30 validates TagCodeSession.
  1298. const sessionUrl = buildCCRv2SdkUrl(baseUrl, workSessionId)
  1299. const thisGen = v2Generation
  1300. logForDebugging(
  1301. `[bridge:repl] CCR v2: sessionUrl=${sessionUrl} session=${workSessionId} gen=${thisGen}`,
  1302. )
  1303. void createV2ReplTransport({
  1304. sessionUrl,
  1305. ingressToken,
  1306. sessionId: workSessionId,
  1307. initialSequenceNum: lastTransportSequenceNum,
  1308. }).then(
  1309. t => {
  1310. // Teardown started while registerWorker was in flight. Teardown
  1311. // saw transport === null and skipped close(); installing now
  1312. // would leak CCRClient heartbeat timers and reset
  1313. // teardownStarted via wireTransport's side effects.
  1314. if (pollController.signal.aborted) {
  1315. t.close()
  1316. return
  1317. }
  1318. // onWorkReceived may have fired again while registerWorker()
  1319. // was in flight (server re-dispatch with a fresh JWT). The
  1320. // transport !== null check alone gets the race wrong when BOTH
  1321. // attempts saw transport === null — it keeps the first resolver
  1322. // (stale epoch) and discards the second (correct epoch). The
  1323. // generation check catches it regardless of transport state.
  1324. if (thisGen !== v2Generation) {
  1325. logForDebugging(
  1326. `[bridge:repl] CCR v2: discarding stale handshake gen=${thisGen} current=${v2Generation}`,
  1327. )
  1328. t.close()
  1329. return
  1330. }
  1331. wireTransport(t)
  1332. },
  1333. (err: unknown) => {
  1334. logForDebugging(
  1335. `[bridge:repl] CCR v2: createV2ReplTransport failed: ${errorMessage(err)}`,
  1336. { level: 'error' },
  1337. )
  1338. logEvent('tengu_bridge_repl_ccr_v2_init_failed', {})
  1339. // If a newer attempt is in flight or already succeeded, don't
  1340. // touch its work item — our failure is irrelevant.
  1341. if (thisGen !== v2Generation) return
  1342. // Release the work item so the server re-dispatches immediately
  1343. // instead of waiting for its own timeout. currentWorkId was set
  1344. // above; without this, the session looks stuck to the user.
  1345. if (currentWorkId) {
  1346. void api
  1347. .stopWork(environmentId, currentWorkId, false)
  1348. .catch((e: unknown) => {
  1349. logForDebugging(
  1350. `[bridge:repl] stopWork after v2 init failure: ${errorMessage(e)}`,
  1351. )
  1352. })
  1353. currentWorkId = null
  1354. currentIngressToken = null
  1355. }
  1356. wakePollLoop()
  1357. },
  1358. )
  1359. } else {
  1360. // v1: HybridTransport (WS reads + POST writes to Session-Ingress).
  1361. // autoReconnect is true (default) — when the WS dies, the transport
  1362. // reconnects automatically with exponential backoff. POST writes
  1363. // continue during reconnection (they use getSessionIngressAuthToken()
  1364. // independently of WS state). The poll loop remains as a secondary
  1365. // fallback if the reconnect budget is exhausted (10 min).
  1366. //
  1367. // Auth: uses OAuth tokens directly instead of the JWT from the work
  1368. // secret. refreshHeaders picks up the latest OAuth token on each
  1369. // WS reconnect attempt.
  1370. const wsUrl = buildSdkUrl(sessionIngressUrl, workSessionId)
  1371. logForDebugging(`[bridge:repl] Ingress URL: ${wsUrl}`)
  1372. logForDebugging(
  1373. `[bridge:repl] Creating HybridTransport: session=${workSessionId}`,
  1374. )
  1375. // v1OauthToken was validated non-null above (we'd have returned early).
  1376. const oauthToken = v1OauthToken ?? ''
  1377. wireTransport(
  1378. createV1ReplTransport(
  1379. new HybridTransport(
  1380. new URL(wsUrl),
  1381. {
  1382. Authorization: `Bearer ${oauthToken}`,
  1383. 'anthropic-version': '2023-06-01',
  1384. },
  1385. workSessionId,
  1386. () => ({
  1387. Authorization: `Bearer ${getOAuthToken() ?? oauthToken}`,
  1388. 'anthropic-version': '2023-06-01',
  1389. }),
  1390. // Cap retries so a persistently-failing session-ingress can't
  1391. // pin the uploader drain loop for the lifetime of the bridge.
  1392. // 50 attempts ≈ 20 min (15s POST timeout + 8s backoff + jitter
  1393. // per cycle at steady state). Bridge-only — 1P keeps indefinite.
  1394. {
  1395. maxConsecutiveFailures: 50,
  1396. isBridge: true,
  1397. onBatchDropped: () => {
  1398. onStateChange?.(
  1399. 'reconnecting',
  1400. 'Lost sync with Remote Control — events could not be delivered',
  1401. )
  1402. // SI has been down ~20 min. Wake the poll loop so that when
  1403. // SI recovers, next poll → onWorkReceived → fresh transport
  1404. // → initial flush succeeds → onStateChange('connected') at
  1405. // ~line 1420. Without this, state stays 'reconnecting' even
  1406. // after SI recovers — daemon.ts:437 denies all permissions,
  1407. // useReplBridge.ts:311 keeps replBridgeSessionActive=false.
  1408. // If the env was archived during the outage, poll 404 →
  1409. // onEnvironmentLost recovery path handles it.
  1410. wakePollLoop()
  1411. },
  1412. },
  1413. ),
  1414. ),
  1415. )
  1416. }
  1417. },
  1418. }
  1419. void startWorkPollLoop(pollOpts)
  1420. // Perpetual mode: hourly mtime refresh of the crash-recovery pointer.
  1421. // The onWorkReceived refresh only fires per user prompt — a
  1422. // daemon idle for >4h would have a stale pointer, and the next restart
  1423. // would clear it (readBridgePointer TTL check) → fresh session. The
  1424. // standalone bridge (bridgeMain.ts) has an identical hourly timer.
  1425. const pointerRefreshTimer = perpetual
  1426. ? setInterval(() => {
  1427. // doReconnect() reassigns currentSessionId/environmentId non-
  1428. // atomically (env at ~:634, session at ~:719, awaits in between).
  1429. // If this timer fires in that window, its fire-and-forget write can
  1430. // race with (and overwrite) doReconnect's own pointer write at ~:740,
  1431. // leaving the pointer at the now-archived old session. doReconnect
  1432. // writes the pointer itself, so skipping here is free.
  1433. if (reconnectPromise) return
  1434. void writeBridgePointer(dir, {
  1435. sessionId: currentSessionId,
  1436. environmentId,
  1437. source: 'repl',
  1438. })
  1439. }, 60 * 60_000)
  1440. : null
  1441. pointerRefreshTimer?.unref?.()
  1442. // Push a silent keep_alive frame on a fixed interval so upstream proxies
  1443. // and the session-ingress layer don't GC an otherwise-idle remote control
  1444. // session. The keep_alive type is filtered before reaching any client UI
  1445. // (Query.ts drops it; web/iOS/Android never see it in their message loop).
  1446. // Interval comes from GrowthBook (tengu_bridge_poll_interval_config
  1447. // session_keepalive_interval_v2_ms, default 120s); 0 = disabled.
  1448. const keepAliveIntervalMs =
  1449. getPollIntervalConfig().session_keepalive_interval_v2_ms
  1450. const keepAliveTimer =
  1451. keepAliveIntervalMs > 0
  1452. ? setInterval(() => {
  1453. if (!transport) return
  1454. logForDebugging('[bridge:repl] keep_alive sent')
  1455. void transport.write({ type: 'keep_alive' }).catch((err: unknown) => {
  1456. logForDebugging(
  1457. `[bridge:repl] keep_alive write failed: ${errorMessage(err)}`,
  1458. )
  1459. })
  1460. }, keepAliveIntervalMs)
  1461. : null
  1462. keepAliveTimer?.unref?.()
  1463. // Shared teardown sequence used by both cleanup registration and
  1464. // the explicit teardown() method on the returned handle.
  1465. let teardownStarted = false
doTeardownImpl = async (): Promise<void> => {
  // Re-entrancy guard — this body is reachable from both the cleanup
  // registration (registerCleanup below) and the explicit teardown()
  // method on the returned handle.
  if (teardownStarted) {
    logForDebugging(
      `[bridge:repl] Teardown already in progress, skipping duplicate call env=${environmentId} session=${currentSessionId}`,
    )
    return
  }
  teardownStarted = true
  // Wall-clock start, used for the duration reported in the completion logs.
  const teardownStart = Date.now()
  logForDebugging(
    `[bridge:repl] Teardown starting: env=${environmentId} session=${currentSessionId} workId=${currentWorkId ?? 'none'} transportState=${transport?.getStateLabel() ?? 'null'}`,
  )
  // Stop the background timers (pointer mtime refresh, keep_alive frames).
  if (pointerRefreshTimer !== null) {
    clearInterval(pointerRefreshTimer)
  }
  if (keepAliveTimer !== null) {
    clearInterval(keepAliveTimer)
  }
  // Detach the SIGUSR2 handler if one was installed.
  if (sigusr2Handler) {
    process.off('SIGUSR2', sigusr2Handler)
  }
  // Internal-user debug hooks (/bridge-kick path) are cleared alongside.
  if (process.env.USER_TYPE === 'ant') {
    clearBridgeDebugHandle()
    debugFireClose = null
  }
  // Stop the work poll loop before touching the transport.
  pollController.abort()
  logForDebugging('[bridge:repl] Teardown: poll loop aborted')
  // Capture the live transport's seq BEFORE close() — close() is sync
  // (just aborts the SSE fetch) and does NOT invoke onClose, so the
  // setOnClose capture path never runs for explicit teardown.
  // Without this, getSSESequenceNum() after teardown returns the stale
  // lastTransportSequenceNum (captured at the last transport swap), and
  // daemon callers persisting that value lose all events since then.
  if (transport) {
    const finalSeq = transport.getLastSequenceNum()
    if (finalSeq > lastTransportSequenceNum) {
      lastTransportSequenceNum = finalSeq
    }
  }
  if (perpetual) {
    // Perpetual teardown is LOCAL-ONLY — do not send result, do not call
    // stopWork, do not close the transport. All of those signal the
    // server (and any mobile/attach subscribers) that the session is
    // ending. Instead: stop polling, let the socket die with the
    // process; the backend times the work-item lease back to pending on
    // its own (TTL 300s). Next daemon start reads the pointer and
    // reconnectSession re-queues work.
    transport = null
    flushGate.drop()
    // Refresh the pointer mtime so that sessions lasting longer than
    // BRIDGE_POINTER_TTL_MS (4h) don't appear stale on next start.
    await writeBridgePointer(dir, {
      sessionId: currentSessionId,
      environmentId,
      source: 'repl',
    })
    logForDebugging(
      `[bridge:repl] Teardown (perpetual): leaving env=${environmentId} session=${currentSessionId} alive on server, duration=${Date.now() - teardownStart}ms`,
    )
    return
  }
  // Fire the result message, then archive, THEN close. transport.write()
  // only enqueues (SerialBatchEventUploader resolves on buffer-add); the
  // stopWork/archive latency (~200-500ms) is the drain window for the
  // result POST. Closing BEFORE archive meant relying on HybridTransport's
  // void-ed 3s grace period, which nothing awaits — forceExit can kill the
  // socket mid-POST. Same reorder as remoteBridgeCore.ts teardown (#22803).
  const teardownTransport = transport
  transport = null
  flushGate.drop()
  if (teardownTransport) {
    void teardownTransport.write(makeResultMessage(currentSessionId))
  }
  // Best-effort release of the work item; failures are logged, never thrown.
  const stopWorkP = currentWorkId
    ? api
        .stopWork(environmentId, currentWorkId, true)
        .then(() => {
          logForDebugging('[bridge:repl] Teardown: stopWork completed')
        })
        .catch((err: unknown) => {
          logForDebugging(
            `[bridge:repl] Teardown stopWork failed: ${errorMessage(err)}`,
          )
        })
    : Promise.resolve()
  // Run stopWork and archiveSession in parallel. gracefulShutdown.ts:407
  // races runCleanupFunctions() against 2s (NOT the 5s outer failsafe),
  // so archive is capped at 1.5s at the injection site to stay under budget.
  // archiveSession is contractually no-throw; the injected implementations
  // log their own success/failure internally.
  await Promise.all([stopWorkP, archiveSession(currentSessionId)])
  teardownTransport?.close()
  logForDebugging('[bridge:repl] Teardown: transport closed')
  // Best-effort deregistration; a failure here is logged and swallowed.
  await api.deregisterEnvironment(environmentId).catch((err: unknown) => {
    logForDebugging(
      `[bridge:repl] Teardown deregister failed: ${errorMessage(err)}`,
    )
  })
  // Clear the crash-recovery pointer — explicit disconnect or clean REPL
  // exit means the user is done with this session. Crash/kill-9 never
  // reaches this line, leaving the pointer for next-launch recovery.
  await clearBridgePointer(dir)
  logForDebugging(
    `[bridge:repl] Teardown complete: env=${environmentId} duration=${Date.now() - teardownStart}ms`,
  )
}
  1572. // 8. Register cleanup for graceful shutdown
  1573. const unregister = registerCleanup(() => doTeardownImpl?.())
  1574. logForDebugging(
  1575. `[bridge:repl] Ready: env=${environmentId} session=${currentSessionId}`,
  1576. )
  1577. onStateChange?.('ready')
  1578. return {
    get bridgeSessionId() {
      // Getter, not a snapshot: doReconnect() reassigns currentSessionId
      // (see pointer-refresh timer comment above), so callers always read
      // the live value.
      return currentSessionId
    },
    get environmentId() {
      // Live read — environmentId can be replaced by doReconnect(), so a
      // getter keeps callers current across reconnection.
      return environmentId
    },
  1585. getSSESequenceNum() {
  1586. // lastTransportSequenceNum only updates when a transport is CLOSED
  1587. // (captured at swap/onClose). During normal operation the CURRENT
  1588. // transport's live seq isn't reflected there. Merge both so callers
  1589. // (e.g. daemon persistState()) get the actual high-water mark.
  1590. const live = transport?.getLastSequenceNum() ?? 0
  1591. return Math.max(lastTransportSequenceNum, live)
  1592. },
  1593. sessionIngressUrl,
  1594. writeMessages(messages) {
  1595. // Filter to user/assistant messages that haven't already been sent.
  1596. // Two layers of dedup:
  1597. // - initialMessageUUIDs: messages sent as session creation events
  1598. // - recentPostedUUIDs: messages recently sent via POST
  1599. const filtered = messages.filter(
  1600. m =>
  1601. isEligibleBridgeMessage(m) &&
  1602. !initialMessageUUIDs.has(m.uuid) &&
  1603. !recentPostedUUIDs.has(m.uuid),
  1604. )
  1605. if (filtered.length === 0) return
  1606. // Fire onUserMessage for title derivation. Scan before the flushGate
  1607. // check — prompts are title-worthy even if they queue behind the
  1608. // initial history flush. Keeps calling on every title-worthy message
  1609. // until the callback returns true; the caller owns the policy.
  1610. if (!userMessageCallbackDone) {
  1611. for (const m of filtered) {
  1612. const text = extractTitleText(m)
  1613. if (text !== undefined && onUserMessage?.(text, currentSessionId)) {
  1614. userMessageCallbackDone = true
  1615. break
  1616. }
  1617. }
  1618. }
  1619. // Queue messages while the initial flush is in progress to prevent
  1620. // them from arriving at the server interleaved with history.
  1621. if (flushGate.enqueue(...filtered)) {
  1622. logForDebugging(
  1623. `[bridge:repl] Queued ${filtered.length} message(s) during initial flush`,
  1624. )
  1625. return
  1626. }
  1627. if (!transport) {
  1628. const types = filtered.map(m => m.type).join(',')
  1629. logForDebugging(
  1630. `[bridge:repl] Transport not configured, dropping ${filtered.length} message(s) [${types}] for session=${currentSessionId}`,
  1631. { level: 'warn' },
  1632. )
  1633. return
  1634. }
  1635. // Track in the bounded ring buffer for echo filtering and dedup.
  1636. for (const msg of filtered) {
  1637. recentPostedUUIDs.add(msg.uuid)
  1638. }
  1639. logForDebugging(
  1640. `[bridge:repl] Sending ${filtered.length} message(s) via transport`,
  1641. )
  1642. // Convert to SDK format and send via HTTP POST (HybridTransport).
  1643. // The web UI receives them via the subscribe WebSocket.
  1644. const sdkMessages = toSDKMessages(filtered)
  1645. const events = sdkMessages.map(sdkMsg => ({
  1646. ...sdkMsg,
  1647. session_id: currentSessionId,
  1648. }))
  1649. void transport.writeBatch(events)
  1650. },
  1651. writeSdkMessages(messages) {
  1652. // Daemon path: query() already yields SDKMessage, skip conversion.
  1653. // Still run echo dedup (server bounces writes back on the WS).
  1654. // No initialMessageUUIDs filter — daemon has no initial messages.
  1655. // No flushGate — daemon never starts it (no initial flush).
  1656. const filtered = messages.filter(
  1657. m => !m.uuid || !recentPostedUUIDs.has(m.uuid),
  1658. )
  1659. if (filtered.length === 0) return
  1660. if (!transport) {
  1661. logForDebugging(
  1662. `[bridge:repl] Transport not configured, dropping ${filtered.length} SDK message(s) for session=${currentSessionId}`,
  1663. { level: 'warn' },
  1664. )
  1665. return
  1666. }
  1667. for (const msg of filtered) {
  1668. if (msg.uuid) recentPostedUUIDs.add(msg.uuid)
  1669. }
  1670. const events = filtered.map(m => ({ ...m, session_id: currentSessionId }))
  1671. void transport.writeBatch(events)
  1672. },
  1673. sendControlRequest(request: SDKControlRequest) {
  1674. if (!transport) {
  1675. logForDebugging(
  1676. '[bridge:repl] Transport not configured, skipping control_request',
  1677. )
  1678. return
  1679. }
  1680. const event = { ...request, session_id: currentSessionId }
  1681. void transport.write(event)
  1682. logForDebugging(
  1683. `[bridge:repl] Sent control_request request_id=${request.request_id}`,
  1684. )
  1685. },
  1686. sendControlResponse(response: SDKControlResponse) {
  1687. if (!transport) {
  1688. logForDebugging(
  1689. '[bridge:repl] Transport not configured, skipping control_response',
  1690. )
  1691. return
  1692. }
  1693. const event = { ...response, session_id: currentSessionId }
  1694. void transport.write(event)
  1695. logForDebugging('[bridge:repl] Sent control_response')
  1696. },
  1697. sendControlCancelRequest(requestId: string) {
  1698. if (!transport) {
  1699. logForDebugging(
  1700. '[bridge:repl] Transport not configured, skipping control_cancel_request',
  1701. )
  1702. return
  1703. }
  1704. const event = {
  1705. type: 'control_cancel_request' as const,
  1706. request_id: requestId,
  1707. session_id: currentSessionId,
  1708. }
  1709. void transport.write(event)
  1710. logForDebugging(
  1711. `[bridge:repl] Sent control_cancel_request request_id=${requestId}`,
  1712. )
  1713. },
  1714. sendResult() {
  1715. if (!transport) {
  1716. logForDebugging(
  1717. `[bridge:repl] sendResult: skipping, transport not configured session=${currentSessionId}`,
  1718. )
  1719. return
  1720. }
  1721. void transport.write(makeResultMessage(currentSessionId))
  1722. logForDebugging(
  1723. `[bridge:repl] Sent result for session=${currentSessionId}`,
  1724. )
  1725. },
    async teardown() {
      // Drop the registerCleanup hook first so graceful shutdown does not
      // invoke doTeardownImpl a second time (doTeardownImpl also
      // self-guards via teardownStarted, so double-run is prevented
      // both ways).
      unregister()
      await doTeardownImpl?.()
      logForDebugging('[bridge:repl] Torn down')
      logEvent('tengu_bridge_repl_teardown', {})
    },
  1732. }
  1733. }
  1734. /**
  1735. * Persistent poll loop for work items. Runs in the background for the
  1736. * lifetime of the bridge connection.
  1737. *
 * When a work item arrives, the loop acknowledges it and invokes
 * onWorkReceived with the session ID and ingress token; that callback is
 * responsible for connecting the ingress WebSocket. Polling then
 * continues — if the ingress WebSocket later drops, the server dispatches
 * a fresh work item, allowing automatic reconnection without tearing
 * down the bridge.
  1743. */
  1744. async function startWorkPollLoop({
  1745. api,
  1746. getCredentials,
  1747. signal,
  1748. onStateChange,
  1749. onWorkReceived,
  1750. onEnvironmentLost,
  1751. getWsState,
  1752. isAtCapacity,
  1753. capacitySignal,
  1754. onFatalError,
  1755. getPollIntervalConfig = () => DEFAULT_POLL_CONFIG,
  1756. getHeartbeatInfo,
  1757. onHeartbeatFatal,
  1758. }: {
  1759. api: BridgeApiClient
  1760. getCredentials: () => { environmentId: string; environmentSecret: string }
  1761. signal: AbortSignal
  1762. onStateChange?: (state: BridgeState, detail?: string) => void
  1763. onWorkReceived: (
  1764. sessionId: string,
  1765. ingressToken: string,
  1766. workId: string,
  1767. useCodeSessions: boolean,
  1768. ) => void
  1769. /** Called when the environment has been deleted. Returns new credentials or null. */
  1770. onEnvironmentLost?: () => Promise<{
  1771. environmentId: string
  1772. environmentSecret: string
  1773. } | null>
  1774. /** Returns the current WebSocket readyState label for diagnostic logging. */
  1775. getWsState?: () => string
  1776. /**
  1777. * Returns true when the caller cannot accept new work (transport already
  1778. * connected). When true, the loop polls at the configured at-capacity
  1779. * interval as a heartbeat only. Server-side BRIDGE_LAST_POLL_TTL is
  1780. * 4 hours — anything shorter than that is sufficient for liveness.
  1781. */
  1782. isAtCapacity?: () => boolean
  1783. /**
  1784. * Produces a signal that aborts when capacity frees up (transport lost),
  1785. * merged with the loop signal. Used to interrupt the at-capacity sleep
  1786. * so recovery polling starts immediately.
  1787. */
  1788. capacitySignal?: () => CapacitySignal
  1789. /** Called on unrecoverable errors (e.g. server-side expiry) to trigger full teardown. */
  1790. onFatalError?: () => void
  1791. /** Poll interval config getter — defaults to DEFAULT_POLL_CONFIG. */
  1792. getPollIntervalConfig?: () => PollIntervalConfig
  1793. /**
  1794. * Returns the current work ID and session ingress token for heartbeat.
  1795. * When null, heartbeat is not possible (no active work item).
  1796. */
  1797. getHeartbeatInfo?: () => {
  1798. environmentId: string
  1799. workId: string
  1800. sessionToken: string
  1801. } | null
  1802. /**
  1803. * Called when heartbeatWork throws BridgeFatalError (401/403/404/410 —
  1804. * JWT expired or work item gone). Caller should tear down the transport
  1805. * + work state so isAtCapacity() flips to false and the loop fast-polls
  1806. * for the server's re-dispatched work item. When provided, the loop
  1807. * SKIPS the at-capacity backoff sleep (which would otherwise cause a
  1808. * ~10-minute dead window before recovery). When omitted, falls back to
  1809. * the backoff sleep to avoid a tight poll+heartbeat loop.
  1810. */
  1811. onHeartbeatFatal?: (err: BridgeFatalError) => void
  1812. }): Promise<void> {
  1813. const MAX_ENVIRONMENT_RECREATIONS = 3
  1814. logForDebugging(
  1815. `[bridge:repl] Starting work poll loop for env=${getCredentials().environmentId}`,
  1816. )
  1817. let consecutiveErrors = 0
  1818. let firstErrorTime: number | null = null
  1819. let lastPollErrorTime: number | null = null
  1820. let environmentRecreations = 0
  1821. // Set when the at-capacity sleep overruns its deadline by a large margin
  1822. // (process suspension). Consumed at the top of the next iteration to
  1823. // force one fast-poll cycle — isAtCapacity() is `transport !== null`,
  1824. // which stays true while the transport auto-reconnects, so the poll
  1825. // loop would otherwise go straight back to a 10-minute sleep on a
  1826. // transport that may be pointed at a dead socket.
  1827. let suspensionDetected = false
  1828. while (!signal.aborted) {
  1829. // Capture credentials outside try so the catch block can detect
  1830. // whether a concurrent reconnection replaced the environment.
  1831. const { environmentId: envId, environmentSecret: envSecret } =
  1832. getCredentials()
  1833. const pollConfig = getPollIntervalConfig()
  1834. try {
  1835. const work = await api.pollForWork(
  1836. envId,
  1837. envSecret,
  1838. signal,
  1839. pollConfig.reclaim_older_than_ms,
  1840. )
  1841. // A successful poll proves the env is genuinely healthy — reset the
  1842. // env-loss counter so events hours apart each start fresh. Outside
  1843. // the state-change guard below because onEnvLost's success path
  1844. // already emits 'ready'; emitting again here would be a duplicate.
  1845. // (onEnvLost returning creds does NOT reset this — that would break
  1846. // oscillation protection when the new env immediately dies.)
  1847. environmentRecreations = 0
  1848. // Reset error tracking on successful poll
  1849. if (consecutiveErrors > 0) {
  1850. logForDebugging(
  1851. `[bridge:repl] Poll recovered after ${consecutiveErrors} consecutive error(s)`,
  1852. )
  1853. consecutiveErrors = 0
  1854. firstErrorTime = null
  1855. lastPollErrorTime = null
  1856. onStateChange?.('ready')
  1857. }
  1858. if (!work) {
  1859. // Read-and-clear: after a detected suspension, skip the at-capacity
  1860. // branch exactly once. The pollForWork above already refreshed the
  1861. // server's BRIDGE_LAST_POLL_TTL; this fast cycle gives any
  1862. // re-dispatched work item a chance to land before we go back under.
  1863. const skipAtCapacityOnce = suspensionDetected
  1864. suspensionDetected = false
  1865. if (isAtCapacity?.() && capacitySignal && !skipAtCapacityOnce) {
  1866. const atCapMs = pollConfig.poll_interval_ms_at_capacity
  1867. // Heartbeat loops WITHOUT polling. When at-capacity polling is also
  1868. // enabled (atCapMs > 0), the loop tracks a deadline and breaks out
  1869. // to poll at that interval — heartbeat and poll compose instead of
  1870. // one suppressing the other. Breaks out when:
  1871. // - Poll deadline reached (atCapMs > 0 only)
  1872. // - Auth fails (JWT expired → poll refreshes tokens)
  1873. // - Capacity wake fires (transport lost → poll for new work)
  1874. // - Heartbeat config disabled (GrowthBook update)
  1875. // - Loop aborted (shutdown)
  1876. if (
  1877. pollConfig.non_exclusive_heartbeat_interval_ms > 0 &&
  1878. getHeartbeatInfo
  1879. ) {
  1880. logEvent('tengu_bridge_heartbeat_mode_entered', {
  1881. heartbeat_interval_ms:
  1882. pollConfig.non_exclusive_heartbeat_interval_ms,
  1883. })
  1884. // Deadline computed once at entry — GB updates to atCapMs don't
  1885. // shift an in-flight deadline (next entry picks up the new value).
  1886. const pollDeadline = atCapMs > 0 ? Date.now() + atCapMs : null
  1887. let needsBackoff = false
  1888. let hbCycles = 0
  1889. while (
  1890. !signal.aborted &&
  1891. isAtCapacity() &&
  1892. (pollDeadline === null || Date.now() < pollDeadline)
  1893. ) {
  1894. const hbConfig = getPollIntervalConfig()
  1895. if (hbConfig.non_exclusive_heartbeat_interval_ms <= 0) break
  1896. const info = getHeartbeatInfo()
  1897. if (!info) break
  1898. // Capture capacity signal BEFORE the async heartbeat call so
  1899. // a transport loss during the HTTP request is caught by the
  1900. // subsequent sleep.
  1901. const cap = capacitySignal()
  1902. try {
  1903. await api.heartbeatWork(
  1904. info.environmentId,
  1905. info.workId,
  1906. info.sessionToken,
  1907. )
  1908. } catch (err) {
  1909. logForDebugging(
  1910. `[bridge:repl:heartbeat] Failed: ${errorMessage(err)}`,
  1911. )
  1912. if (err instanceof BridgeFatalError) {
  1913. cap.cleanup()
  1914. logEvent('tengu_bridge_heartbeat_error', {
  1915. status:
  1916. err.status as unknown as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
  1917. error_type: (err.status === 401 || err.status === 403
  1918. ? 'auth_failed'
  1919. : 'fatal') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
  1920. })
  1921. // JWT expired (401/403) or work item gone (404/410).
  1922. // Either way the current transport is dead — SSE
  1923. // reconnects and CCR writes will fail on the same
  1924. // stale token. If the caller gave us a recovery hook,
  1925. // tear down work state and skip backoff: isAtCapacity()
  1926. // flips to false, next outer-loop iteration fast-polls
  1927. // for the server's re-dispatched work item. Without
  1928. // the hook, backoff to avoid tight poll+heartbeat loop.
  1929. if (onHeartbeatFatal) {
  1930. onHeartbeatFatal(err)
  1931. logForDebugging(
  1932. `[bridge:repl:heartbeat] Fatal (status=${err.status}), work state cleared — fast-polling for re-dispatch`,
  1933. )
  1934. } else {
  1935. needsBackoff = true
  1936. }
  1937. break
  1938. }
  1939. }
  1940. hbCycles++
  1941. await sleep(
  1942. hbConfig.non_exclusive_heartbeat_interval_ms,
  1943. cap.signal,
  1944. )
  1945. cap.cleanup()
  1946. }
  1947. const exitReason = needsBackoff
  1948. ? 'error'
  1949. : signal.aborted
  1950. ? 'shutdown'
  1951. : !isAtCapacity()
  1952. ? 'capacity_changed'
  1953. : pollDeadline !== null && Date.now() >= pollDeadline
  1954. ? 'poll_due'
  1955. : 'config_disabled'
  1956. logEvent('tengu_bridge_heartbeat_mode_exited', {
  1957. reason:
  1958. exitReason as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
  1959. heartbeat_cycles: hbCycles,
  1960. })
  1961. // On auth_failed or fatal, backoff before polling to avoid a
  1962. // tight poll+heartbeat loop. Fall through to the shared sleep
  1963. // below — it's the same capacitySignal-wrapped sleep the legacy
  1964. // path uses, and both need the suspension-overrun check.
  1965. if (!needsBackoff) {
  1966. if (exitReason === 'poll_due') {
  1967. // bridgeApi throttles empty-poll logs (EMPTY_POLL_LOG_INTERVAL=100)
  1968. // so the once-per-10min poll_due poll is invisible at counter=2.
  1969. // Log it here so verification runs see both endpoints in the debug log.
  1970. logForDebugging(
  1971. `[bridge:repl] Heartbeat poll_due after ${hbCycles} cycles — falling through to pollForWork`,
  1972. )
  1973. }
  1974. continue
  1975. }
  1976. }
  1977. // At-capacity sleep — reached by both the legacy path (heartbeat
  1978. // disabled) and the heartbeat-backoff path (needsBackoff=true).
  1979. // Merged so the suspension detector covers both; previously the
  1980. // backoff path had no overrun check and could go straight back
  1981. // under for 10 min after a laptop wake. Use atCapMs when enabled,
  1982. // else the heartbeat interval as a floor (guaranteed > 0 on the
  1983. // backoff path) so heartbeat-only configs don't tight-loop.
  1984. const sleepMs =
  1985. atCapMs > 0
  1986. ? atCapMs
  1987. : pollConfig.non_exclusive_heartbeat_interval_ms
  1988. if (sleepMs > 0) {
  1989. const cap = capacitySignal()
  1990. const sleepStart = Date.now()
  1991. await sleep(sleepMs, cap.signal)
  1992. cap.cleanup()
  1993. // Process-suspension detector. A setTimeout overshooting its
  1994. // deadline by 60s means the process was suspended (laptop lid,
  1995. // SIGSTOP, VM pause) — even a pathological GC pause is seconds,
  1996. // not minutes. Early aborts (wakePollLoop → cap.signal) produce
  1997. // overrun < 0 and fall through. Note: this only catches sleeps
  1998. // that outlast their deadline; WebSocketTransport's ping
  1999. // interval (10s granularity) is the primary detector for shorter
  2000. // suspensions. This is the backstop for when that detector isn't
  2001. // running (transport mid-reconnect, interval stopped).
  2002. const overrun = Date.now() - sleepStart - sleepMs
  2003. if (overrun > 60_000) {
  2004. logForDebugging(
  2005. `[bridge:repl] At-capacity sleep overran by ${Math.round(overrun / 1000)}s — process suspension detected, forcing one fast-poll cycle`,
  2006. )
  2007. logEvent('tengu_bridge_repl_suspension_detected', {
  2008. overrun_ms: overrun,
  2009. })
  2010. suspensionDetected = true
  2011. }
  2012. }
  2013. } else {
  2014. await sleep(pollConfig.poll_interval_ms_not_at_capacity, signal)
  2015. }
  2016. continue
  2017. }
  2018. // Decode before type dispatch — need the JWT for the explicit ack.
  2019. let secret
  2020. try {
  2021. secret = decodeWorkSecret(work.secret)
  2022. } catch (err) {
  2023. logForDebugging(
  2024. `[bridge:repl] Failed to decode work secret: ${errorMessage(err)}`,
  2025. )
  2026. logEvent('tengu_bridge_repl_work_secret_failed', {})
  2027. // Can't ack (needs the JWT we failed to decode). stopWork uses OAuth.
  2028. // Prevents XAUTOCLAIM re-delivering this poisoned item every cycle.
  2029. await api.stopWork(envId, work.id, false).catch(() => {})
  2030. continue
  2031. }
  2032. // Explicitly acknowledge to prevent redelivery. Non-fatal on failure:
  2033. // server re-delivers, and the onWorkReceived callback handles dedup.
  2034. logForDebugging(`[bridge:repl] Acknowledging workId=${work.id}`)
  2035. try {
  2036. await api.acknowledgeWork(envId, work.id, secret.session_ingress_token)
  2037. } catch (err) {
  2038. logForDebugging(
  2039. `[bridge:repl] Acknowledge failed workId=${work.id}: ${errorMessage(err)}`,
  2040. )
  2041. }
  2042. if (work.data.type === 'healthcheck') {
  2043. logForDebugging('[bridge:repl] Healthcheck received')
  2044. continue
  2045. }
  2046. if (work.data.type === 'session') {
  2047. const workSessionId = work.data.id
  2048. try {
  2049. validateBridgeId(workSessionId, 'session_id')
  2050. } catch {
  2051. logForDebugging(
  2052. `[bridge:repl] Invalid session_id in work: ${workSessionId}`,
  2053. )
  2054. continue
  2055. }
  2056. onWorkReceived(
  2057. workSessionId,
  2058. secret.session_ingress_token,
  2059. work.id,
  2060. secret.use_code_sessions === true,
  2061. )
  2062. logForDebugging('[bridge:repl] Work accepted, continuing poll loop')
  2063. }
  2064. } catch (err) {
  2065. if (signal.aborted) break
  2066. // Detect permanent "environment deleted" error — no amount of
  2067. // retrying will recover. Re-register a new environment instead.
  2068. // Checked BEFORE the generic BridgeFatalError bail. pollForWork uses
  2069. // validateStatus: s => s < 500, so 404 is always wrapped into a
  2070. // BridgeFatalError by handleErrorStatus() — never an axios-shaped
  2071. // error. The poll endpoint's only path param is the env ID; 404
  2072. // unambiguously means env-gone (no-work is a 200 with null body).
  2073. // The server sends error.type='not_found_error' (standard Anthropic
  2074. // API shape), not a bridge-specific string — but status===404 is
  2075. // the real signal and survives body-shape changes.
  2076. if (
  2077. err instanceof BridgeFatalError &&
  2078. err.status === 404 &&
  2079. onEnvironmentLost
  2080. ) {
  2081. // If credentials have already been refreshed by a concurrent
  2082. // reconnection (e.g. WS close handler), the stale poll's error
  2083. // is expected — skip onEnvironmentLost and retry with fresh creds.
  2084. const currentEnvId = getCredentials().environmentId
  2085. if (envId !== currentEnvId) {
  2086. logForDebugging(
  2087. `[bridge:repl] Stale poll error for old env=${envId}, current env=${currentEnvId} — skipping onEnvironmentLost`,
  2088. )
  2089. consecutiveErrors = 0
  2090. firstErrorTime = null
  2091. continue
  2092. }
  2093. environmentRecreations++
  2094. logForDebugging(
  2095. `[bridge:repl] Environment deleted, attempting re-registration (attempt ${environmentRecreations}/${MAX_ENVIRONMENT_RECREATIONS})`,
  2096. )
  2097. logEvent('tengu_bridge_repl_env_lost', {
  2098. attempt: environmentRecreations,
  2099. } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
  2100. if (environmentRecreations > MAX_ENVIRONMENT_RECREATIONS) {
  2101. logForDebugging(
  2102. `[bridge:repl] Environment re-registration limit reached (${MAX_ENVIRONMENT_RECREATIONS}), giving up`,
  2103. )
  2104. onStateChange?.(
  2105. 'failed',
  2106. 'Environment deleted and re-registration limit reached',
  2107. )
  2108. onFatalError?.()
  2109. break
  2110. }
  2111. onStateChange?.('reconnecting', 'environment lost, recreating session')
  2112. const newCreds = await onEnvironmentLost()
  2113. // doReconnect() makes several sequential network calls (1-5s).
  2114. // If the user triggered teardown during that window, its internal
  2115. // abort checks return false — but we need to re-check here to
  2116. // avoid emitting a spurious 'failed' + onFatalError() during
  2117. // graceful shutdown.
  2118. if (signal.aborted) break
  2119. if (newCreds) {
  2120. // Credentials are updated in the outer scope via
  2121. // reconnectEnvironmentWithSession — getCredentials() will
  2122. // return the fresh values on the next poll iteration.
  2123. // Do NOT reset environmentRecreations here — onEnvLost returning
  2124. // creds only proves we tried to fix it, not that the env is
  2125. // healthy. A successful poll (above) is the reset point; if the
  2126. // new env immediately dies again we still want the limit to fire.
  2127. consecutiveErrors = 0
  2128. firstErrorTime = null
  2129. onStateChange?.('ready')
  2130. logForDebugging(
  2131. `[bridge:repl] Re-registered environment: ${newCreds.environmentId}`,
  2132. )
  2133. continue
  2134. }
  2135. onStateChange?.(
  2136. 'failed',
  2137. 'Environment deleted and re-registration failed',
  2138. )
  2139. onFatalError?.()
  2140. break
  2141. }
  2142. // Fatal errors (401/403/404/410) — no point retrying
  2143. if (err instanceof BridgeFatalError) {
  2144. const isExpiry = isExpiredErrorType(err.errorType)
  2145. const isSuppressible = isSuppressible403(err)
  2146. logForDebugging(
  2147. `[bridge:repl] Fatal poll error: ${err.message} (status=${err.status}, type=${err.errorType ?? 'unknown'})${isSuppressible ? ' (suppressed)' : ''}`,
  2148. )
  2149. logEvent('tengu_bridge_repl_fatal_error', {
  2150. status: err.status,
  2151. error_type:
  2152. err.errorType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
  2153. })
  2154. logForDiagnosticsNoPII(
  2155. isExpiry ? 'info' : 'error',
  2156. 'bridge_repl_fatal_error',
  2157. { status: err.status, error_type: err.errorType },
  2158. )
  2159. // Cosmetic 403 errors (e.g., external_poll_sessions scope,
  2160. // environments:manage permission) — suppress user-visible error
  2161. // but always trigger teardown so cleanup runs.
  2162. if (!isSuppressible) {
  2163. onStateChange?.(
  2164. 'failed',
  2165. isExpiry
  2166. ? 'session expired · /remote-control to reconnect'
  2167. : err.message,
  2168. )
  2169. }
  2170. // Always trigger teardown — matches bridgeMain.ts where fatalExit=true
  2171. // is unconditional and post-loop cleanup always runs.
  2172. onFatalError?.()
  2173. break
  2174. }
  2175. const now = Date.now()
  2176. // Detect system sleep/wake: if the gap since the last poll error
  2177. // greatly exceeds the max backoff delay, the machine likely slept.
  2178. // Reset error tracking so we retry with a fresh budget instead of
  2179. // immediately giving up.
  2180. if (
  2181. lastPollErrorTime !== null &&
  2182. now - lastPollErrorTime > POLL_ERROR_MAX_DELAY_MS * 2
  2183. ) {
  2184. logForDebugging(
  2185. `[bridge:repl] Detected system sleep (${Math.round((now - lastPollErrorTime) / 1000)}s gap), resetting poll error budget`,
  2186. )
  2187. logForDiagnosticsNoPII('info', 'bridge_repl_poll_sleep_detected', {
  2188. gapMs: now - lastPollErrorTime,
  2189. })
  2190. consecutiveErrors = 0
  2191. firstErrorTime = null
  2192. }
  2193. lastPollErrorTime = now
  2194. consecutiveErrors++
  2195. if (firstErrorTime === null) {
  2196. firstErrorTime = now
  2197. }
  2198. const elapsed = now - firstErrorTime
  2199. const httpStatus = extractHttpStatus(err)
  2200. const errMsg = describeAxiosError(err)
  2201. const wsLabel = getWsState?.() ?? 'unknown'
  2202. logForDebugging(
  2203. `[bridge:repl] Poll error (attempt ${consecutiveErrors}, elapsed ${Math.round(elapsed / 1000)}s, ws=${wsLabel}): ${errMsg}`,
  2204. )
  2205. logEvent('tengu_bridge_repl_poll_error', {
  2206. status: httpStatus,
  2207. consecutiveErrors,
  2208. elapsedMs: elapsed,
  2209. } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
  2210. // Only transition to 'reconnecting' on the first error — stay
  2211. // there until a successful poll (avoid flickering the UI state).
  2212. if (consecutiveErrors === 1) {
  2213. onStateChange?.('reconnecting', errMsg)
  2214. }
  2215. // Give up after continuous failures
  2216. if (elapsed >= POLL_ERROR_GIVE_UP_MS) {
  2217. logForDebugging(
  2218. `[bridge:repl] Poll failures exceeded ${POLL_ERROR_GIVE_UP_MS / 1000}s (${consecutiveErrors} errors), giving up`,
  2219. )
  2220. logForDiagnosticsNoPII('info', 'bridge_repl_poll_give_up')
  2221. logEvent('tengu_bridge_repl_poll_give_up', {
  2222. consecutiveErrors,
  2223. elapsedMs: elapsed,
  2224. lastStatus: httpStatus,
  2225. } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
  2226. onStateChange?.('failed', 'connection to server lost')
  2227. break
  2228. }
  2229. // Exponential backoff: 2s → 4s → 8s → 16s → 32s → 60s (cap)
  2230. const backoff = Math.min(
  2231. POLL_ERROR_INITIAL_DELAY_MS * 2 ** (consecutiveErrors - 1),
  2232. POLL_ERROR_MAX_DELAY_MS,
  2233. )
  2234. // The poll_due heartbeat-loop exit leaves a healthy lease exposed to
  2235. // this backoff path. Heartbeat before each sleep so /poll outages
  2236. // (the VerifyEnvironmentSecretAuth DB path heartbeat was introduced to
  2237. // avoid) don't kill the 300s lease TTL.
  2238. if (getPollIntervalConfig().non_exclusive_heartbeat_interval_ms > 0) {
  2239. const info = getHeartbeatInfo?.()
  2240. if (info) {
  2241. try {
  2242. await api.heartbeatWork(
  2243. info.environmentId,
  2244. info.workId,
  2245. info.sessionToken,
  2246. )
  2247. } catch {
  2248. // Best-effort — if heartbeat also fails the lease dies, same as
  2249. // pre-poll_due behavior (where the only heartbeat-loop exits were
  2250. // ones where the lease was already dying).
  2251. }
  2252. }
  2253. }
  2254. await sleep(backoff, signal)
  2255. }
  2256. }
  2257. logForDebugging(
  2258. `[bridge:repl] Work poll loop ended (aborted=${signal.aborted}) env=${getCredentials().environmentId}`,
  2259. )
  2260. }
// Exported for testing only — exposes the poll-loop entry point and the
// error-backoff timing constants (initial delay, max delay cap, and the
// give-up threshold used by the consecutive-failure budget) so tests can
// drive the loop and compute expected retry timings. The underscore +
// `ForTesting` naming marks these as off-limits to production imports.
export {
  startWorkPollLoop as _startWorkPollLoopForTesting,
  POLL_ERROR_INITIAL_DELAY_MS as _POLL_ERROR_INITIAL_DELAY_MS_ForTesting,
  POLL_ERROR_MAX_DELAY_MS as _POLL_ERROR_MAX_DELAY_MS_ForTesting,
  POLL_ERROR_GIVE_UP_MS as _POLL_ERROR_GIVE_UP_MS_ForTesting,
}