conversationRecovery.ts 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. import { feature } from 'bun:bundle'
  2. import type { UUID } from 'crypto'
  3. import { relative } from 'path'
  4. import { getCwd } from 'src/utils/cwd.js'
  5. import { addInvokedSkill } from '../bootstrap/state.js'
  6. import { asSessionId } from '../types/ids.js'
  7. import type {
  8. AttributionSnapshotMessage,
  9. ContextCollapseCommitEntry,
  10. ContextCollapseSnapshotEntry,
  11. LogOption,
  12. PersistedWorktreeSession,
  13. SerializedMessage,
  14. } from '../types/logs.js'
  15. import type {
  16. Message,
  17. NormalizedMessage,
  18. NormalizedUserMessage,
  19. } from '../types/message.js'
  20. import { PERMISSION_MODES } from '../types/permissions.js'
  21. import { suppressNextSkillListing } from './attachments.js'
  22. import {
  23. copyFileHistoryForResume,
  24. type FileHistorySnapshot,
  25. } from './fileHistory.js'
  26. import { logError } from './log.js'
  27. import {
  28. createAssistantMessage,
  29. createUserMessage,
  30. filterOrphanedThinkingOnlyMessages,
  31. filterUnresolvedToolUses,
  32. filterWhitespaceOnlyAssistantMessages,
  33. isToolUseResultMessage,
  34. NO_RESPONSE_REQUESTED,
  35. normalizeMessages,
  36. } from './messages.js'
  37. import { copyPlanForResume } from './plans.js'
  38. import { processSessionStartHooks } from './sessionStart.js'
  39. import {
  40. buildConversationChain,
  41. checkResumeConsistency,
  42. getLastSessionLog,
  43. getSessionIdFromLog,
  44. isLiteLog,
  45. loadFullLog,
  46. loadMessageLogs,
  47. loadTranscriptFile,
  48. removeExtraFields,
  49. } from './sessionStorage.js'
  50. import type { ContentReplacementRecord } from './toolResultStorage.js'
// Dead code elimination: ant-only tool names are conditionally required so
// their strings don't leak into external builds. Static imports always bundle.
//
// Each constant below is either the tool name exported by the required
// prompt module or `null` when the gating feature flag(s) are off. Later
// comparisons against a tool_use block's name are written as plain `===`
// checks against these constants.
/* eslint-disable @typescript-eslint/no-require-imports */
// Name of the brief tool; loaded when either KAIROS or KAIROS_BRIEF is enabled.
const BRIEF_TOOL_NAME: string | null =
  feature('KAIROS') || feature('KAIROS_BRIEF')
    ? (
        require('../tools/BriefTool/prompt.js') as typeof import('../tools/BriefTool/prompt.js')
      ).BRIEF_TOOL_NAME
    : null
// Legacy name exported by the same prompt module, gated by the same flags.
// NOTE(review): presumably kept so transcripts written under the old name
// still match — confirm against BriefTool/prompt.js.
const LEGACY_BRIEF_TOOL_NAME: string | null =
  feature('KAIROS') || feature('KAIROS_BRIEF')
    ? (
        require('../tools/BriefTool/prompt.js') as typeof import('../tools/BriefTool/prompt.js')
      ).LEGACY_BRIEF_TOOL_NAME
    : null
// Name of the send-user-file tool; loaded only when KAIROS is enabled
// (KAIROS_BRIEF alone does not enable it).
const SEND_USER_FILE_TOOL_NAME: string | null = feature('KAIROS')
  ? (
      require('../tools/SendUserFileTool/prompt.js') as typeof import('../tools/SendUserFileTool/prompt.js')
    ).SEND_USER_FILE_TOOL_NAME
  : null
