| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406 |
- // biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered
- import { randomUUID } from 'crypto'
- import {
- createBridgeApiClient,
- BridgeFatalError,
- isExpiredErrorType,
- isSuppressible403,
- } from './bridgeApi.js'
- import type { BridgeConfig, BridgeApiClient } from './types.js'
- import { logForDebugging } from '../utils/debug.js'
- import { logForDiagnosticsNoPII } from '../utils/diagLogs.js'
- import {
- type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- logEvent,
- } from '../services/analytics/index.js'
- import { registerCleanup } from '../utils/cleanupRegistry.js'
- import {
- handleIngressMessage,
- handleServerControlRequest,
- makeResultMessage,
- isEligibleBridgeMessage,
- extractTitleText,
- BoundedUUIDSet,
- } from './bridgeMessaging.js'
- import {
- decodeWorkSecret,
- buildSdkUrl,
- buildCCRv2SdkUrl,
- sameSessionId,
- } from './workSecret.js'
- import { toCompatSessionId, toInfraSessionId } from './sessionIdCompat.js'
- import { updateSessionBridgeId } from '../utils/concurrentSessions.js'
- import { getTrustedDeviceToken } from './trustedDevice.js'
- import { HybridTransport } from '../cli/transports/HybridTransport.js'
- import {
- type ReplBridgeTransport,
- createV1ReplTransport,
- createV2ReplTransport,
- } from './replBridgeTransport.js'
- import { updateSessionIngressAuthToken } from '../utils/sessionIngressAuth.js'
- import { isEnvTruthy, isInProtectedNamespace } from '../utils/envUtils.js'
- import { validateBridgeId } from './bridgeApi.js'
- import {
- describeAxiosError,
- extractHttpStatus,
- logBridgeSkip,
- } from './debugUtils.js'
- import type { Message } from '../types/message.js'
- import type { SDKMessage } from '../entrypoints/agentSdkTypes.js'
- import type { PermissionMode } from '../utils/permissions/PermissionMode.js'
- import type {
- SDKControlRequest,
- SDKControlResponse,
- } from '../entrypoints/sdk/controlTypes.js'
- import { createCapacityWake, type CapacitySignal } from './capacityWake.js'
- import { FlushGate } from './flushGate.js'
- import {
- DEFAULT_POLL_CONFIG,
- type PollIntervalConfig,
- } from './pollConfigDefaults.js'
- import { errorMessage } from '../utils/errors.js'
- import { sleep } from '../utils/sleep.js'
- import {
- wrapApiForFaultInjection,
- registerBridgeDebugHandle,
- clearBridgeDebugHandle,
- injectBridgeFault,
- } from './bridgeDebug.js'
- /**
-  * Handle returned to the caller once the bridge is initialized. Exposes
-  * identifiers for the created (or reused) session plus the outbound
-  * message/control channel and teardown.
-  */
- export type ReplBridgeHandle = {
- // ID of the bridge session created or reused at init.
- bridgeSessionId: string
- // Bridge environment this session is registered on.
- environmentId: string
- // Base URL of the Session-Ingress service used for message transport.
- sessionIngressUrl: string
- // Outbound conversation messages in the internal Message shape;
- // converted via the injected toSDKMessages before sending.
- writeMessages(messages: Message[]): void
- // Outbound messages already in SDK shape — no conversion step.
- writeSdkMessages(messages: SDKMessage[]): void
- sendControlRequest(request: SDKControlRequest): void
- sendControlResponse(response: SDKControlResponse): void
- // Cancels a previously sent control request by its request ID.
- sendControlCancelRequest(requestId: string): void
- // Signals turn completion (result message) to the server.
- sendResult(): void
- // Shuts the bridge down. NOTE(review): implementation is below this
- // chunk — appears to stop polling and close the transport; confirm.
- teardown(): Promise<void>
- }
- // Coarse lifecycle state reported to the caller via onStateChange.
- // 'failed' is emitted on registration or session-creation failure (with
- // an error detail string); the other states presumably track transport
- // connectivity — confirm against the onStateChange call sites below.
- export type BridgeState = 'ready' | 'connected' | 'reconnecting' | 'failed'
- /**
- * Explicit-param input to initBridgeCore. Everything initReplBridge reads
- * from bootstrap state (cwd, session ID, git, OAuth) becomes a field here.
- * A daemon caller (Agent SDK, PR 4) that never runs main.tsx fills these
- * in itself.
- */
- export type BridgeCoreParams = {
- // Session working directory. Keys the crash-recovery bridge pointer
- // file and is included in environment registration.
- dir: string
- // Host name included in environment registration.
- machineName: string
- // Current git branch; sent in registration and to createSession.
- branch: string
- // Remote repo URL; sent in registration and to createSession. Null
- // when unavailable.
- gitRepoUrl: string | null
- // Initial session title; also the default for getCurrentTitle.
- title: string
- // API base URL for the bridge HTTP client.
- baseUrl: string
- // Base URL of the Session-Ingress service, forwarded in registration.
- sessionIngressUrl: string
- /**
- * Opaque string sent as metadata.worker_type. Use BridgeWorkerType for
- * the two CLI-originated values; daemon callers may send any string the
- * backend recognizes (it's just a filter key on the web side).
- */
- workerType: string
- // Supplies the current OAuth access token to the bridge API client.
- getAccessToken: () => string | undefined
- /**
- * POST /v1/sessions. Injected because `createSession.ts` lazy-loads
- * `auth.ts`/`model.ts`/`oauth/client.ts` and `bun --outfile` inlines
- * dynamic imports — the lazy-load doesn't help, the whole REPL tree ends
- * up in the Agent SDK bundle.
- *
- * REPL wrapper passes `createBridgeSession` from `createSession.ts`.
- * Daemon wrapper passes `createBridgeSessionLean` from `sessionApi.ts`
- * (HTTP-only, orgUUID+model supplied by the daemon caller).
- *
- * Receives `gitRepoUrl`+`branch` so the REPL wrapper can build the git
- * source/outcome for claude.ai's session card. Daemon ignores them.
- */
- createSession: (opts: {
- environmentId: string
- title: string
- gitRepoUrl: string | null
- branch: string
- signal: AbortSignal
- }) => Promise<string | null>
- /**
- * POST /v1/sessions/{id}/archive. Same injection rationale. Best-effort;
- * the callback MUST NOT throw.
- */
- archiveSession: (sessionId: string) => Promise<void>
- /**
- * Invoked on reconnect-after-env-lost to refresh the title. REPL wrapper
- * reads session storage (picks up /rename); daemon returns the static
- * title. Defaults to () => title.
- */
- getCurrentTitle?: () => string
- /**
- * Converts internal Message[] → SDKMessage[] for writeMessages() and the
- * initial-flush/drain paths. REPL wrapper passes the real toSDKMessages
- * from utils/messages/mappers.ts. Daemon callers that only use
- * writeSdkMessages() and pass no initialMessages can omit this — those
- * code paths are unreachable.
- *
- * Injected rather than imported because mappers.ts transitively pulls in
- * src/commands.ts via messages.ts → api.ts → prompts.ts, dragging the
- * entire command registry + React tree into the Agent SDK bundle.
- */
- toSDKMessages?: (messages: Message[]) => SDKMessage[]
- /**
- * OAuth 401 refresh handler passed to createBridgeApiClient. REPL wrapper
- * passes handleOAuth401Error; daemon passes its AuthManager's handler.
- * Injected because utils/auth.ts transitively pulls in the command
- * registry via config.ts → file.ts → permissions/filesystem.ts →
- * sessionStorage.ts → commands.ts.
- */
- onAuth401?: (staleAccessToken: string) => Promise<boolean>
- /**
- * Poll interval config getter for the work-poll heartbeat loop. REPL
- * wrapper passes the GrowthBook-backed getPollIntervalConfig (allows ops
- * to live-tune poll rates fleet-wide). Daemon passes a static config
- * with a 60s heartbeat (5× headroom under the 300s work-lease TTL).
- * Injected because growthbook.ts transitively pulls in the command
- * registry via the same config.ts chain.
- */
- getPollIntervalConfig?: () => PollIntervalConfig
- /**
- * Max initial messages to replay on connect. REPL wrapper reads from the
- * tengu_bridge_initial_history_cap GrowthBook flag. Daemon passes no
- * initialMessages so this is never read. Default 200 matches the flag
- * default.
- */
- initialHistoryCap?: number
- // Same REPL-flush machinery as InitBridgeOptions — daemon omits these.
- // Conversation history flushed over the ingress transport on connect.
- initialMessages?: Message[]
- // UUIDs already sent in a prior run — excluded from the initial flush
- // (duplicate UUIDs cause the server to kill the WebSocket).
- previouslyFlushedUUIDs?: Set<string>
- // NOTE(review): the five callbacks below are dispatched from inbound
- // server traffic; exact trigger points live in the transport/control
- // handlers further down the file — confirm there.
- onInboundMessage?: (msg: SDKMessage) => void
- onPermissionResponse?: (response: SDKControlResponse) => void
- onInterrupt?: () => void
- onSetModel?: (model: string | undefined) => void
- onSetMaxThinkingTokens?: (maxTokens: number | null) => void
- /**
- * Returns a policy verdict so this module can emit an error control_response
- * without importing the policy checks itself (bootstrap-isolation constraint).
- * The callback must guard `auto` (isAutoModeGateEnabled) and
- * `bypassPermissions` (isBypassPermissionsModeDisabled AND
- * isBypassPermissionsModeAvailable) BEFORE calling transitionPermissionMode —
- * that function's internal auto-gate check is a defensive throw, not a
- * graceful guard, and its side-effect order is setAutoModeActive(true) then
- * throw, which corrupts the 3-way invariant documented in src/CLAUDE.md if
- * the callback lets the throw escape here.
- */
- onSetPermissionMode?: (
- mode: PermissionMode,
- ) => { ok: true } | { ok: false; error: string }
- // Observes connection lifecycle transitions; detail carries an error
- // message on 'failed'.
- onStateChange?: (state: BridgeState, detail?: string) => void
- /**
- * Fires on each real user message to flow through writeMessages() until
- * the callback returns true (done). Mirrors remoteBridgeCore.ts's
- * onUserMessage so the REPL bridge can derive a session title from early
- * prompts when none was set at init time (e.g. user runs /remote-control
- * on an empty conversation, then types). Tool-result wrappers, meta
- * messages, and display-tag-only messages are skipped. Receives
- * currentSessionId so the wrapper can PATCH the title without a closure
- * dance to reach the not-yet-returned handle. The caller owns the
- * derive-at-count-1-and-3 policy; the transport just keeps calling until
- * told to stop. Not fired for the writeSdkMessages daemon path (daemon
- * sets its own title at init). Distinct from SessionSpawnOpts's
- * onFirstUserMessage (spawn-bridge, PR #21250), which stays fire-once.
- */
- onUserMessage?: (text: string, sessionId: string) => boolean
- /** See InitBridgeOptions.perpetual. */
- perpetual?: boolean
- /**
- * Seeds lastTransportSequenceNum — the SSE event-stream high-water mark
- * that's carried across transport swaps within one process. Daemon callers
- * pass the value they persisted at shutdown so the FIRST SSE connect of a
- * fresh process sends from_sequence_num and the server doesn't replay full
- * history. REPL callers omit (fresh session each run → 0 is correct).
- */
- initialSSESequenceNum?: number
- }
- /**
- * Superset of ReplBridgeHandle. Adds getSSESequenceNum for daemon callers
- * that persist the SSE seq-num across process restarts and pass it back as
- * initialSSESequenceNum on the next start.
- */
- export type BridgeCoreHandle = ReplBridgeHandle & {
- /**
- * Current SSE sequence-number high-water mark. Updates as transports
- * swap. Daemon callers persist this on shutdown and pass it back as
- * initialSSESequenceNum on next start.
- *
- * NOTE(review): presumably backed by lastTransportSequenceNum, which
- * is seeded to 0 for a fresh session and to initialSSESequenceNum only
- * when the prior session was reconnected — confirm against the handle
- * construction below this chunk.
- */
- getSSESequenceNum(): number
- }
- /**
- * Poll error recovery constants. When the work poll starts failing (e.g.
- * server 500s), we use exponential backoff and give up after this timeout.
- * This is deliberately long — the server is the authority on when a session
- * is truly dead. As long as the server accepts our poll, we keep waiting
- * for it to re-dispatch the work item.
- */
- const POLL_ERROR_INITIAL_DELAY_MS = 2_000 // first backoff step (2s)
- const POLL_ERROR_MAX_DELAY_MS = 60_000 // backoff ceiling (60s)
- const POLL_ERROR_GIVE_UP_MS = 15 * 60 * 1000 // abandon after 15 minutes
- // Monotonically increasing counter for distinguishing init calls in logs
- let initSequence = 0
- /**
- * Bootstrap-free core: env registration → session creation → poll loop →
- * ingress WS → teardown. Reads nothing from bootstrap/state or
- * sessionStorage — all context comes from params. Caller (initReplBridge
- * below, or a daemon in PR 4) has already passed entitlement gates and
- * gathered git/auth/title.
- *
- * Returns null on registration or session-creation failure.
- */
- export async function initBridgeCore(
- params: BridgeCoreParams,
- ): Promise<BridgeCoreHandle | null> {
- const {
- dir,
- machineName,
- branch,
- gitRepoUrl,
- title,
- baseUrl,
- sessionIngressUrl,
- workerType,
- getAccessToken,
- createSession,
- archiveSession,
- getCurrentTitle = () => title,
- toSDKMessages = () => {
- throw new Error(
- 'BridgeCoreParams.toSDKMessages not provided. Pass it if you use writeMessages() or initialMessages — daemon callers that only use writeSdkMessages() never hit this path.',
- )
- },
- onAuth401,
- getPollIntervalConfig = () => DEFAULT_POLL_CONFIG,
- initialHistoryCap = 200,
- initialMessages,
- previouslyFlushedUUIDs,
- onInboundMessage,
- onPermissionResponse,
- onInterrupt,
- onSetModel,
- onSetMaxThinkingTokens,
- onSetPermissionMode,
- onStateChange,
- onUserMessage,
- perpetual,
- initialSSESequenceNum = 0,
- } = params
- const seq = ++initSequence
- // bridgePointer import hoisted: perpetual mode reads it before register;
- // non-perpetual writes it after session create; both use clear at teardown.
- const { writeBridgePointer, clearBridgePointer, readBridgePointer } =
- await import('./bridgePointer.js')
- // Perpetual mode: read the crash-recovery pointer and treat it as prior
- // state. The pointer is written unconditionally after session create
- // (crash-recovery for all sessions); perpetual mode just skips the
- // teardown clear so it survives clean exits too. Only reuse 'repl'
- // pointers — a crashed standalone bridge (`claude remote-control`)
- // writes source:'standalone' with a different workerType.
- const rawPrior = perpetual ? await readBridgePointer(dir) : null
- const prior = rawPrior?.source === 'repl' ? rawPrior : null
- logForDebugging(
- `[bridge:repl] initBridgeCore #${seq} starting (initialMessages=${initialMessages?.length ?? 0}${prior ? ` perpetual prior=env:${prior.environmentId}` : ''})`,
- )
- // 5. Register bridge environment
- const rawApi = createBridgeApiClient({
- baseUrl,
- getAccessToken,
- runnerVersion: MACRO.VERSION,
- onDebug: logForDebugging,
- onAuth401,
- getTrustedDeviceToken,
- })
- // Ant-only: interpose so /bridge-kick can inject poll/register/heartbeat
- // failures. Zero cost in external builds (rawApi passes through unchanged).
- const api =
- process.env.USER_TYPE === 'ant' ? wrapApiForFaultInjection(rawApi) : rawApi
- const bridgeConfig: BridgeConfig = {
- dir,
- machineName,
- branch,
- gitRepoUrl,
- maxSessions: 1,
- spawnMode: 'single-session',
- verbose: false,
- sandbox: false,
- bridgeId: randomUUID(),
- workerType,
- environmentId: randomUUID(),
- reuseEnvironmentId: prior?.environmentId,
- apiBaseUrl: baseUrl,
- sessionIngressUrl,
- }
- let environmentId: string
- let environmentSecret: string
- try {
- const reg = await api.registerBridgeEnvironment(bridgeConfig)
- environmentId = reg.environment_id
- environmentSecret = reg.environment_secret
- } catch (err) {
- logBridgeSkip(
- 'registration_failed',
- `[bridge:repl] Environment registration failed: ${errorMessage(err)}`,
- )
- // Stale pointer may be the cause (expired/deleted env) — clear it so
- // the next start doesn't retry the same dead ID.
- if (prior) {
- await clearBridgePointer(dir)
- }
- onStateChange?.('failed', errorMessage(err))
- return null
- }
- logForDebugging(`[bridge:repl] Environment registered: ${environmentId}`)
- logForDiagnosticsNoPII('info', 'bridge_repl_env_registered')
- logEvent('tengu_bridge_repl_env_registered', {})
- /**
- * Reconnect-in-place: if the just-registered environmentId matches what
- * was requested, call reconnectSession to force-stop stale workers and
- * re-queue the session. Used at init (perpetual mode — env is alive but
- * idle after clean teardown) and in doReconnect() Strategy 1 (env lost
- * then resurrected). Returns true on success; caller falls back to
- * fresh session creation on false.
- *
- * @param requestedEnvId - Environment ID we asked the backend to reuse;
- *   must equal the ID actually registered for in-place reconnect.
- * @param sessionId - Session to re-queue. Tried both as-is and re-tagged
- *   to its infra (cse_*) form, since the server's compat gate state is
- *   unknown at this point.
- * @returns true if any candidate ID reconnected; false if the env IDs
- *   did not match or every reconnectSession call failed.
- */
- async function tryReconnectInPlace(
- requestedEnvId: string,
- sessionId: string,
- ): Promise<boolean> {
- // A different env ID means the backend allocated a fresh environment;
- // the old session cannot be re-queued on it in place.
- if (environmentId !== requestedEnvId) {
- logForDebugging(
- `[bridge:repl] Env mismatch (requested ${requestedEnvId}, got ${environmentId}) — cannot reconnect in place`,
- )
- return false
- }
- // The pointer stores what createBridgeSession returned (session_*,
- // compat/convert.go:41). /bridge/reconnect is an environments-layer
- // endpoint — once the server's ccr_v2_compat_enabled gate is on it
- // looks sessions up by their infra tag (cse_*) and returns "Session
- // not found" for the session_* costume. We don't know the gate state
- // pre-poll, so try both; the re-tag is a no-op if the ID is already
- // cse_* (doReconnect Strategy 1 path — currentSessionId never mutates
- // to cse_* but future-proof the check).
- const infraId = toInfraSessionId(sessionId)
- const candidates =
- infraId === sessionId ? [sessionId] : [sessionId, infraId]
- for (const id of candidates) {
- try {
- await api.reconnectSession(environmentId, id)
- logForDebugging(
- `[bridge:repl] Reconnected session ${id} in place on env ${environmentId}`,
- )
- return true
- } catch (err) {
- // Per-candidate failure is expected for the wrong ID form; log and
- // try the next candidate rather than aborting.
- logForDebugging(
- `[bridge:repl] reconnectSession(${id}) failed: ${errorMessage(err)}`,
- )
- }
- }
- logForDebugging(
- '[bridge:repl] reconnectSession exhausted — falling through to fresh session',
- )
- return false
- }
- // Perpetual init: env is alive but has no queued work after clean
- // teardown. reconnectSession re-queues it. doReconnect() has the same
- // call but only fires on poll 404 (env dead);
- // here the env is alive but idle.
- const reusedPriorSession = prior
- ? await tryReconnectInPlace(prior.environmentId, prior.sessionId)
- : false
- if (prior && !reusedPriorSession) {
- await clearBridgePointer(dir)
- }
- // 6. Create session on the bridge. Initial messages are NOT included as
- // session creation events because those use STREAM_ONLY persistence and
- // are published before the CCR UI subscribes, so they get lost. Instead,
- // initial messages are flushed via the ingress WebSocket once it connects.
- // Mutable session ID — updated when the environment+session pair is
- // re-created after a connection loss.
- let currentSessionId: string
- if (reusedPriorSession && prior) {
- currentSessionId = prior.sessionId
- logForDebugging(
- `[bridge:repl] Perpetual session reused: ${currentSessionId}`,
- )
- // Server already has all initialMessages from the prior CLI run. Mark
- // them as previously-flushed so the initial flush filter excludes them
- // (previouslyFlushedUUIDs is a fresh Set on every CLI start). Duplicate
- // UUIDs cause the server to kill the WebSocket.
- if (initialMessages && previouslyFlushedUUIDs) {
- for (const msg of initialMessages) {
- previouslyFlushedUUIDs.add(msg.uuid)
- }
- }
- } else {
- const createdSessionId = await createSession({
- environmentId,
- title,
- gitRepoUrl,
- branch,
- signal: AbortSignal.timeout(15_000),
- })
- if (!createdSessionId) {
- logForDebugging(
- '[bridge:repl] Session creation failed, deregistering environment',
- )
- logEvent('tengu_bridge_repl_session_failed', {})
- await api.deregisterEnvironment(environmentId).catch(() => {})
- onStateChange?.('failed', 'Session creation failed')
- return null
- }
- currentSessionId = createdSessionId
- logForDebugging(`[bridge:repl] Session created: ${currentSessionId}`)
- }
- // Crash-recovery pointer: written now so a kill -9 at any point after
- // this leaves a recoverable trail. Cleared in teardown (non-perpetual)
- // or left alone (perpetual mode — pointer survives clean exit too).
- // `claude remote-control --continue` from the same directory will detect
- // it and offer to resume.
- await writeBridgePointer(dir, {
- sessionId: currentSessionId,
- environmentId,
- source: 'repl',
- })
- logForDiagnosticsNoPII('info', 'bridge_repl_session_created')
- logEvent('tengu_bridge_repl_started', {
- has_initial_messages: !!(initialMessages && initialMessages.length > 0),
- inProtectedNamespace: isInProtectedNamespace(),
- })
- // UUIDs of initial messages. Used for dedup in writeMessages to avoid
- // re-sending messages that were already flushed on WebSocket open.
- const initialMessageUUIDs = new Set<string>()
- if (initialMessages) {
- for (const msg of initialMessages) {
- initialMessageUUIDs.add(msg.uuid)
- }
- }
- // Bounded ring buffer of UUIDs for messages we've already sent to the
- // server via the ingress WebSocket. Serves two purposes:
- // 1. Echo filtering — ignore our own messages bouncing back on the WS.
- // 2. Secondary dedup in writeMessages — catch race conditions where
- // the hook's index-based tracking isn't sufficient.
- //
- // Seeded with initialMessageUUIDs so that when the server echoes back
- // the initial conversation context over the ingress WebSocket, those
- // messages are recognized as echoes and not re-injected into the REPL.
- //
- // Capacity of 2000 covers well over any realistic echo window (echoes
- // arrive within milliseconds) and any messages that might be re-encountered
- // after compaction. The hook's lastWrittenIndexRef is the primary dedup;
- // this is a safety net.
- const recentPostedUUIDs = new BoundedUUIDSet(2000)
- for (const uuid of initialMessageUUIDs) {
- recentPostedUUIDs.add(uuid)
- }
- // Bounded set of INBOUND prompt UUIDs we've already forwarded to the REPL.
- // Defensive dedup for when the server re-delivers prompts (seq-num
- // negotiation failure, server edge cases, transport swap races). The
- // seq-num carryover below is the primary fix; this is the safety net.
- const recentInboundUUIDs = new BoundedUUIDSet(2000)
- // 7. Start poll loop for work items — this is what makes the session
- // "live" on claude.ai. When a user types there, the backend dispatches
- // a work item to our environment. We poll for it, get the ingress token,
- // and connect the ingress WebSocket.
- //
- // The poll loop keeps running: when work arrives it connects the ingress
- // WebSocket, and if the WebSocket drops unexpectedly (code != 1000) it
- // resumes polling to get a fresh ingress token and reconnect.
- const pollController = new AbortController()
- // Adapter over either HybridTransport (v1: WS reads + POST writes to
- // Session-Ingress) or SSETransport+CCRClient (v2: SSE reads + POST
- // writes to CCR /worker/*). The v1/v2 choice is made in onWorkReceived:
- // server-driven via secret.use_code_sessions, with CLAUDE_BRIDGE_USE_CCR_V2
- // as an ant-dev override.
- let transport: ReplBridgeTransport | null = null
- // Bumped on every onWorkReceived. Captured in createV2ReplTransport's .then()
- // closure to detect stale resolutions: if two calls race while transport is
- // null, both registerWorker() (bumping server epoch), and whichever resolves
- // SECOND is the correct one — but the transport !== null check gets this
- // backwards (first-to-resolve installs, second discards). The generation
- // counter catches it independent of transport state.
- let v2Generation = 0
- // SSE sequence-number high-water mark carried across transport swaps.
- // Without this, each new SSETransport starts at 0, sends no
- // from_sequence_num / Last-Event-ID on its first connect, and the server
- // replays the entire session event history — every prompt ever sent
- // re-delivered as fresh inbound messages on every onWorkReceived.
- //
- // Seed only when we actually reconnected the prior session. If
- // `reusedPriorSession` is false we fell through to `createSession()` —
- // the caller's persisted seq-num belongs to a dead session and applying
- // it to the fresh stream (starting at 1) silently drops events. Same
- // hazard as doReconnect Strategy 2; same fix as the reset there.
- let lastTransportSequenceNum = reusedPriorSession ? initialSSESequenceNum : 0
- // Track the current work ID so teardown can call stopWork
- let currentWorkId: string | null = null
- // Session ingress JWT for the current work item — used for heartbeat auth.
- let currentIngressToken: string | null = null
- // Signal to wake the at-capacity sleep early when the transport is lost,
- // so the poll loop immediately switches back to fast polling for new work.
- const capacityWake = createCapacityWake(pollController.signal)
- const wakePollLoop = capacityWake.wake
- const capacitySignal = capacityWake.signal
- // Gates message writes during the initial flush to prevent ordering
- // races where new messages arrive at the server interleaved with history.
- const flushGate = new FlushGate<Message>()
- // Latch for onUserMessage — flips true when the callback returns true
- // (policy says "done deriving"). If no callback, skip scanning entirely
- // (daemon path — no title derivation needed).
- let userMessageCallbackDone = !onUserMessage
- // Shared counter for environment re-creations, used by both
- // onEnvironmentLost and the abnormal-close handler.
- const MAX_ENVIRONMENT_RECREATIONS = 3
- let environmentRecreations = 0
- let reconnectPromise: Promise<boolean> | null = null
- /**
- * Recover from onEnvironmentLost (poll returned 404 — env was reaped
- * server-side). Tries two strategies in order:
- *
- * 1. Reconnect-in-place: idempotent re-register with reuseEnvironmentId
- * → if the backend returns the same env ID, call reconnectSession()
- * to re-queue the existing session. currentSessionId stays the same;
- * the URL on the user's phone stays valid; previouslyFlushedUUIDs is
- * preserved so history isn't re-sent.
- *
- * 2. Fresh session fallback: if the backend returns a different env ID
- * (original TTL-expired, e.g. laptop slept >4h) or reconnectSession()
- * throws, archive the old session and create a new one on the
- * now-registered env. Old behavior before #20460 primitives landed.
- *
- * Uses a promise-based reentrancy guard so concurrent callers share the
- * same reconnection attempt.
- *
- * @returns true when the session was recovered (in place or fresh, or
- * the poll loop recovered on its own); false on give-up, teardown, or
- * unrecoverable failure.
- */
- async function reconnectEnvironmentWithSession(): Promise<boolean> {
- // An attempt is already in flight — join it instead of racing a second
- // doReconnect against the same shared closure state.
- if (reconnectPromise) {
- return reconnectPromise
- }
- reconnectPromise = doReconnect()
- try {
- return await reconnectPromise
- } finally {
- // Always clear the guard so a later env loss can start a new attempt.
- reconnectPromise = null
- }
- }
- /**
- * Single reconnection attempt, shared across concurrent callers via
- * reconnectEnvironmentWithSession(). Order of operations: bump the v2
- * handshake generation, capture the SSE seq high-water mark and close the
- * stale transport, release the current work item, then Strategy 1
- * (idempotent re-register + reconnect-in-place) and Strategy 2 (archive +
- * fresh session). Never throws — resolves false on give-up, teardown, or
- * failure (the ws_closed handler relies on that contract).
- *
- * @returns true when recovered (including "poll loop recovered on its
- * own" — detected mid-flight and deferred to); false otherwise.
- */
- async function doReconnect(): Promise<boolean> {
- environmentRecreations++
- // Invalidate any in-flight v2 handshake — the environment is being
- // recreated, so a stale transport arriving post-reconnect would be
- // pointed at a dead session.
- v2Generation++
- logForDebugging(
- `[bridge:repl] Reconnecting after env lost (attempt ${environmentRecreations}/${MAX_ENVIRONMENT_RECREATIONS})`,
- )
- if (environmentRecreations > MAX_ENVIRONMENT_RECREATIONS) {
- logForDebugging(
- `[bridge:repl] Environment reconnect limit reached (${MAX_ENVIRONMENT_RECREATIONS}), giving up`,
- )
- return false
- }
- // Close the stale transport. Capture seq BEFORE close — if Strategy 1
- // (tryReconnectInPlace) succeeds we keep the SAME session, and the
- // next transport must resume where this one left off, not replay from
- // the last transport-swap checkpoint.
- if (transport) {
- const seq = transport.getLastSequenceNum()
- if (seq > lastTransportSequenceNum) {
- lastTransportSequenceNum = seq
- }
- transport.close()
- transport = null
- }
- // Transport is gone — wake the poll loop out of its at-capacity
- // heartbeat sleep so it can fast-poll for re-dispatched work.
- wakePollLoop()
- // Reset flush gate so writeMessages() hits the !transport guard
- // instead of silently queuing into a dead buffer.
- flushGate.drop()
- // Release the current work item (force=false — we may want the session
- // back). Best-effort: the env is probably gone, so this likely 404s.
- if (currentWorkId) {
- const workIdBeingCleared = currentWorkId
- await api
- .stopWork(environmentId, workIdBeingCleared, false)
- .catch(() => {})
- // When doReconnect runs concurrently with the poll loop (ws_closed
- // handler case — void-called, unlike the awaited onEnvironmentLost
- // path), onWorkReceived can fire during the stopWork await and set
- // a fresh currentWorkId. If it did, the poll loop has already
- // recovered on its own — defer to it rather than proceeding to
- // archiveSession, which would destroy the session its new
- // transport is connected to.
- if (currentWorkId !== workIdBeingCleared) {
- logForDebugging(
- '[bridge:repl] Poll loop recovered during stopWork await — deferring to it',
- )
- environmentRecreations = 0
- return true
- }
- currentWorkId = null
- currentIngressToken = null
- }
- // Bail out if teardown started while we were awaiting
- if (pollController.signal.aborted) {
- logForDebugging('[bridge:repl] Reconnect aborted by teardown')
- return false
- }
- // Strategy 1: idempotent re-register with the server-issued env ID.
- // If the backend resurrects the same env (fresh secret), we can
- // reconnect the existing session. If it hands back a different ID, the
- // original env is truly gone and we fall through to a fresh session.
- const requestedEnvId = environmentId
- bridgeConfig.reuseEnvironmentId = requestedEnvId
- try {
- const reg = await api.registerBridgeEnvironment(bridgeConfig)
- environmentId = reg.environment_id
- environmentSecret = reg.environment_secret
- } catch (err) {
- bridgeConfig.reuseEnvironmentId = undefined
- logForDebugging(
- `[bridge:repl] Environment re-registration failed: ${errorMessage(err)}`,
- )
- return false
- }
- // Clear before any await — a stale value would poison the next fresh
- // registration if doReconnect runs again.
- bridgeConfig.reuseEnvironmentId = undefined
- logForDebugging(
- `[bridge:repl] Re-registered: requested=${requestedEnvId} got=${environmentId}`,
- )
- // Bail out if teardown started while we were registering
- if (pollController.signal.aborted) {
- logForDebugging(
- '[bridge:repl] Reconnect aborted after env registration, cleaning up',
- )
- await api.deregisterEnvironment(environmentId).catch(() => {})
- return false
- }
- // Same race as above, narrower window: poll loop may have set up a
- // transport during the registerBridgeEnvironment await. Bail before
- // tryReconnectInPlace/archiveSession kill it server-side.
- if (transport !== null) {
- logForDebugging(
- '[bridge:repl] Poll loop recovered during registerBridgeEnvironment await — deferring to it',
- )
- environmentRecreations = 0
- return true
- }
- // Strategy 1: same helper as perpetual init. currentSessionId stays
- // the same on success; URL on mobile/web stays valid;
- // previouslyFlushedUUIDs preserved (no re-flush).
- if (await tryReconnectInPlace(requestedEnvId, currentSessionId)) {
- logEvent('tengu_bridge_repl_reconnected_in_place', {})
- environmentRecreations = 0
- return true
- }
- // Env differs → TTL-expired/reaped; or reconnect failed.
- // Don't deregister — we have a fresh secret for this env either way.
- if (environmentId !== requestedEnvId) {
- logEvent('tengu_bridge_repl_env_expired_fresh_session', {})
- }
- // Strategy 2: fresh session on the now-registered environment.
- // Archive the old session first — it's orphaned (bound to a dead env,
- // or reconnectSession rejected it). Don't deregister the env — we just
- // got a fresh secret for it and are about to use it.
- await archiveSession(currentSessionId)
- // Bail out if teardown started while we were archiving
- if (pollController.signal.aborted) {
- logForDebugging(
- '[bridge:repl] Reconnect aborted after archive, cleaning up',
- )
- await api.deregisterEnvironment(environmentId).catch(() => {})
- return false
- }
- // Re-read the current title in case the user renamed the session.
- // REPL wrapper reads session storage; daemon wrapper returns the
- // original title (nothing to refresh).
- const currentTitle = getCurrentTitle()
- // Create a new session on the now-registered environment
- const newSessionId = await createSession({
- environmentId,
- title: currentTitle,
- gitRepoUrl,
- branch,
- signal: AbortSignal.timeout(15_000),
- })
- if (!newSessionId) {
- logForDebugging(
- '[bridge:repl] Session creation failed during reconnection',
- )
- return false
- }
- // Bail out if teardown started during session creation (up to 15s)
- if (pollController.signal.aborted) {
- logForDebugging(
- '[bridge:repl] Reconnect aborted after session creation, cleaning up',
- )
- await archiveSession(newSessionId)
- return false
- }
- currentSessionId = newSessionId
- // Re-publish to the PID file so peer dedup (peerRegistry.ts) picks up the
- // new ID — setReplBridgeHandle only fires at init/teardown, not reconnect.
- void updateSessionBridgeId(toCompatSessionId(newSessionId)).catch(() => {})
- // Reset per-session transport state IMMEDIATELY after the session swap,
- // before any await. If this runs after `await writeBridgePointer` below,
- // there's a window where handle.bridgeSessionId already returns session B
- // but getSSESequenceNum() still returns session A's seq — a daemon
- // persistState() in that window writes {bridgeSessionId: B, seq: OLD_A},
- // which PASSES the session-ID validation check and defeats it entirely.
- //
- // The SSE seq-num is scoped to the session's event stream — carrying it
- // over leaves the transport's lastSequenceNum stuck high (seq only
- // advances when received > last), and its next internal reconnect would
- // send from_sequence_num=OLD_SEQ against a stream starting at 1 → all
- // events in the gap silently dropped. Inbound UUID dedup is also
- // session-scoped.
- lastTransportSequenceNum = 0
- recentInboundUUIDs.clear()
- // Title derivation is session-scoped too: if the user typed during the
- // createSession await above, the callback fired against the OLD archived
- // session ID (PATCH lost) and the new session got `currentTitle` captured
- // BEFORE they typed. Reset so the next prompt can re-derive. Self-
- // correcting: if the caller's policy is already done (explicit title or
- // count ≥ 3), it returns true on the first post-reset call and re-latches.
- userMessageCallbackDone = !onUserMessage
- logForDebugging(`[bridge:repl] Re-created session: ${currentSessionId}`)
- // Rewrite the crash-recovery pointer with the new IDs so a crash after
- // this point resumes the right session. (The reconnect-in-place path
- // above doesn't touch the pointer — same session, same env.)
- await writeBridgePointer(dir, {
- sessionId: currentSessionId,
- environmentId,
- source: 'repl',
- })
- // Clear flushed UUIDs so initial messages are re-sent to the new session.
- // UUIDs are scoped per-session on the server, so re-flushing is safe.
- previouslyFlushedUUIDs?.clear()
- // Reset the counter so independent reconnections hours apart don't
- // exhaust the limit — it guards against rapid consecutive failures,
- // not lifetime total.
- environmentRecreations = 0
- return true
- }
- // Helper: get the current OAuth access token for session ingress auth.
- // Unlike the JWT path, OAuth tokens are refreshed by the standard OAuth
- // flow — no proactive scheduler needed.
- // Returns undefined when no token is available — callers must handle it
- // (the v1 work-received path skips the work item in that case).
- function getOAuthToken(): string | undefined {
- return getAccessToken()
- }
- // Drain any messages that were queued during the initial flush.
- // Called after writeBatch completes (or fails) so queued messages
- // are sent in order after the historical messages.
- function drainFlushGate(): void {
- const msgs = flushGate.end()
- if (msgs.length === 0) return
- // Transport may already be gone (torn down / permanently closed while
- // messages were queued) — nothing to drain into; log and drop.
- if (!transport) {
- logForDebugging(
- `[bridge:repl] Cannot drain ${msgs.length} pending message(s): no transport`,
- )
- return
- }
- // Record outbound UUIDs before sending — presumably so inbound echoes
- // of our own messages are filtered (handleIngressMessage receives this
- // set); NOTE(review): confirm against handleIngressMessage.
- for (const msg of msgs) {
- recentPostedUUIDs.add(msg.uuid)
- }
- const sdkMessages = toSDKMessages(msgs)
- const events = sdkMessages.map(sdkMsg => ({
- ...sdkMsg,
- session_id: currentSessionId,
- }))
- logForDebugging(
- `[bridge:repl] Drained ${msgs.length} pending message(s) after flush`,
- )
- // Fire-and-forget, but attach a rejection handler: the initial-flush
- // writeBatch call .catch()es its rejection, and a bare `void` here would
- // otherwise surface as an unhandled promise rejection if writeBatch
- // rejects.
- void transport.writeBatch(events).catch((e: unknown) => {
- logForDebugging(
- `[bridge:repl] Drain writeBatch failed: ${errorMessage(e)}`,
- )
- })
- }
- // Teardown reference — set after definition below. All callers are async
- // callbacks that run after assignment, so the reference is always valid.
- let doTeardownImpl: (() => Promise<void>) | null = null
- // Fire-and-forget teardown trigger. Optional chaining makes this a safe
- // no-op if invoked before doTeardownImpl is assigned — belt-and-braces on
- // top of the "all callers run after assignment" invariant noted above.
- function triggerTeardown(): void {
- void doTeardownImpl?.()
- }
- /**
- * Body of the transport's setOnClose callback, hoisted to initBridgeCore
- * scope so /bridge-kick can fire it directly. setOnClose wraps this with
- * a stale-transport guard; debugFireClose calls it bare.
- *
- * With autoReconnect:true, this only fires on: clean close (1000),
- * permanent server rejection (4001/1002/4003), or 10-min budget
- * exhaustion. Transient drops are retried internally by the transport.
- *
- * @param closeCode - WS close code reported by the transport (may be
- * undefined).
- */
- function handleTransportPermanentClose(closeCode: number | undefined): void {
- logForDebugging(
- `[bridge:repl] Transport permanently closed: code=${closeCode}`,
- )
- logEvent('tengu_bridge_repl_ws_closed', {
- code: closeCode,
- })
- // Capture SSE seq high-water mark before nulling. When called from
- // setOnClose the guard guarantees transport !== null; when fired from
- // /bridge-kick it may already be null (e.g. fired twice) — skip.
- if (transport) {
- const closedSeq = transport.getLastSequenceNum()
- if (closedSeq > lastTransportSequenceNum) {
- lastTransportSequenceNum = closedSeq
- }
- transport = null
- }
- // Transport is gone — wake the poll loop out of its at-capacity
- // heartbeat sleep so it's fast-polling by the time the reconnect
- // below completes and the server re-queues work.
- wakePollLoop()
- // Reset flush state so writeMessages() hits the !transport guard
- // (with a warning log) instead of silently queuing into a buffer
- // that will never be drained. Unlike onWorkReceived (which
- // preserves pending messages for the new transport), onClose is
- // a permanent close — no new transport will drain these.
- const dropped = flushGate.drop()
- if (dropped > 0) {
- logForDebugging(
- `[bridge:repl] Dropping ${dropped} pending message(s) on transport close (code=${closeCode})`,
- { level: 'warn' },
- )
- }
- if (closeCode === 1000) {
- // Clean close — session ended normally. Tear down the bridge.
- onStateChange?.('failed', 'session ended')
- pollController.abort()
- triggerTeardown()
- return
- }
- // Transport reconnect budget exhausted or permanent server
- // rejection. By this point the env has usually been reaped
- // server-side (BQ 2026-03-12: ~98% of ws_closed never recover
- // via poll alone). stopWork(force=false) can't re-dispatch work
- // from an archived env; reconnectEnvironmentWithSession can
- // re-activate it via POST /bridge/reconnect, or fall through
- // to a fresh session if the env is truly gone. The poll loop
- // (already woken above) picks up the re-queued work once
- // doReconnect completes.
- onStateChange?.(
- 'reconnecting',
- `Remote Control connection lost (code ${closeCode})`,
- )
- logForDebugging(
- `[bridge:repl] Transport reconnect budget exhausted (code=${closeCode}), attempting env reconnect`,
- )
- void reconnectEnvironmentWithSession().then(success => {
- if (success) return
- // doReconnect has four abort-check return-false sites for
- // teardown-in-progress. Don't pollute the BQ failure signal
- // or double-teardown when the user just quit.
- if (pollController.signal.aborted) return
- // doReconnect returns false (never throws) on genuine failure.
- // The dangerous case: registerBridgeEnvironment succeeded (so
- // environmentId now points at a fresh valid env) but
- // createSession failed — poll loop would poll a sessionless
- // env getting null work with no errors, never hitting any
- // give-up path. Tear down explicitly.
- logForDebugging(
- '[bridge:repl] reconnectEnvironmentWithSession resolved false — tearing down',
- )
- logEvent('tengu_bridge_repl_reconnect_failed', {
- close_code: closeCode,
- })
- onStateChange?.('failed', 'reconnection failed')
- triggerTeardown()
- })
- }
- // Ant-only: SIGUSR2 → force doReconnect() for manual testing. Skips the
- // ~30s poll wait — fire-and-observe in the debug log immediately.
- // Windows has no USR signals; `process.on` would throw there.
- let sigusr2Handler: (() => void) | undefined
- if (process.env.USER_TYPE === 'ant' && process.platform !== 'win32') {
- sigusr2Handler = () => {
- logForDebugging(
- '[bridge:repl] SIGUSR2 received — forcing doReconnect() for testing',
- )
- void reconnectEnvironmentWithSession()
- }
- process.on('SIGUSR2', sigusr2Handler)
- }
- // Ant-only: /bridge-kick fault injection. handleTransportPermanentClose
- // is defined below and assigned into this slot so the slash command can
- // invoke it directly — the real setOnClose callback is buried inside
- // wireTransport which is itself inside onWorkReceived.
- let debugFireClose: ((code: number) => void) | null = null
- if (process.env.USER_TYPE === 'ant') {
- registerBridgeDebugHandle({
- fireClose: code => {
- if (!debugFireClose) {
- logForDebugging('[bridge:debug] fireClose: no transport wired yet')
- return
- }
- logForDebugging(`[bridge:debug] fireClose(${code}) — injecting`)
- debugFireClose(code)
- },
- forceReconnect: () => {
- logForDebugging('[bridge:debug] forceReconnect — injecting')
- void reconnectEnvironmentWithSession()
- },
- injectFault: injectBridgeFault,
- wakePollLoop,
- describe: () =>
- `env=${environmentId} session=${currentSessionId} transport=${transport?.getStateLabel() ?? 'null'} workId=${currentWorkId ?? 'null'}`,
- })
- }
- const pollOpts = {
- api,
- getCredentials: () => ({ environmentId, environmentSecret }),
- signal: pollController.signal,
- getPollIntervalConfig,
- onStateChange,
- getWsState: () => transport?.getStateLabel() ?? 'null',
- // REPL bridge is single-session: having any transport == at capacity.
- // No need to check isConnectedStatus() — even while the transport is
- // auto-reconnecting internally (up to 10 min), poll is heartbeat-only.
- isAtCapacity: () => transport !== null,
- capacitySignal,
- onFatalError: triggerTeardown,
- // Heartbeat auth bundle for the active work item. Null when idle —
- // no current work ID or no ingress JWT — meaning there is nothing to
- // heartbeat against.
- getHeartbeatInfo: () => {
- if (!currentWorkId || !currentIngressToken) {
- return null
- }
- return {
- environmentId,
- workId: currentWorkId,
- sessionToken: currentIngressToken,
- }
- },
- // Work-item JWT expired (or work gone). The transport is useless —
- // SSE reconnects and CCR writes use the same stale token. Without
- // this callback the poll loop would do a 10-min at-capacity backoff,
- // during which the work lease (300s TTL) expires and the server stops
- // forwarding prompts → ~25-min dead window observed in daemon logs.
- // Kill the transport + work state so isAtCapacity()=false; the loop
- // fast-polls and picks up the server's re-dispatched work in seconds.
- onHeartbeatFatal: (err: BridgeFatalError) => {
- logForDebugging(
- `[bridge:repl] heartbeatWork fatal (status=${err.status}) — tearing down work item for fast re-dispatch`,
- )
- // Capture the SSE seq high-water mark before closing — same rationale
- // as handleTransportPermanentClose: the next transport must resume,
- // not replay.
- if (transport) {
- const seq = transport.getLastSequenceNum()
- if (seq > lastTransportSequenceNum) {
- lastTransportSequenceNum = seq
- }
- transport.close()
- transport = null
- }
- flushGate.drop()
- // force=false → server re-queues. Likely already expired, but
- // idempotent and makes re-dispatch immediate if not.
- if (currentWorkId) {
- void api
- .stopWork(environmentId, currentWorkId, false)
- .catch((e: unknown) => {
- logForDebugging(
- `[bridge:repl] stopWork after heartbeat fatal: ${errorMessage(e)}`,
- )
- })
- }
- currentWorkId = null
- currentIngressToken = null
- wakePollLoop()
- onStateChange?.(
- 'reconnecting',
- 'Work item lease expired, fetching fresh token',
- )
- },
- // Poll reported the environment gone (404). Run the shared reconnect;
- // on success return the (possibly re-registered) credentials so the
- // poll loop resumes with them, on failure return null.
- async onEnvironmentLost() {
- const success = await reconnectEnvironmentWithSession()
- if (!success) {
- return null
- }
- return { environmentId, environmentSecret }
- },
- onWorkReceived: (
- workSessionId: string,
- ingressToken: string,
- workId: string,
- serverUseCcrV2: boolean,
- ) => {
- // When new work arrives while a transport is already open, the
- // server has decided to re-dispatch (e.g. token rotation, server
- // restart). Close the existing transport and reconnect — discarding
- // the work causes a stuck 'reconnecting' state if the old WS dies
- // shortly after (the server won't re-dispatch a work item it
- // already delivered).
- // ingressToken (JWT) is stored for heartbeat auth (both v1 and v2).
- // Transport auth diverges — see the v1/v2 split below.
- if (transport?.isConnectedStatus()) {
- logForDebugging(
- `[bridge:repl] Work received while transport connected, replacing with fresh token (workId=${workId})`,
- )
- }
- logForDebugging(
- `[bridge:repl] Work received: workId=${workId} workSessionId=${workSessionId} currentSessionId=${currentSessionId} match=${sameSessionId(workSessionId, currentSessionId)}`,
- )
- // Refresh the crash-recovery pointer's mtime. Staleness checks file
- // mtime (not embedded timestamp) so this re-write bumps the clock —
- // a 5h+ session that crashes still has a fresh pointer. Fires once
- // per work dispatch (infrequent — bounded by user message rate).
- void writeBridgePointer(dir, {
- sessionId: currentSessionId,
- environmentId,
- source: 'repl',
- })
- // Reject foreign session IDs — the server shouldn't assign sessions
- // from other environments. Since we create env+session as a pair,
- // a mismatch indicates an unexpected server-side reassignment.
- //
- // Compare by underlying UUID, not by tagged-ID prefix. When CCR
- // v2's compat layer serves the session, createBridgeSession gets
- // session_* from the v1-facing API (compat/convert.go:41) but the
- // infrastructure layer delivers cse_* in the work queue
- // (container_manager.go:129). Same UUID, different tag.
- if (!sameSessionId(workSessionId, currentSessionId)) {
- logForDebugging(
- `[bridge:repl] Rejecting foreign session: expected=${currentSessionId} got=${workSessionId}`,
- )
- return
- }
- currentWorkId = workId
- currentIngressToken = ingressToken
- // Server decides per-session (secret.use_code_sessions from the work
- // secret, threaded through runWorkPollLoop). The env var is an ant-dev
- // override for forcing v2 before the server flag is on for your user —
- // requires ccr_v2_compat_enabled server-side or registerWorker 404s.
- //
- // Kept separate from CLAUDE_CODE_USE_CCR_V2 (the child-SDK transport
- // selector set by sessionRunner/environment-manager) to avoid the
- // inheritance hazard in spawn mode where the parent's orchestrator
- // var would leak into a v1 child.
- const useCcrV2 =
- serverUseCcrV2 || isEnvTruthy(process.env.CLAUDE_BRIDGE_USE_CCR_V2)
- // Auth is the one place v1 and v2 diverge hard:
- //
- // - v1 (Session-Ingress): accepts OAuth OR JWT. We prefer OAuth
- // because the standard OAuth refresh flow handles expiry — no
- // separate JWT refresh scheduler needed.
- //
- // - v2 (CCR /worker/*): REQUIRES the JWT. register_worker.go:32
- // validates the session_id claim, which OAuth tokens don't carry.
- // The JWT from the work secret has both that claim and the worker
- // role (environment_auth.py:856). JWT refresh: when it expires the
- // server re-dispatches work with a fresh one, and onWorkReceived
- // fires again. createV2ReplTransport stores it via
- // updateSessionIngressAuthToken() before touching the network.
- let v1OauthToken: string | undefined
- if (!useCcrV2) {
- v1OauthToken = getOAuthToken()
- if (!v1OauthToken) {
- logForDebugging(
- '[bridge:repl] No OAuth token available for session ingress, skipping work',
- )
- return
- }
- updateSessionIngressAuthToken(v1OauthToken)
- }
- logEvent('tengu_bridge_repl_work_received', {})
- // Close the previous transport. Nullify BEFORE calling close() so
- // the close callback doesn't treat the programmatic close as
- // "session ended normally" and trigger a full teardown.
- if (transport) {
- const oldTransport = transport
- transport = null
- // Capture the SSE sequence high-water mark so the next transport
- // resumes the stream instead of replaying from seq 0. Use max() —
- // a transport that died early (never received any frames) would
- // otherwise reset a non-zero mark back to 0.
- const oldSeq = oldTransport.getLastSequenceNum()
- if (oldSeq > lastTransportSequenceNum) {
- lastTransportSequenceNum = oldSeq
- }
- oldTransport.close()
- }
- // Reset flush state — the old flush (if any) is no longer relevant.
- // Preserve pending messages so they're drained after the new
- // transport's flush completes (the hook has already advanced its
- // lastWrittenIndex and won't re-send them).
- flushGate.deactivate()
- // Closure adapter over the shared handleServerControlRequest —
- // captures transport/currentSessionId so the transport.setOnData
- // callback below doesn't need to thread them through.
- const onServerControlRequest = (request: SDKControlRequest): void =>
- handleServerControlRequest(request, {
- transport,
- sessionId: currentSessionId,
- onInterrupt,
- onSetModel,
- onSetMaxThinkingTokens,
- onSetPermissionMode,
- })
- let initialFlushDone = false
- // Wire callbacks onto a freshly constructed transport and connect.
- // Extracted so the (sync) v1 and (async) v2 construction paths can
- // share the identical callback + flush machinery.
- const wireTransport = (newTransport: ReplBridgeTransport): void => {
- // Install immediately: every stale-callback guard below compares
- // `transport` against `newTransport` to detect replacement by a later
- // onWorkReceived call.
- transport = newTransport
- newTransport.setOnConnect(() => {
- // Guard: if transport was replaced by a newer onWorkReceived call
- // while the WS was connecting, ignore this stale callback.
- if (transport !== newTransport) return
- logForDebugging('[bridge:repl] Ingress transport connected')
- logEvent('tengu_bridge_repl_ws_connected', {})
- // Update the env var with the latest OAuth token so POST writes
- // (which read via getSessionIngressAuthToken()) use a fresh token.
- // v2 skips this — createV2ReplTransport already stored the JWT,
- // and overwriting it with OAuth would break subsequent /worker/*
- // requests (session_id claim check).
- if (!useCcrV2) {
- const freshToken = getOAuthToken()
- if (freshToken) {
- updateSessionIngressAuthToken(freshToken)
- }
- }
- // Reset teardownStarted so future teardowns are not blocked.
- teardownStarted = false
- // Flush initial messages only on first connect, not on every
- // WS reconnection. Re-flushing would cause duplicate messages.
- // IMPORTANT: onStateChange('connected') is deferred until the
- // flush completes. This prevents writeMessages() from sending
- // new messages that could arrive at the server interleaved with
- // the historical messages, and delays the web UI from showing
- // the session as active until history is persisted.
- if (
- !initialFlushDone &&
- initialMessages &&
- initialMessages.length > 0
- ) {
- initialFlushDone = true
- // Cap the initial flush to the most recent N messages. The full
- // history is UI-only (model doesn't see it) and large replays cause
- // slow session-ingress persistence (each event is a threadstore write)
- // plus elevated Firestore pressure. A 0 or negative cap disables it.
- const historyCap = initialHistoryCap
- const eligibleMessages = initialMessages.filter(
- m =>
- isEligibleBridgeMessage(m) &&
- !previouslyFlushedUUIDs?.has(m.uuid),
- )
- const cappedMessages =
- historyCap > 0 && eligibleMessages.length > historyCap
- ? eligibleMessages.slice(-historyCap)
- : eligibleMessages
- if (cappedMessages.length < eligibleMessages.length) {
- logForDebugging(
- `[bridge:repl] Capped initial flush: ${eligibleMessages.length} -> ${cappedMessages.length} (cap=${historyCap})`,
- )
- logEvent('tengu_bridge_repl_history_capped', {
- eligible_count: eligibleMessages.length,
- capped_count: cappedMessages.length,
- })
- }
- const sdkMessages = toSDKMessages(cappedMessages)
- if (sdkMessages.length > 0) {
- logForDebugging(
- `[bridge:repl] Flushing ${sdkMessages.length} initial message(s) via transport`,
- )
- const events = sdkMessages.map(sdkMsg => ({
- ...sdkMsg,
- session_id: currentSessionId,
- }))
- const dropsBefore = newTransport.droppedBatchCount
- void newTransport
- .writeBatch(events)
- .then(() => {
- // If any batch was dropped during this flush (SI down for
- // maxConsecutiveFailures attempts), flush() still resolved
- // normally but the events were NOT delivered. Don't mark
- // UUIDs as flushed — keep them eligible for re-send on the
- // next onWorkReceived (JWT refresh re-dispatch, line ~1144).
- if (newTransport.droppedBatchCount > dropsBefore) {
- logForDebugging(
- `[bridge:repl] Initial flush dropped ${newTransport.droppedBatchCount - dropsBefore} batch(es) — not marking ${sdkMessages.length} UUID(s) as flushed`,
- )
- return
- }
- if (previouslyFlushedUUIDs) {
- for (const sdkMsg of sdkMessages) {
- if (sdkMsg.uuid) {
- previouslyFlushedUUIDs.add(sdkMsg.uuid as string)
- }
- }
- }
- })
- .catch(e =>
- logForDebugging(`[bridge:repl] Initial flush failed: ${e}`),
- )
- .finally(() => {
- // Guard: if transport was replaced during the flush,
- // don't signal connected or drain — the new transport
- // owns the lifecycle now.
- if (transport !== newTransport) return
- drainFlushGate()
- onStateChange?.('connected')
- })
- } else {
- // All initial messages were already flushed (filtered by
- // previouslyFlushedUUIDs). No flush POST needed — clear
- // the flag and signal connected immediately. This is the
- // first connect for this transport (inside !initialFlushDone),
- // so no flush POST is in-flight — the flag was set before
- // connect() and must be cleared here.
- drainFlushGate()
- onStateChange?.('connected')
- }
- } else if (!flushGate.active) {
- // No initial messages or already flushed on first connect.
- // WS auto-reconnect path — only signal connected if no flush
- // POST is in-flight. If one is, .finally() owns the lifecycle.
- onStateChange?.('connected')
- }
- })
- // Inbound frames go to the shared ingress handler along with the
- // session-scoped dedup sets and the caller-supplied message /
- // permission / control callbacks captured above.
- newTransport.setOnData(data => {
- handleIngressMessage(
- data,
- recentPostedUUIDs,
- recentInboundUUIDs,
- onInboundMessage,
- onPermissionResponse,
- onServerControlRequest,
- )
- })
- // Body lives at initBridgeCore scope so /bridge-kick can call it
- // directly via debugFireClose. All referenced closures (transport,
- // wakePollLoop, flushGate, reconnectEnvironmentWithSession, etc.)
- // are already at that scope. The only lexical dependency on
- // wireTransport was `newTransport.getLastSequenceNum()` — but after
- // the guard below passes we know transport === newTransport.
- debugFireClose = handleTransportPermanentClose
- newTransport.setOnClose(closeCode => {
- // Guard: if transport was replaced, ignore stale close.
- if (transport !== newTransport) return
- handleTransportPermanentClose(closeCode)
- })
- // Start the flush gate before connect() to cover the WS handshake
- // window. Between transport assignment and setOnConnect firing,
- // writeMessages() could send messages via HTTP POST before the
- // initial flush starts. Starting the gate here ensures those
- // calls are queued. If there are no initial messages, the gate
- // stays inactive.
- if (
- !initialFlushDone &&
- initialMessages &&
- initialMessages.length > 0
- ) {
- flushGate.start()
- }
- newTransport.connect()
- } // end wireTransport
- // Bump unconditionally — ANY new transport (v1 or v2) invalidates an
- // in-flight v2 handshake. Also bumped in doReconnect().
- v2Generation++
- if (useCcrV2) {
- // workSessionId is the cse_* form (infrastructure-layer ID from the
- // work queue), which is what /v1/code/sessions/{id}/worker/* wants.
- // The session_* form (currentSessionId) is NOT usable here —
- // handler/convert.go:30 validates TagCodeSession.
- const sessionUrl = buildCCRv2SdkUrl(baseUrl, workSessionId)
- const thisGen = v2Generation
- logForDebugging(
- `[bridge:repl] CCR v2: sessionUrl=${sessionUrl} session=${workSessionId} gen=${thisGen}`,
- )
- void createV2ReplTransport({
- sessionUrl,
- ingressToken,
- sessionId: workSessionId,
- initialSequenceNum: lastTransportSequenceNum,
- }).then(
- t => {
- // Teardown started while registerWorker was in flight. Teardown
- // saw transport === null and skipped close(); installing now
- // would leak CCRClient heartbeat timers and reset
- // teardownStarted via wireTransport's side effects.
- if (pollController.signal.aborted) {
- t.close()
- return
- }
- // onWorkReceived may have fired again while registerWorker()
- // was in flight (server re-dispatch with a fresh JWT). The
- // transport !== null check alone gets the race wrong when BOTH
- // attempts saw transport === null — it keeps the first resolver
- // (stale epoch) and discards the second (correct epoch). The
- // generation check catches it regardless of transport state.
- if (thisGen !== v2Generation) {
- logForDebugging(
- `[bridge:repl] CCR v2: discarding stale handshake gen=${thisGen} current=${v2Generation}`,
- )
- t.close()
- return
- }
- wireTransport(t)
- },
- (err: unknown) => {
- logForDebugging(
- `[bridge:repl] CCR v2: createV2ReplTransport failed: ${errorMessage(err)}`,
- { level: 'error' },
- )
- logEvent('tengu_bridge_repl_ccr_v2_init_failed', {})
- // If a newer attempt is in flight or already succeeded, don't
- // touch its work item — our failure is irrelevant.
- if (thisGen !== v2Generation) return
- // Release the work item so the server re-dispatches immediately
- // instead of waiting for its own timeout. currentWorkId was set
- // above; without this, the session looks stuck to the user.
- if (currentWorkId) {
- void api
- .stopWork(environmentId, currentWorkId, false)
- .catch((e: unknown) => {
- logForDebugging(
- `[bridge:repl] stopWork after v2 init failure: ${errorMessage(e)}`,
- )
- })
- currentWorkId = null
- currentIngressToken = null
- }
- wakePollLoop()
- },
- )
- } else {
- // v1: HybridTransport (WS reads + POST writes to Session-Ingress).
- // autoReconnect is true (default) — when the WS dies, the transport
- // reconnects automatically with exponential backoff. POST writes
- // continue during reconnection (they use getSessionIngressAuthToken()
- // independently of WS state). The poll loop remains as a secondary
- // fallback if the reconnect budget is exhausted (10 min).
- //
- // Auth: uses OAuth tokens directly instead of the JWT from the work
- // secret. refreshHeaders picks up the latest OAuth token on each
- // WS reconnect attempt.
- const wsUrl = buildSdkUrl(sessionIngressUrl, workSessionId)
- logForDebugging(`[bridge:repl] Ingress URL: ${wsUrl}`)
- logForDebugging(
- `[bridge:repl] Creating HybridTransport: session=${workSessionId}`,
- )
- // v1OauthToken was validated non-null above (we'd have returned early).
- const oauthToken = v1OauthToken ?? ''
- wireTransport(
- createV1ReplTransport(
- new HybridTransport(
- new URL(wsUrl),
- {
- Authorization: `Bearer ${oauthToken}`,
- 'anthropic-version': '2023-06-01',
- },
- workSessionId,
- () => ({
- Authorization: `Bearer ${getOAuthToken() ?? oauthToken}`,
- 'anthropic-version': '2023-06-01',
- }),
- // Cap retries so a persistently-failing session-ingress can't
- // pin the uploader drain loop for the lifetime of the bridge.
- // 50 attempts ≈ 20 min (15s POST timeout + 8s backoff + jitter
- // per cycle at steady state). Bridge-only — 1P keeps indefinite.
- {
- maxConsecutiveFailures: 50,
- isBridge: true,
- onBatchDropped: () => {
- onStateChange?.(
- 'reconnecting',
- 'Lost sync with Remote Control — events could not be delivered',
- )
- // SI has been down ~20 min. Wake the poll loop so that when
- // SI recovers, next poll → onWorkReceived → fresh transport
- // → initial flush succeeds → onStateChange('connected') at
- // ~line 1420. Without this, state stays 'reconnecting' even
- // after SI recovers — daemon.ts:437 denies all permissions,
- // useReplBridge.ts:311 keeps replBridgeSessionActive=false.
- // If the env was archived during the outage, poll 404 →
- // onEnvironmentLost recovery path handles it.
- wakePollLoop()
- },
- },
- ),
- ),
- )
- }
- },
- }
- void startWorkPollLoop(pollOpts)
- // Perpetual mode: hourly mtime refresh of the crash-recovery pointer.
- // The onWorkReceived refresh only fires per user prompt — a
- // daemon idle for >4h would have a stale pointer, and the next restart
- // would clear it (readBridgePointer TTL check) → fresh session. The
- // standalone bridge (bridgeMain.ts) has an identical hourly timer.
- const pointerRefreshTimer = perpetual
- ? setInterval(() => {
- // doReconnect() reassigns currentSessionId/environmentId non-
- // atomically (env at ~:634, session at ~:719, awaits in between).
- // If this timer fires in that window, its fire-and-forget write can
- // race with (and overwrite) doReconnect's own pointer write at ~:740,
- // leaving the pointer at the now-archived old session. doReconnect
- // writes the pointer itself, so skipping here is free.
- if (reconnectPromise) return
- void writeBridgePointer(dir, {
- sessionId: currentSessionId,
- environmentId,
- source: 'repl',
- })
- }, 60 * 60_000)
- : null
- pointerRefreshTimer?.unref?.()
- // Push a silent keep_alive frame on a fixed interval so upstream proxies
- // and the session-ingress layer don't GC an otherwise-idle remote control
- // session. The keep_alive type is filtered before reaching any client UI
- // (Query.ts drops it; web/iOS/Android never see it in their message loop).
- // Interval comes from GrowthBook (tengu_bridge_poll_interval_config
- // session_keepalive_interval_v2_ms, default 120s); 0 = disabled.
- const keepAliveIntervalMs =
- getPollIntervalConfig().session_keepalive_interval_v2_ms
- const keepAliveTimer =
- keepAliveIntervalMs > 0
- ? setInterval(() => {
- if (!transport) return
- logForDebugging('[bridge:repl] keep_alive sent')
- void transport.write({ type: 'keep_alive' }).catch((err: unknown) => {
- logForDebugging(
- `[bridge:repl] keep_alive write failed: ${errorMessage(err)}`,
- )
- })
- }, keepAliveIntervalMs)
- : null
- keepAliveTimer?.unref?.()
  // Shared teardown sequence used by both cleanup registration and
  // the explicit teardown() method on the returned handle.
  let teardownStarted = false
  doTeardownImpl = async (): Promise<void> => {
    // Re-entrancy guard: registerCleanup and the handle's teardown() can
    // both fire during shutdown — only the first caller runs the sequence.
    if (teardownStarted) {
      logForDebugging(
        `[bridge:repl] Teardown already in progress, skipping duplicate call env=${environmentId} session=${currentSessionId}`,
      )
      return
    }
    teardownStarted = true
    const teardownStart = Date.now()
    logForDebugging(
      `[bridge:repl] Teardown starting: env=${environmentId} session=${currentSessionId} workId=${currentWorkId ?? 'none'} transportState=${transport?.getStateLabel() ?? 'null'}`,
    )
    // Stop local timers/handlers first — none of these touch the server.
    if (pointerRefreshTimer !== null) {
      clearInterval(pointerRefreshTimer)
    }
    if (keepAliveTimer !== null) {
      clearInterval(keepAliveTimer)
    }
    if (sigusr2Handler) {
      process.off('SIGUSR2', sigusr2Handler)
    }
    if (process.env.USER_TYPE === 'ant') {
      clearBridgeDebugHandle()
      debugFireClose = null
    }
    pollController.abort()
    logForDebugging('[bridge:repl] Teardown: poll loop aborted')
    // Capture the live transport's seq BEFORE close() — close() is sync
    // (just aborts the SSE fetch) and does NOT invoke onClose, so the
    // setOnClose capture path never runs for explicit teardown.
    // Without this, getSSESequenceNum() after teardown returns the stale
    // lastTransportSequenceNum (captured at the last transport swap), and
    // daemon callers persisting that value lose all events since then.
    if (transport) {
      const finalSeq = transport.getLastSequenceNum()
      if (finalSeq > lastTransportSequenceNum) {
        lastTransportSequenceNum = finalSeq
      }
    }
    if (perpetual) {
      // Perpetual teardown is LOCAL-ONLY — do not send result, do not call
      // stopWork, do not close the transport. All of those signal the
      // server (and any mobile/attach subscribers) that the session is
      // ending. Instead: stop polling, let the socket die with the
      // process; the backend times the work-item lease back to pending on
      // its own (TTL 300s). Next daemon start reads the pointer and
      // reconnectSession re-queues work.
      transport = null
      flushGate.drop()
      // Refresh the pointer mtime so that sessions lasting longer than
      // BRIDGE_POINTER_TTL_MS (4h) don't appear stale on next start.
      await writeBridgePointer(dir, {
        sessionId: currentSessionId,
        environmentId,
        source: 'repl',
      })
      logForDebugging(
        `[bridge:repl] Teardown (perpetual): leaving env=${environmentId} session=${currentSessionId} alive on server, duration=${Date.now() - teardownStart}ms`,
      )
      return
    }
    // Fire the result message, then archive, THEN close. transport.write()
    // only enqueues (SerialBatchEventUploader resolves on buffer-add); the
    // stopWork/archive latency (~200-500ms) is the drain window for the
    // result POST. Closing BEFORE archive meant relying on HybridTransport's
    // void-ed 3s grace period, which nothing awaits — forceExit can kill the
    // socket mid-POST. Same reorder as remoteBridgeCore.ts teardown (#22803).
    const teardownTransport = transport
    transport = null
    flushGate.drop()
    if (teardownTransport) {
      void teardownTransport.write(makeResultMessage(currentSessionId))
    }
    const stopWorkP = currentWorkId
      ? api
          .stopWork(environmentId, currentWorkId, true)
          .then(() => {
            logForDebugging('[bridge:repl] Teardown: stopWork completed')
          })
          .catch((err: unknown) => {
            logForDebugging(
              `[bridge:repl] Teardown stopWork failed: ${errorMessage(err)}`,
            )
          })
      : Promise.resolve()
    // Run stopWork and archiveSession in parallel. gracefulShutdown.ts:407
    // races runCleanupFunctions() against 2s (NOT the 5s outer failsafe),
    // so archive is capped at 1.5s at the injection site to stay under budget.
    // archiveSession is contractually no-throw; the injected implementations
    // log their own success/failure internally.
    await Promise.all([stopWorkP, archiveSession(currentSessionId)])
    teardownTransport?.close()
    logForDebugging('[bridge:repl] Teardown: transport closed')
    await api.deregisterEnvironment(environmentId).catch((err: unknown) => {
      logForDebugging(
        `[bridge:repl] Teardown deregister failed: ${errorMessage(err)}`,
      )
    })
    // Clear the crash-recovery pointer — explicit disconnect or clean REPL
    // exit means the user is done with this session. Crash/kill-9 never
    // reaches this line, leaving the pointer for next-launch recovery.
    await clearBridgePointer(dir)
    logForDebugging(
      `[bridge:repl] Teardown complete: env=${environmentId} duration=${Date.now() - teardownStart}ms`,
    )
  }
- // 8. Register cleanup for graceful shutdown
- const unregister = registerCleanup(() => doTeardownImpl?.())
- logForDebugging(
- `[bridge:repl] Ready: env=${environmentId} session=${currentSessionId}`,
- )
- onStateChange?.('ready')
- return {
- get bridgeSessionId() {
- return currentSessionId
- },
- get environmentId() {
- return environmentId
- },
- getSSESequenceNum() {
- // lastTransportSequenceNum only updates when a transport is CLOSED
- // (captured at swap/onClose). During normal operation the CURRENT
- // transport's live seq isn't reflected there. Merge both so callers
- // (e.g. daemon persistState()) get the actual high-water mark.
- const live = transport?.getLastSequenceNum() ?? 0
- return Math.max(lastTransportSequenceNum, live)
- },
- sessionIngressUrl,
- writeMessages(messages) {
- // Filter to user/assistant messages that haven't already been sent.
- // Two layers of dedup:
- // - initialMessageUUIDs: messages sent as session creation events
- // - recentPostedUUIDs: messages recently sent via POST
- const filtered = messages.filter(
- m =>
- isEligibleBridgeMessage(m) &&
- !initialMessageUUIDs.has(m.uuid) &&
- !recentPostedUUIDs.has(m.uuid),
- )
- if (filtered.length === 0) return
- // Fire onUserMessage for title derivation. Scan before the flushGate
- // check — prompts are title-worthy even if they queue behind the
- // initial history flush. Keeps calling on every title-worthy message
- // until the callback returns true; the caller owns the policy.
- if (!userMessageCallbackDone) {
- for (const m of filtered) {
- const text = extractTitleText(m)
- if (text !== undefined && onUserMessage?.(text, currentSessionId)) {
- userMessageCallbackDone = true
- break
- }
- }
- }
- // Queue messages while the initial flush is in progress to prevent
- // them from arriving at the server interleaved with history.
- if (flushGate.enqueue(...filtered)) {
- logForDebugging(
- `[bridge:repl] Queued ${filtered.length} message(s) during initial flush`,
- )
- return
- }
- if (!transport) {
- const types = filtered.map(m => m.type).join(',')
- logForDebugging(
- `[bridge:repl] Transport not configured, dropping ${filtered.length} message(s) [${types}] for session=${currentSessionId}`,
- { level: 'warn' },
- )
- return
- }
- // Track in the bounded ring buffer for echo filtering and dedup.
- for (const msg of filtered) {
- recentPostedUUIDs.add(msg.uuid)
- }
- logForDebugging(
- `[bridge:repl] Sending ${filtered.length} message(s) via transport`,
- )
- // Convert to SDK format and send via HTTP POST (HybridTransport).
- // The web UI receives them via the subscribe WebSocket.
- const sdkMessages = toSDKMessages(filtered)
- const events = sdkMessages.map(sdkMsg => ({
- ...sdkMsg,
- session_id: currentSessionId,
- }))
- void transport.writeBatch(events)
- },
- writeSdkMessages(messages) {
- // Daemon path: query() already yields SDKMessage, skip conversion.
- // Still run echo dedup (server bounces writes back on the WS).
- // No initialMessageUUIDs filter — daemon has no initial messages.
- // No flushGate — daemon never starts it (no initial flush).
- const filtered = messages.filter(
- m => !m.uuid || !recentPostedUUIDs.has(m.uuid as string),
- )
- if (filtered.length === 0) return
- if (!transport) {
- logForDebugging(
- `[bridge:repl] Transport not configured, dropping ${filtered.length} SDK message(s) for session=${currentSessionId}`,
- { level: 'warn' },
- )
- return
- }
- for (const msg of filtered) {
- if (msg.uuid) recentPostedUUIDs.add(msg.uuid as string)
- }
- const events = filtered.map(m => ({ ...m, session_id: currentSessionId }))
- void transport.writeBatch(events)
- },
- sendControlRequest(request: SDKControlRequest) {
- if (!transport) {
- logForDebugging(
- '[bridge:repl] Transport not configured, skipping control_request',
- )
- return
- }
- const event = { ...request, session_id: currentSessionId }
- void transport.write(event)
- logForDebugging(
- `[bridge:repl] Sent control_request request_id=${request.request_id}`,
- )
- },
- sendControlResponse(response: SDKControlResponse) {
- if (!transport) {
- logForDebugging(
- '[bridge:repl] Transport not configured, skipping control_response',
- )
- return
- }
- const event = { ...response, session_id: currentSessionId }
- void transport.write(event)
- logForDebugging('[bridge:repl] Sent control_response')
- },
- sendControlCancelRequest(requestId: string) {
- if (!transport) {
- logForDebugging(
- '[bridge:repl] Transport not configured, skipping control_cancel_request',
- )
- return
- }
- const event = {
- type: 'control_cancel_request' as const,
- request_id: requestId,
- session_id: currentSessionId,
- }
- void transport.write(event)
- logForDebugging(
- `[bridge:repl] Sent control_cancel_request request_id=${requestId}`,
- )
- },
- sendResult() {
- if (!transport) {
- logForDebugging(
- `[bridge:repl] sendResult: skipping, transport not configured session=${currentSessionId}`,
- )
- return
- }
- void transport.write(makeResultMessage(currentSessionId))
- logForDebugging(
- `[bridge:repl] Sent result for session=${currentSessionId}`,
- )
- },
- async teardown() {
- unregister()
- await doTeardownImpl?.()
- logForDebugging('[bridge:repl] Torn down')
- logEvent('tengu_bridge_repl_teardown', {})
- },
- }
- }
- /**
- * Persistent poll loop for work items. Runs in the background for the
- * lifetime of the bridge connection.
- *
- * When a work item arrives, acknowledges it and calls onWorkReceived
- * with the session ID and ingress token (which connects the ingress
- * WebSocket). Then continues polling — the server will dispatch a new
- * work item if the ingress WebSocket drops, allowing automatic
- * reconnection without tearing down the bridge.
- */
- async function startWorkPollLoop({
- api,
- getCredentials,
- signal,
- onStateChange,
- onWorkReceived,
- onEnvironmentLost,
- getWsState,
- isAtCapacity,
- capacitySignal,
- onFatalError,
- getPollIntervalConfig = () => DEFAULT_POLL_CONFIG,
- getHeartbeatInfo,
- onHeartbeatFatal,
- }: {
- api: BridgeApiClient
- getCredentials: () => { environmentId: string; environmentSecret: string }
- signal: AbortSignal
- onStateChange?: (state: BridgeState, detail?: string) => void
- onWorkReceived: (
- sessionId: string,
- ingressToken: string,
- workId: string,
- useCodeSessions: boolean,
- ) => void
- /** Called when the environment has been deleted. Returns new credentials or null. */
- onEnvironmentLost?: () => Promise<{
- environmentId: string
- environmentSecret: string
- } | null>
- /** Returns the current WebSocket readyState label for diagnostic logging. */
- getWsState?: () => string
- /**
- * Returns true when the caller cannot accept new work (transport already
- * connected). When true, the loop polls at the configured at-capacity
- * interval as a heartbeat only. Server-side BRIDGE_LAST_POLL_TTL is
- * 4 hours — anything shorter than that is sufficient for liveness.
- */
- isAtCapacity?: () => boolean
- /**
- * Produces a signal that aborts when capacity frees up (transport lost),
- * merged with the loop signal. Used to interrupt the at-capacity sleep
- * so recovery polling starts immediately.
- */
- capacitySignal?: () => CapacitySignal
- /** Called on unrecoverable errors (e.g. server-side expiry) to trigger full teardown. */
- onFatalError?: () => void
- /** Poll interval config getter — defaults to DEFAULT_POLL_CONFIG. */
- getPollIntervalConfig?: () => PollIntervalConfig
- /**
- * Returns the current work ID and session ingress token for heartbeat.
- * When null, heartbeat is not possible (no active work item).
- */
- getHeartbeatInfo?: () => {
- environmentId: string
- workId: string
- sessionToken: string
- } | null
- /**
- * Called when heartbeatWork throws BridgeFatalError (401/403/404/410 —
- * JWT expired or work item gone). Caller should tear down the transport
- * + work state so isAtCapacity() flips to false and the loop fast-polls
- * for the server's re-dispatched work item. When provided, the loop
- * SKIPS the at-capacity backoff sleep (which would otherwise cause a
- * ~10-minute dead window before recovery). When omitted, falls back to
- * the backoff sleep to avoid a tight poll+heartbeat loop.
- */
- onHeartbeatFatal?: (err: BridgeFatalError) => void
- }): Promise<void> {
  const MAX_ENVIRONMENT_RECREATIONS = 3
  logForDebugging(
    `[bridge:repl] Starting work poll loop for env=${getCredentials().environmentId}`,
  )
  // Transient-failure accounting for the backoff/give-up logic below.
  let consecutiveErrors = 0
  let firstErrorTime: number | null = null
  let lastPollErrorTime: number | null = null
  // Count of env re-registrations via onEnvironmentLost; reset only by a
  // successful poll, capped at MAX_ENVIRONMENT_RECREATIONS.
  let environmentRecreations = 0
  // Set when the at-capacity sleep overruns its deadline by a large margin
  // (process suspension). Consumed at the top of the next iteration to
  // force one fast-poll cycle — isAtCapacity() is `transport !== null`,
  // which stays true while the transport auto-reconnects, so the poll
  // loop would otherwise go straight back to a 10-minute sleep on a
  // transport that may be pointed at a dead socket.
  let suspensionDetected = false
- while (!signal.aborted) {
- // Capture credentials outside try so the catch block can detect
- // whether a concurrent reconnection replaced the environment.
- const { environmentId: envId, environmentSecret: envSecret } =
- getCredentials()
- const pollConfig = getPollIntervalConfig()
- try {
      const work = await api.pollForWork(
        envId,
        envSecret,
        signal,
        pollConfig.reclaim_older_than_ms,
      )
      // A successful poll proves the env is genuinely healthy — reset the
      // env-loss counter so events hours apart each start fresh. Outside
      // the state-change guard below because onEnvLost's success path
      // already emits 'ready'; emitting again here would be a duplicate.
      // (onEnvLost returning creds does NOT reset this — that would break
      // oscillation protection when the new env immediately dies.)
      environmentRecreations = 0
      // Reset error tracking on a successful poll and let the UI recover.
      if (consecutiveErrors > 0) {
        logForDebugging(
          `[bridge:repl] Poll recovered after ${consecutiveErrors} consecutive error(s)`,
        )
        consecutiveErrors = 0
        firstErrorTime = null
        lastPollErrorTime = null
        onStateChange?.('ready')
      }
      // No work this cycle — liveness/heartbeat path.
      if (!work) {
        // Read-and-clear: after a detected suspension, skip the at-capacity
        // branch exactly once. The pollForWork above already refreshed the
        // server's BRIDGE_LAST_POLL_TTL; this fast cycle gives any
        // re-dispatched work item a chance to land before we go back under.
        const skipAtCapacityOnce = suspensionDetected
        suspensionDetected = false
        if (isAtCapacity?.() && capacitySignal && !skipAtCapacityOnce) {
          const atCapMs = pollConfig.poll_interval_ms_at_capacity
          // Heartbeat loops WITHOUT polling. When at-capacity polling is also
          // enabled (atCapMs > 0), the loop tracks a deadline and breaks out
          // to poll at that interval — heartbeat and poll compose instead of
          // one suppressing the other. Breaks out when:
          // - Poll deadline reached (atCapMs > 0 only)
          // - Auth fails (JWT expired → poll refreshes tokens)
          // - Capacity wake fires (transport lost → poll for new work)
          // - Heartbeat config disabled (GrowthBook update)
          // - Loop aborted (shutdown)
          if (
            pollConfig.non_exclusive_heartbeat_interval_ms > 0 &&
            getHeartbeatInfo
          ) {
            logEvent('tengu_bridge_heartbeat_mode_entered', {
              heartbeat_interval_ms:
                pollConfig.non_exclusive_heartbeat_interval_ms,
            })
            // Deadline computed once at entry — GB updates to atCapMs don't
            // shift an in-flight deadline (next entry picks up the new value).
            const pollDeadline = atCapMs > 0 ? Date.now() + atCapMs : null
            let needsBackoff = false
            let hbCycles = 0
            while (
              !signal.aborted &&
              isAtCapacity() &&
              (pollDeadline === null || Date.now() < pollDeadline)
            ) {
              const hbConfig = getPollIntervalConfig()
              if (hbConfig.non_exclusive_heartbeat_interval_ms <= 0) break
              const info = getHeartbeatInfo()
              if (!info) break
              // Capture capacity signal BEFORE the async heartbeat call so
              // a transport loss during the HTTP request is caught by the
              // subsequent sleep.
              const cap = capacitySignal()
              try {
                await api.heartbeatWork(
                  info.environmentId,
                  info.workId,
                  info.sessionToken,
                )
              } catch (err) {
                logForDebugging(
                  `[bridge:repl:heartbeat] Failed: ${errorMessage(err)}`,
                )
                if (err instanceof BridgeFatalError) {
                  cap.cleanup()
                  logEvent('tengu_bridge_heartbeat_error', {
                    status:
                      err.status as unknown as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
                    error_type: (err.status === 401 || err.status === 403
                      ? 'auth_failed'
                      : 'fatal') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
                  })
                  // JWT expired (401/403) or work item gone (404/410).
                  // Either way the current transport is dead — SSE
                  // reconnects and CCR writes will fail on the same
                  // stale token. If the caller gave us a recovery hook,
                  // tear down work state and skip backoff: isAtCapacity()
                  // flips to false, next outer-loop iteration fast-polls
                  // for the server's re-dispatched work item. Without
                  // the hook, backoff to avoid tight poll+heartbeat loop.
                  if (onHeartbeatFatal) {
                    onHeartbeatFatal(err)
                    logForDebugging(
                      `[bridge:repl:heartbeat] Fatal (status=${err.status}), work state cleared — fast-polling for re-dispatch`,
                    )
                  } else {
                    needsBackoff = true
                  }
                  break
                }
                // Non-fatal heartbeat errors fall through: sleep, then retry.
              }
              hbCycles++
              await sleep(
                hbConfig.non_exclusive_heartbeat_interval_ms,
                cap.signal,
              )
              cap.cleanup()
            }
            const exitReason = needsBackoff
              ? 'error'
              : signal.aborted
                ? 'shutdown'
                : !isAtCapacity()
                  ? 'capacity_changed'
                  : pollDeadline !== null && Date.now() >= pollDeadline
                    ? 'poll_due'
                    : 'config_disabled'
            logEvent('tengu_bridge_heartbeat_mode_exited', {
              reason:
                exitReason as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
              heartbeat_cycles: hbCycles,
            })
            // On auth_failed or fatal, backoff before polling to avoid a
            // tight poll+heartbeat loop. Fall through to the shared sleep
            // below — it's the same capacitySignal-wrapped sleep the legacy
            // path uses, and both need the suspension-overrun check.
            if (!needsBackoff) {
              if (exitReason === 'poll_due') {
                // bridgeApi throttles empty-poll logs (EMPTY_POLL_LOG_INTERVAL=100)
                // so the once-per-10min poll_due poll is invisible at counter=2.
                // Log it here so verification runs see both endpoints in the debug log.
                logForDebugging(
                  `[bridge:repl] Heartbeat poll_due after ${hbCycles} cycles — falling through to pollForWork`,
                )
              }
              continue
            }
          }
          // At-capacity sleep — reached by both the legacy path (heartbeat
          // disabled) and the heartbeat-backoff path (needsBackoff=true).
          // Merged so the suspension detector covers both; previously the
          // backoff path had no overrun check and could go straight back
          // under for 10 min after a laptop wake. Use atCapMs when enabled,
          // else the heartbeat interval as a floor (guaranteed > 0 on the
          // backoff path) so heartbeat-only configs don't tight-loop.
          const sleepMs =
            atCapMs > 0
              ? atCapMs
              : pollConfig.non_exclusive_heartbeat_interval_ms
          if (sleepMs > 0) {
            const cap = capacitySignal()
            const sleepStart = Date.now()
            await sleep(sleepMs, cap.signal)
            cap.cleanup()
            // Process-suspension detector. A setTimeout overshooting its
            // deadline by 60s means the process was suspended (laptop lid,
            // SIGSTOP, VM pause) — even a pathological GC pause is seconds,
            // not minutes. Early aborts (wakePollLoop → cap.signal) produce
            // overrun < 0 and fall through. Note: this only catches sleeps
            // that outlast their deadline; WebSocketTransport's ping
            // interval (10s granularity) is the primary detector for shorter
            // suspensions. This is the backstop for when that detector isn't
            // running (transport mid-reconnect, interval stopped).
            const overrun = Date.now() - sleepStart - sleepMs
            if (overrun > 60_000) {
              logForDebugging(
                `[bridge:repl] At-capacity sleep overran by ${Math.round(overrun / 1000)}s — process suspension detected, forcing one fast-poll cycle`,
              )
              logEvent('tengu_bridge_repl_suspension_detected', {
                overrun_ms: overrun,
              })
              suspensionDetected = true
            }
          }
        } else {
          await sleep(pollConfig.poll_interval_ms_not_at_capacity, signal)
        }
        continue
      }
- // Decode before type dispatch — need the JWT for the explicit ack.
- let secret
- try {
- secret = decodeWorkSecret(work.secret)
- } catch (err) {
- logForDebugging(
- `[bridge:repl] Failed to decode work secret: ${errorMessage(err)}`,
- )
- logEvent('tengu_bridge_repl_work_secret_failed', {})
- // Can't ack (needs the JWT we failed to decode). stopWork uses OAuth.
- // Prevents XAUTOCLAIM re-delivering this poisoned item every cycle.
- await api.stopWork(envId, work.id, false).catch(() => {})
- continue
- }
- // Explicitly acknowledge to prevent redelivery. Non-fatal on failure:
- // server re-delivers, and the onWorkReceived callback handles dedup.
- logForDebugging(`[bridge:repl] Acknowledging workId=${work.id}`)
- try {
- await api.acknowledgeWork(envId, work.id, secret.session_ingress_token)
- } catch (err) {
- logForDebugging(
- `[bridge:repl] Acknowledge failed workId=${work.id}: ${errorMessage(err)}`,
- )
- }
- if (work.data.type === 'healthcheck') {
- logForDebugging('[bridge:repl] Healthcheck received')
- continue
- }
- if (work.data.type === 'session') {
- const workSessionId = work.data.id
- try {
- validateBridgeId(workSessionId, 'session_id')
- } catch {
- logForDebugging(
- `[bridge:repl] Invalid session_id in work: ${workSessionId}`,
- )
- continue
- }
- onWorkReceived(
- workSessionId,
- secret.session_ingress_token,
- work.id,
- secret.use_code_sessions === true,
- )
- logForDebugging('[bridge:repl] Work accepted, continuing poll loop')
- }
- } catch (err) {
      // Shutdown race: abort during an in-flight poll surfaces here.
      if (signal.aborted) break
      // Detect permanent "environment deleted" error — no amount of
      // retrying will recover. Re-register a new environment instead.
      // Checked BEFORE the generic BridgeFatalError bail. pollForWork uses
      // validateStatus: s => s < 500, so 404 is always wrapped into a
      // BridgeFatalError by handleErrorStatus() — never an axios-shaped
      // error. The poll endpoint's only path param is the env ID; 404
      // unambiguously means env-gone (no-work is a 200 with null body).
      // The server sends error.type='not_found_error' (standard Anthropic
      // API shape), not a bridge-specific string — but status===404 is
      // the real signal and survives body-shape changes.
      if (
        err instanceof BridgeFatalError &&
        err.status === 404 &&
        onEnvironmentLost
      ) {
        // If credentials have already been refreshed by a concurrent
        // reconnection (e.g. WS close handler), the stale poll's error
        // is expected — skip onEnvironmentLost and retry with fresh creds.
        const currentEnvId = getCredentials().environmentId
        if (envId !== currentEnvId) {
          logForDebugging(
            `[bridge:repl] Stale poll error for old env=${envId}, current env=${currentEnvId} — skipping onEnvironmentLost`,
          )
          consecutiveErrors = 0
          firstErrorTime = null
          continue
        }
        environmentRecreations++
        logForDebugging(
          `[bridge:repl] Environment deleted, attempting re-registration (attempt ${environmentRecreations}/${MAX_ENVIRONMENT_RECREATIONS})`,
        )
        logEvent('tengu_bridge_repl_env_lost', {
          attempt: environmentRecreations,
        } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
        if (environmentRecreations > MAX_ENVIRONMENT_RECREATIONS) {
          logForDebugging(
            `[bridge:repl] Environment re-registration limit reached (${MAX_ENVIRONMENT_RECREATIONS}), giving up`,
          )
          onStateChange?.(
            'failed',
            'Environment deleted and re-registration limit reached',
          )
          onFatalError?.()
          break
        }
        onStateChange?.('reconnecting', 'environment lost, recreating session')
        const newCreds = await onEnvironmentLost()
        // doReconnect() makes several sequential network calls (1-5s).
        // If the user triggered teardown during that window, its internal
        // abort checks return false — but we need to re-check here to
        // avoid emitting a spurious 'failed' + onFatalError() during
        // graceful shutdown.
        if (signal.aborted) break
        if (newCreds) {
          // Credentials are updated in the outer scope via
          // reconnectEnvironmentWithSession — getCredentials() will
          // return the fresh values on the next poll iteration.
          // Do NOT reset environmentRecreations here — onEnvLost returning
          // creds only proves we tried to fix it, not that the env is
          // healthy. A successful poll (above) is the reset point; if the
          // new env immediately dies again we still want the limit to fire.
          consecutiveErrors = 0
          firstErrorTime = null
          onStateChange?.('ready')
          logForDebugging(
            `[bridge:repl] Re-registered environment: ${newCreds.environmentId}`,
          )
          continue
        }
        onStateChange?.(
          'failed',
          'Environment deleted and re-registration failed',
        )
        onFatalError?.()
        break
      }
      // Fatal errors (401/403/404/410) — no point retrying.
      if (err instanceof BridgeFatalError) {
        const isExpiry = isExpiredErrorType(err.errorType)
        const isSuppressible = isSuppressible403(err)
        logForDebugging(
          `[bridge:repl] Fatal poll error: ${err.message} (status=${err.status}, type=${err.errorType ?? 'unknown'})${isSuppressible ? ' (suppressed)' : ''}`,
        )
        logEvent('tengu_bridge_repl_fatal_error', {
          status: err.status,
          error_type:
            err.errorType as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
        })
        // Expiry is expected lifecycle, not an operational fault — log at
        // info; everything else at error.
        logForDiagnosticsNoPII(
          isExpiry ? 'info' : 'error',
          'bridge_repl_fatal_error',
          { status: err.status, error_type: err.errorType },
        )
        // Cosmetic 403 errors (e.g., external_poll_sessions scope,
        // environments:manage permission) — suppress user-visible error
        // but always trigger teardown so cleanup runs.
        if (!isSuppressible) {
          onStateChange?.(
            'failed',
            isExpiry
              ? 'session expired · /remote-control to reconnect'
              : err.message,
          )
        }
        // Always trigger teardown — matches bridgeMain.ts where fatalExit=true
        // is unconditional and post-loop cleanup always runs.
        onFatalError?.()
        break
      }
      const now = Date.now()
      // Detect system sleep/wake: if the gap since the last poll error
      // greatly exceeds the max backoff delay, the machine likely slept.
      // Reset error tracking so we retry with a fresh budget instead of
      // immediately giving up.
      if (
        lastPollErrorTime !== null &&
        now - lastPollErrorTime > POLL_ERROR_MAX_DELAY_MS * 2
      ) {
        logForDebugging(
          `[bridge:repl] Detected system sleep (${Math.round((now - lastPollErrorTime) / 1000)}s gap), resetting poll error budget`,
        )
        logForDiagnosticsNoPII('info', 'bridge_repl_poll_sleep_detected', {
          gapMs: now - lastPollErrorTime,
        })
        consecutiveErrors = 0
        firstErrorTime = null
      }
      lastPollErrorTime = now
      consecutiveErrors++
      // firstErrorTime anchors the elapsed-time budget for giving up.
      if (firstErrorTime === null) {
        firstErrorTime = now
      }
      const elapsed = now - firstErrorTime
      const httpStatus = extractHttpStatus(err)
      const errMsg = describeAxiosError(err)
      const wsLabel = getWsState?.() ?? 'unknown'
      logForDebugging(
        `[bridge:repl] Poll error (attempt ${consecutiveErrors}, elapsed ${Math.round(elapsed / 1000)}s, ws=${wsLabel}): ${errMsg}`,
      )
      logEvent('tengu_bridge_repl_poll_error', {
        status: httpStatus,
        consecutiveErrors,
        elapsedMs: elapsed,
      } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
      // Only transition to 'reconnecting' on the first error — stay
      // there until a successful poll (avoid flickering the UI state).
      if (consecutiveErrors === 1) {
        onStateChange?.('reconnecting', errMsg)
      }
      // Give up after continuous failures.
      if (elapsed >= POLL_ERROR_GIVE_UP_MS) {
        logForDebugging(
          `[bridge:repl] Poll failures exceeded ${POLL_ERROR_GIVE_UP_MS / 1000}s (${consecutiveErrors} errors), giving up`,
        )
        logForDiagnosticsNoPII('info', 'bridge_repl_poll_give_up')
        logEvent('tengu_bridge_repl_poll_give_up', {
          consecutiveErrors,
          elapsedMs: elapsed,
          lastStatus: httpStatus,
        } as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
        onStateChange?.('failed', 'connection to server lost')
        break
      }
      // Exponential backoff: 2s → 4s → 8s → 16s → 32s → 60s (cap)
      const backoff = Math.min(
        POLL_ERROR_INITIAL_DELAY_MS * 2 ** (consecutiveErrors - 1),
        POLL_ERROR_MAX_DELAY_MS,
      )
- // The poll_due heartbeat-loop exit leaves a healthy lease exposed to
- // this backoff path. Heartbeat before each sleep so /poll outages
- // (the VerifyEnvironmentSecretAuth DB path heartbeat was introduced to
- // avoid) don't kill the 300s lease TTL.
- if (getPollIntervalConfig().non_exclusive_heartbeat_interval_ms > 0) {
- const info = getHeartbeatInfo?.()
- if (info) {
- try {
- await api.heartbeatWork(
- info.environmentId,
- info.workId,
- info.sessionToken,
- )
- } catch {
- // Best-effort — if heartbeat also fails the lease dies, same as
- // pre-poll_due behavior (where the only heartbeat-loop exits were
- // ones where the lease was already dying).
- }
- }
- }
- await sleep(backoff, signal)
- }
- }
- logForDebugging(
- `[bridge:repl] Work poll loop ended (aborted=${signal.aborted}) env=${getCredentials().environmentId}`,
- )
- }
- // Exported for testing only
- export {
- startWorkPollLoop as _startWorkPollLoopForTesting,
- POLL_ERROR_INITIAL_DELAY_MS as _POLL_ERROR_INITIAL_DELAY_MS_ForTesting,
- POLL_ERROR_MAX_DELAY_MS as _POLL_ERROR_MAX_DELAY_MS_ForTesting,
- POLL_ERROR_GIVE_UP_MS as _POLL_ERROR_GIVE_UP_MS_ForTesting,
- }
|