/* eslint-enable @typescript-eslint/no-require-imports */
  72. /**
  73. * Transforms legacy attachment types to current types for backward compatibility
  74. */
  75. function migrateLegacyAttachmentTypes(message: Message): Message {
  76. if (message.type !== 'attachment') {
  77. return message
  78. }
  79. const attachment = message.attachment as {
  80. type: string
  81. [key: string]: unknown
  82. } // Handle legacy types not in current type system
  83. // Transform legacy attachment types
  84. if (attachment.type === 'new_file') {
  85. return {
  86. ...message,
  87. attachment: {
  88. ...attachment,
  89. type: 'file',
  90. displayPath: relative(getCwd(), attachment.filename as string),
  91. },
  92. } as SerializedMessage // Cast entire message since we know the structure is correct
  93. }
  94. if (attachment.type === 'new_directory') {
  95. return {
  96. ...message,
  97. attachment: {
  98. ...attachment,
  99. type: 'directory',
  100. displayPath: relative(getCwd(), attachment.path as string),
  101. },
  102. } as SerializedMessage // Cast entire message since we know the structure is correct
  103. }
  104. // Backfill displayPath for attachments from old sessions
  105. if (!('displayPath' in attachment)) {
  106. const path =
  107. 'filename' in attachment
  108. ? (attachment.filename as string)
  109. : 'path' in attachment
  110. ? (attachment.path as string)
  111. : 'skillDir' in attachment
  112. ? (attachment.skillDir as string)
  113. : undefined
  114. if (path) {
  115. return {
  116. ...message,
  117. attachment: {
  118. ...attachment,
  119. displayPath: relative(getCwd(), path),
  120. },
  121. } as Message
  122. }
  123. }
  124. return message
  125. }
/**
 * Response shape for a teleported remote session.
 * NOTE(review): no producer or consumer is visible in this file — confirm
 * the exact semantics against the teleport code.
 */
export type TeleportRemoteResponse = {
  // Messages making up the remote conversation log
  log: Message[]
  // Optional branch name — presumably the git branch of the remote session; TODO confirm
  branch?: string
}
/**
 * Public result of interruption detection on a resumed transcript.
 *
 * - `none`: no interruption was detected.
 * - `interrupted_prompt`: the transcript ends on a user message that has not
 *   been answered; `message` is that user message. For mid-turn
 *   interruptions it is the synthetic continuation message appended by
 *   deserializeMessagesWithInterruptDetection.
 */
export type TurnInterruptionState =
  | { kind: 'none' }
  | { kind: 'interrupted_prompt'; message: NormalizedUserMessage }
/**
 * Return value of deserializeMessagesWithInterruptDetection: the cleaned-up
 * message list together with the detected interruption state.
 */
export type DeserializeResult = {
  messages: Message[]
  turnInterruptionState: TurnInterruptionState
}
  137. /**
  138. * Deserializes messages from a log file into the format expected by the REPL.
  139. * Filters unresolved tool uses, orphaned thinking messages, and appends a
  140. * synthetic assistant sentinel when the last message is from the user.
  141. * @internal Exported for testing - use loadConversationForResume instead
  142. */
  143. export function deserializeMessages(serializedMessages: Message[]): Message[] {
  144. return deserializeMessagesWithInterruptDetection(serializedMessages).messages
  145. }
  146. /**
  147. * Like deserializeMessages, but also detects whether the session was
  148. * interrupted mid-turn. Used by the SDK resume path to auto-continue
  149. * interrupted turns after a gateway-triggered restart.
  150. * @internal Exported for testing
  151. */
  152. export function deserializeMessagesWithInterruptDetection(
  153. serializedMessages: Message[],
  154. ): DeserializeResult {
  155. try {
  156. // Transform legacy attachment types before processing
  157. const migratedMessages = serializedMessages.map(
  158. migrateLegacyAttachmentTypes,
  159. )
  160. // Strip invalid permissionMode values from deserialized user messages.
  161. // The field is unvalidated JSON from disk and may contain modes from a different build.
  162. const validModes = new Set<string>(PERMISSION_MODES)
  163. for (const msg of migratedMessages) {
  164. if (
  165. msg.type === 'user' &&
  166. msg.permissionMode !== undefined &&
  167. !validModes.has(msg.permissionMode)
  168. ) {
  169. msg.permissionMode = undefined
  170. }
  171. }
  172. // Filter out unresolved tool uses and any synthetic messages that follow them
  173. const filteredToolUses = filterUnresolvedToolUses(
  174. migratedMessages,
  175. ) as NormalizedMessage[]
  176. // Filter out orphaned thinking-only assistant messages that can cause API errors
  177. // during resume. These occur when streaming yields separate messages per content
  178. // block and interleaved user messages prevent proper merging by message.id.
  179. const filteredThinking = filterOrphanedThinkingOnlyMessages(
  180. filteredToolUses,
  181. ) as NormalizedMessage[]
  182. // Filter out assistant messages with only whitespace text content.
  183. // This can happen when model outputs "\n\n" before thinking, user cancels mid-stream.
  184. const filteredMessages = filterWhitespaceOnlyAssistantMessages(
  185. filteredThinking,
  186. ) as NormalizedMessage[]
  187. const internalState = detectTurnInterruption(filteredMessages)
  188. // Transform mid-turn interruptions into interrupted_prompt by appending
  189. // a synthetic continuation message. This unifies both interruption kinds
  190. // so the consumer only needs to handle interrupted_prompt.
  191. let turnInterruptionState: TurnInterruptionState
  192. if (internalState.kind === 'interrupted_turn') {
  193. const [continuationMessage] = normalizeMessages([
  194. createUserMessage({
  195. content: 'Continue from where you left off.',
  196. isMeta: true,
  197. }),
  198. ])
  199. filteredMessages.push(continuationMessage!)
  200. turnInterruptionState = {
  201. kind: 'interrupted_prompt',
  202. message: continuationMessage!,
  203. }
  204. } else {
  205. turnInterruptionState = internalState
  206. }
  207. // Append a synthetic assistant sentinel after the last user message so
  208. // the conversation is API-valid if no resume action is taken. Skip past
  209. // trailing system/progress messages and insert right after the user
  210. // message so removeInterruptedMessage's splice(idx, 2) removes the
  211. // correct pair.
  212. const lastRelevantIdx = filteredMessages.findLastIndex(
  213. m => m.type !== 'system' && m.type !== 'progress',
  214. )
  215. if (
  216. lastRelevantIdx !== -1 &&
  217. filteredMessages[lastRelevantIdx]!.type === 'user'
  218. ) {
  219. filteredMessages.splice(
  220. lastRelevantIdx + 1,
  221. 0,
  222. createAssistantMessage({
  223. content: NO_RESPONSE_REQUESTED,
  224. }) as NormalizedMessage,
  225. )
  226. }
  227. return { messages: filteredMessages, turnInterruptionState }
  228. } catch (error) {
  229. logError(error as Error)
  230. throw error
  231. }
  232. }
  233. /**
  234. * Internal 3-way result from detection, before transforming interrupted_turn
  235. * into interrupted_prompt with a synthetic continuation message.
  236. */
  237. type InternalInterruptionState =
  238. | TurnInterruptionState
  239. | { kind: 'interrupted_turn' }
  240. /**
  241. * Determines whether the conversation was interrupted mid-turn based on the
  242. * last message after filtering. An assistant as last message (after filtering
  243. * unresolved tool_uses) is treated as a completed turn because stop_reason is
  244. * always null on persisted messages in the streaming path.
  245. *
  246. * System and progress messages are skipped when finding the last turn-relevant
  247. * message — they are bookkeeping artifacts that should not mask a genuine
  248. * interruption. Attachments are kept as part of the turn.
  249. */
  250. function detectTurnInterruption(
  251. messages: NormalizedMessage[],
  252. ): InternalInterruptionState {
  253. if (messages.length === 0) {
  254. return { kind: 'none' }
  255. }
  256. // Find the last turn-relevant message, skipping system/progress and
  257. // synthetic API error assistants. Error assistants are already filtered
  258. // before API send (normalizeMessagesForAPI) — skipping them here lets
  259. // auto-resume fire after retry exhaustion instead of reading the error as
  260. // a completed turn.
  261. const lastMessageIdx = messages.findLastIndex(
  262. m =>
  263. m.type !== 'system' &&
  264. m.type !== 'progress' &&
  265. !(m.type === 'assistant' && m.isApiErrorMessage),
  266. )
  267. const lastMessage =
  268. lastMessageIdx !== -1 ? messages[lastMessageIdx] : undefined
  269. if (!lastMessage) {
  270. return { kind: 'none' }
  271. }
  272. if (lastMessage.type === 'assistant') {
  273. // In the streaming path, stop_reason is always null on persisted messages
  274. // because messages are recorded at content_block_stop time, before
  275. // message_delta delivers the stop_reason. After filterUnresolvedToolUses
  276. // has removed assistant messages with unmatched tool_uses, an assistant as
  277. // the last message means the turn most likely completed normally.
  278. return { kind: 'none' }
  279. }
  280. if (lastMessage.type === 'user') {
  281. if (lastMessage.isMeta || lastMessage.isCompactSummary) {
  282. return { kind: 'none' }
  283. }
  284. if (isToolUseResultMessage(lastMessage)) {
  285. // Brief mode (#20467) drops the trailing assistant text block, so a
  286. // completed brief-mode turn legitimately ends on SendUserMessage's
  287. // tool_result. Without this check, resume misclassifies every
  288. // brief-mode session as interrupted mid-turn and injects a phantom
  289. // "Continue from where you left off." before the user's real next
  290. // prompt. Look back one step for the originating tool_use.
  291. if (isTerminalToolResult(lastMessage, messages, lastMessageIdx)) {
  292. return { kind: 'none' }
  293. }
  294. return { kind: 'interrupted_turn' }
  295. }
  296. // Plain text user prompt — CC hadn't started responding
  297. return { kind: 'interrupted_prompt', message: lastMessage }
  298. }
  299. if (lastMessage.type === 'attachment') {
  300. // Attachments are part of the user turn — the user provided context but
  301. // the assistant never responded.
  302. return { kind: 'interrupted_turn' }
  303. }
  304. return { kind: 'none' }
  305. }
  306. /**
  307. * Is this tool_result the output of a tool that legitimately terminates a
  308. * turn? SendUserMessage is the canonical case: in brief mode, calling it is
  309. * the turn's final act — there is no follow-up assistant text (#20467
  310. * removed it). A transcript ending here means the turn COMPLETED, not that
  311. * it was killed mid-tool.
  312. *
  313. * Walks back to find the assistant tool_use that this result belongs to and
  314. * checks its name. The matching tool_use is typically the immediately
  315. * preceding relevant message (filterUnresolvedToolUses has already dropped
  316. * unpaired ones), but we walk just in case system/progress noise is
  317. * interleaved.
  318. */
  319. function isTerminalToolResult(
  320. result: NormalizedUserMessage,
  321. messages: NormalizedMessage[],
  322. resultIdx: number,
  323. ): boolean {
  324. const content = result.message.content
  325. if (!Array.isArray(content)) return false
  326. const block = content[0]
  327. if (block?.type !== 'tool_result') return false
  328. const toolUseId = block.tool_use_id
  329. for (let i = resultIdx - 1; i >= 0; i--) {
  330. const msg = messages[i]!
  331. if (msg.type !== 'assistant') continue
  332. for (const b of msg.message.content) {
  333. if (b.type === 'tool_use' && b.id === toolUseId) {
  334. return (
  335. b.name === BRIEF_TOOL_NAME ||
  336. b.name === LEGACY_BRIEF_TOOL_NAME ||
  337. b.name === SEND_USER_FILE_TOOL_NAME
  338. )
  339. }
  340. }
  341. }
  342. return false
  343. }
  344. /**
  345. * Restores skill state from invoked_skills attachments in messages.
  346. * This ensures that skills are preserved across resume after compaction.
  347. * Without this, if another compaction happens after resume, the skills would be lost
  348. * because STATE.invokedSkills would be empty.
  349. * @internal Exported for testing - use loadConversationForResume instead
  350. */
  351. export function restoreSkillStateFromMessages(messages: Message[]): void {
  352. for (const message of messages) {
  353. if (message.type !== 'attachment') {
  354. continue
  355. }
  356. if (message.attachment.type === 'invoked_skills') {
  357. for (const skill of message.attachment.skills) {
  358. if (skill.name && skill.path && skill.content) {
  359. // Resume only happens for the main session, so agentId is null
  360. addInvokedSkill(skill.name, skill.path, skill.content, null)
  361. }
  362. }
  363. }
  364. // A prior process already injected the skills-available reminder — it's
  365. // in the transcript the model is about to see. sentSkillNames is
  366. // process-local, so without this every resume re-announces the same
  367. // ~600 tokens. Fire-once latch; consumed on the first attachment pass.
  368. if (message.attachment.type === 'skill_listing') {
  369. suppressNextSkillListing()
  370. }
  371. }
  372. }
  373. /**
  374. * Chain-walk a transcript jsonl by path. Same sequence loadFullLog
  375. * runs internally — loadTranscriptFile → find newest non-sidechain
  376. * leaf → buildConversationChain → removeExtraFields — just starting
  377. * from an arbitrary path instead of the sid-derived one.
  378. *
  379. * leafUuids is populated by loadTranscriptFile as "uuids that no
  380. * other message's parentUuid points at" — the chain tips. There can
  381. * be several (sidechains, orphans); newest non-sidechain is the main
  382. * conversation's end.
  383. */
  384. export async function loadMessagesFromJsonlPath(path: string): Promise<{
  385. messages: SerializedMessage[]
  386. sessionId: UUID | undefined
  387. }> {
  388. const { messages: byUuid, leafUuids } = await loadTranscriptFile(path)
  389. let tip: (typeof byUuid extends Map<UUID, infer T> ? T : never) | null = null
  390. let tipTs = 0
  391. for (const m of byUuid.values()) {
  392. if (m.isSidechain || !leafUuids.has(m.uuid)) continue
  393. const ts = new Date(m.timestamp).getTime()
  394. if (ts > tipTs) {
  395. tipTs = ts
  396. tip = m
  397. }
  398. }
  399. if (!tip) return { messages: [], sessionId: undefined }
  400. const chain = buildConversationChain(byUuid, tip)
  401. return {
  402. messages: removeExtraFields(chain),
  403. // Leaf's sessionId — forked sessions copy chain[0] from the source
  404. // transcript, so the root retains the source session's ID. Matches
  405. // loadFullLog's mostRecentLeaf.sessionId.
  406. sessionId: tip.sessionId as UUID | undefined,
  407. }
  408. }
  409. /**
  410. * Loads a conversation for resume from various sources.
  411. * This is the centralized function for loading and deserializing conversations.
  412. *
  413. * @param source - The source to load from:
  414. * - undefined: load most recent conversation
  415. * - string: session ID to load
  416. * - LogOption: already loaded conversation
  417. * @param sourceJsonlFile - Alternate: path to a transcript jsonl.
  418. * Used when --resume receives a .jsonl path (cli/print.ts routes
  419. * on suffix), typically for cross-directory resume where the
  420. * transcript lives outside the current project dir.
  421. * @returns Object containing the deserialized messages and the original log, or null if not found
  422. */
  423. export async function loadConversationForResume(
  424. source: string | LogOption | undefined,
  425. sourceJsonlFile: string | undefined,
  426. ): Promise<{
  427. messages: Message[]
  428. turnInterruptionState: TurnInterruptionState
  429. fileHistorySnapshots?: FileHistorySnapshot[]
  430. attributionSnapshots?: AttributionSnapshotMessage[]
  431. contentReplacements?: ContentReplacementRecord[]
  432. contextCollapseCommits?: ContextCollapseCommitEntry[]
  433. contextCollapseSnapshot?: ContextCollapseSnapshotEntry
  434. sessionId: UUID | undefined
  435. // Session metadata for restoring agent context
  436. agentName?: string
  437. agentColor?: string
  438. agentSetting?: string
  439. customTitle?: string
  440. tag?: string
  441. mode?: 'coordinator' | 'normal'
  442. worktreeSession?: PersistedWorktreeSession | null
  443. prNumber?: number
  444. prUrl?: string
  445. prRepository?: string
  446. // Full path to the session file (for cross-directory resume)
  447. fullPath?: string
  448. } | null> {
  449. try {
  450. let log: LogOption | null = null
  451. let messages: Message[] | null = null
  452. let sessionId: UUID | undefined
  453. if (source === undefined) {
  454. // --continue: most recent session, skipping live --bg/daemon sessions
  455. // that are actively writing their own transcript.
  456. const logsPromise = loadMessageLogs()
  457. let skip = new Set<string>()
  458. if (feature('BG_SESSIONS')) {
  459. try {
  460. const { listAllLiveSessions } = await import('./udsClient.js')
  461. const live = await listAllLiveSessions()
  462. skip = new Set(
  463. live.flatMap(s =>
  464. s.kind && s.kind !== 'interactive' && s.sessionId
  465. ? [s.sessionId]
  466. : [],
  467. ),
  468. )
  469. } catch {
  470. // UDS unavailable — treat all sessions as continuable
  471. }
  472. }
  473. const logs = await logsPromise
  474. log =
  475. logs.find(l => {
  476. const id = getSessionIdFromLog(l)
  477. return !id || !skip.has(id)
  478. }) ?? null
  479. } else if (sourceJsonlFile) {
  480. // --resume with a .jsonl path (cli/print.ts routes on suffix).
  481. // Same chain walk as the sid branch below — only the starting
  482. // path differs.
  483. const loaded = await loadMessagesFromJsonlPath(sourceJsonlFile)
  484. messages = loaded.messages
  485. sessionId = loaded.sessionId
  486. } else if (typeof source === 'string') {
  487. // Load specific session by ID
  488. log = await getLastSessionLog(source as UUID)
  489. sessionId = source as UUID
  490. } else {
  491. // Already have a LogOption
  492. log = source
  493. }
  494. if (!log && !messages) {
  495. return null
  496. }
  497. if (log) {
  498. // Load full messages for lite logs
  499. if (isLiteLog(log)) {
  500. log = await loadFullLog(log)
  501. }
  502. // Determine sessionId first so we can pass it to copy functions
  503. if (!sessionId) {
  504. sessionId = getSessionIdFromLog(log) as UUID
  505. }
  506. // Pass the original session ID to ensure the plan slug is associated with
  507. // the session we're resuming, not the temporary session ID before resume
  508. if (sessionId) {
  509. await copyPlanForResume(log, asSessionId(sessionId))
  510. }
  511. // Copy file history for resume
  512. void copyFileHistoryForResume(log)
  513. messages = log.messages
  514. checkResumeConsistency(messages)
  515. }
  516. // Restore skill state from invoked_skills attachments before deserialization.
  517. // This ensures skills survive multiple compaction cycles after resume.
  518. restoreSkillStateFromMessages(messages!)
  519. // Deserialize messages to handle unresolved tool uses and ensure proper format
  520. const deserialized = deserializeMessagesWithInterruptDetection(messages!)
  521. messages = deserialized.messages
  522. // Process session start hooks for resume
  523. const hookMessages = await processSessionStartHooks('resume', { sessionId })
  524. // Append hook messages to the conversation
  525. messages.push(...hookMessages)
  526. return {
  527. messages,
  528. turnInterruptionState: deserialized.turnInterruptionState,
  529. fileHistorySnapshots: log?.fileHistorySnapshots,
  530. attributionSnapshots: log?.attributionSnapshots,
  531. contentReplacements: log?.contentReplacements,
  532. contextCollapseCommits: log?.contextCollapseCommits,
  533. contextCollapseSnapshot: log?.contextCollapseSnapshot,
  534. sessionId,
  535. // Include session metadata for restoring agent context on resume
  536. agentName: log?.agentName,
  537. agentColor: log?.agentColor,
  538. agentSetting: log?.agentSetting,
  539. customTitle: log?.customTitle,
  540. tag: log?.tag,
  541. mode: log?.mode,
  542. worktreeSession: log?.worktreeSession,
  543. prNumber: log?.prNumber,
  544. prUrl: log?.prUrl,
  545. prRepository: log?.prRepository,
  546. // Include full path for cross-directory resume
  547. fullPath: log?.fullPath,
  548. }
  549. } catch (error) {
  550. logError(error as Error)
  551. throw error
  552. }
  553. }