messageQueueManager.ts 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. import { feature } from 'bun:bundle'
  2. import type { ContentBlockParam } from '@anthropic-ai/sdk/resources/messages.mjs'
  3. import type { Permutations } from 'src/types/utils.js'
  4. import { getSessionId } from '../bootstrap/state.js'
  5. import type { AppState } from '../state/AppState.js'
  6. import type {
  7. QueueOperation,
  8. QueueOperationMessage,
  9. } from '../types/messageQueueTypes.js'
  10. import type {
  11. EditablePromptInputMode,
  12. PromptInputMode,
  13. QueuedCommand,
  14. QueuePriority,
  15. } from '../types/textInputTypes.js'
  16. import type { PastedContent } from './config.js'
  17. import { extractTextContent } from './messages.js'
  18. import { objectGroupBy } from './objectGroupBy.js'
  19. import { recordQueueOperation } from './sessionStorage.js'
  20. import { createSignal } from './signal.js'
  21. export type SetAppState = (f: (prev: AppState) => AppState) => void
  22. // ============================================================================
  23. // Logging helper
  24. // ============================================================================
  25. function logOperation(operation: QueueOperation, content?: string): void {
  26. const sessionId = getSessionId()
  27. const queueOp: QueueOperationMessage = {
  28. type: 'queue-operation',
  29. operation,
  30. timestamp: new Date().toISOString(),
  31. sessionId,
  32. ...(content !== undefined && { content }),
  33. }
  34. void recordQueueOperation(queueOp)
  35. }
  36. // ============================================================================
  37. // Unified command queue (module-level, independent of React state)
  38. //
  39. // All commands — user input, task notifications, orphaned permissions — go
  40. // through this single queue. React components subscribe via
  41. // useSyncExternalStore (subscribeToCommandQueue / getCommandQueueSnapshot).
  42. // Non-React code (print.ts streaming loop) reads directly via
  43. // getCommandQueue() / getCommandQueueLength().
  44. //
  45. // Priority determines dequeue order: 'now' > 'next' > 'later'.
  46. // Within the same priority, commands are processed FIFO.
  47. // ============================================================================
  48. const commandQueue: QueuedCommand[] = []
  49. /** Frozen snapshot — recreated on every mutation for useSyncExternalStore. */
  50. let snapshot: readonly QueuedCommand[] = Object.freeze([])
  51. const queueChanged = createSignal()
  52. function notifySubscribers(): void {
  53. snapshot = Object.freeze([...commandQueue])
  54. queueChanged.emit()
  55. }
  56. // ============================================================================
  57. // useSyncExternalStore interface
  58. // ============================================================================
  59. /**
  60. * Subscribe to command queue changes.
  61. * Compatible with React's useSyncExternalStore.
  62. */
  63. export const subscribeToCommandQueue = queueChanged.subscribe
  64. /**
  65. * Get current snapshot of the command queue.
  66. * Compatible with React's useSyncExternalStore.
  67. * Returns a frozen array that only changes reference on mutation.
  68. */
  69. export function getCommandQueueSnapshot(): readonly QueuedCommand[] {
  70. return snapshot
  71. }
  72. // ============================================================================
  73. // Read operations (for non-React code)
  74. // ============================================================================
  75. /**
  76. * Get a mutable copy of the current queue.
  77. * Use for one-off reads where you need the actual commands.
  78. */
  79. export function getCommandQueue(): QueuedCommand[] {
  80. return [...commandQueue]
  81. }
  82. /**
  83. * Get the current queue length without copying.
  84. */
  85. export function getCommandQueueLength(): number {
  86. return commandQueue.length
  87. }
  88. /**
  89. * Check if there are commands in the queue.
  90. */
  91. export function hasCommandsInQueue(): boolean {
  92. return commandQueue.length > 0
  93. }
  94. /**
  95. * Trigger a re-check by notifying subscribers.
  96. * Use after async processing completes to ensure remaining commands
  97. * are picked up by useSyncExternalStore consumers.
  98. */
  99. export function recheckCommandQueue(): void {
  100. if (commandQueue.length > 0) {
  101. notifySubscribers()
  102. }
  103. }
  104. // ============================================================================
  105. // Write operations
  106. // ============================================================================
  107. /**
  108. * Add a command to the queue.
  109. * Used for user-initiated commands (prompt, bash, orphaned-permission).
  110. * Defaults priority to 'next' (processed before task notifications).
  111. */
  112. export function enqueue(command: QueuedCommand): void {
  113. commandQueue.push({ ...command, priority: command.priority ?? 'next' })
  114. notifySubscribers()
  115. logOperation(
  116. 'enqueue',
  117. typeof command.value === 'string' ? command.value : undefined,
  118. )
  119. }
  120. /**
  121. * Add a task notification to the queue.
  122. * Convenience wrapper that defaults priority to 'later' so user input
  123. * is never starved by system messages.
  124. */
  125. export function enqueuePendingNotification(command: QueuedCommand): void {
  126. commandQueue.push({ ...command, priority: command.priority ?? 'later' })
  127. notifySubscribers()
  128. logOperation(
  129. 'enqueue',
  130. typeof command.value === 'string' ? command.value : undefined,
  131. )
  132. }
  133. const PRIORITY_ORDER: Record<QueuePriority, number> = {
  134. now: 0,
  135. next: 1,
  136. later: 2,
  137. }
  138. /**
  139. * Remove and return the highest-priority command, or undefined if empty.
  140. * Within the same priority level, commands are dequeued FIFO.
  141. *
  142. * An optional `filter` narrows the candidates: only commands for which the
  143. * predicate returns `true` are considered. Non-matching commands stay in the
  144. * queue untouched. This lets between-turn drains (SDK, REPL) restrict to
  145. * main-thread commands (`cmd.agentId === undefined`) without restructuring
  146. * the existing while-loop patterns.
  147. */
  148. export function dequeue(
  149. filter?: (cmd: QueuedCommand) => boolean,
  150. ): QueuedCommand | undefined {
  151. if (commandQueue.length === 0) {
  152. return undefined
  153. }
  154. // Find the first command with the highest priority (respecting filter)
  155. let bestIdx = -1
  156. let bestPriority = Infinity
  157. for (let i = 0; i < commandQueue.length; i++) {
  158. const cmd = commandQueue[i]!
  159. if (filter && !filter(cmd)) continue
  160. const priority = PRIORITY_ORDER[cmd.priority ?? 'next']
  161. if (priority < bestPriority) {
  162. bestIdx = i
  163. bestPriority = priority
  164. }
  165. }
  166. if (bestIdx === -1) return undefined
  167. const [dequeued] = commandQueue.splice(bestIdx, 1)
  168. notifySubscribers()
  169. logOperation('dequeue')
  170. return dequeued
  171. }
  172. /**
  173. * Remove and return all commands from the queue.
  174. * Logs a dequeue operation for each command.
  175. */
  176. export function dequeueAll(): QueuedCommand[] {
  177. if (commandQueue.length === 0) {
  178. return []
  179. }
  180. const commands = [...commandQueue]
  181. commandQueue.length = 0
  182. notifySubscribers()
  183. for (const _cmd of commands) {
  184. logOperation('dequeue')
  185. }
  186. return commands
  187. }
  188. /**
  189. * Return the highest-priority command without removing it, or undefined if empty.
  190. * Accepts an optional `filter` — only commands passing the predicate are considered.
  191. */
  192. export function peek(
  193. filter?: (cmd: QueuedCommand) => boolean,
  194. ): QueuedCommand | undefined {
  195. if (commandQueue.length === 0) {
  196. return undefined
  197. }
  198. let bestIdx = -1
  199. let bestPriority = Infinity
  200. for (let i = 0; i < commandQueue.length; i++) {
  201. const cmd = commandQueue[i]!
  202. if (filter && !filter(cmd)) continue
  203. const priority = PRIORITY_ORDER[cmd.priority ?? 'next']
  204. if (priority < bestPriority) {
  205. bestIdx = i
  206. bestPriority = priority
  207. }
  208. }
  209. if (bestIdx === -1) return undefined
  210. return commandQueue[bestIdx]
  211. }
  212. /**
  213. * Remove and return all commands matching a predicate, preserving priority order.
  214. * Non-matching commands stay in the queue.
  215. */
  216. export function dequeueAllMatching(
  217. predicate: (cmd: QueuedCommand) => boolean,
  218. ): QueuedCommand[] {
  219. const matched: QueuedCommand[] = []
  220. const remaining: QueuedCommand[] = []
  221. for (const cmd of commandQueue) {
  222. if (predicate(cmd)) {
  223. matched.push(cmd)
  224. } else {
  225. remaining.push(cmd)
  226. }
  227. }
  228. if (matched.length === 0) {
  229. return []
  230. }
  231. commandQueue.length = 0
  232. commandQueue.push(...remaining)
  233. notifySubscribers()
  234. for (const _cmd of matched) {
  235. logOperation('dequeue')
  236. }
  237. return matched
  238. }
  239. /**
  240. * Remove specific commands from the queue by reference identity.
  241. * Callers must pass the same object references that are in the queue
  242. * (e.g. from getCommandsByMaxPriority). Logs a 'remove' operation for each.
  243. */
  244. export function remove(commandsToRemove: QueuedCommand[]): void {
  245. if (commandsToRemove.length === 0) {
  246. return
  247. }
  248. const before = commandQueue.length
  249. for (let i = commandQueue.length - 1; i >= 0; i--) {
  250. if (commandsToRemove.includes(commandQueue[i]!)) {
  251. commandQueue.splice(i, 1)
  252. }
  253. }
  254. if (commandQueue.length !== before) {
  255. notifySubscribers()
  256. }
  257. for (const _cmd of commandsToRemove) {
  258. logOperation('remove')
  259. }
  260. }
  261. /**
  262. * Remove commands matching a predicate.
  263. * Returns the removed commands.
  264. */
  265. export function removeByFilter(
  266. predicate: (cmd: QueuedCommand) => boolean,
  267. ): QueuedCommand[] {
  268. const removed: QueuedCommand[] = []
  269. for (let i = commandQueue.length - 1; i >= 0; i--) {
  270. if (predicate(commandQueue[i]!)) {
  271. removed.unshift(commandQueue.splice(i, 1)[0]!)
  272. }
  273. }
  274. if (removed.length > 0) {
  275. notifySubscribers()
  276. for (const _cmd of removed) {
  277. logOperation('remove')
  278. }
  279. }
  280. return removed
  281. }
  282. /**
  283. * Clear all commands from the queue.
  284. * Used by ESC cancellation to discard queued notifications.
  285. */
  286. export function clearCommandQueue(): void {
  287. if (commandQueue.length === 0) {
  288. return
  289. }
  290. commandQueue.length = 0
  291. notifySubscribers()
  292. }
  293. /**
  294. * Clear all commands and reset snapshot.
  295. * Used for test cleanup.
  296. */
  297. export function resetCommandQueue(): void {
  298. commandQueue.length = 0
  299. snapshot = Object.freeze([])
  300. }
  301. // ============================================================================
  302. // Editable mode helpers
  303. // ============================================================================
  304. const NON_EDITABLE_MODES = new Set<PromptInputMode>([
  305. 'task-notification',
  306. ] satisfies Permutations<Exclude<PromptInputMode, EditablePromptInputMode>>)
  307. export function isPromptInputModeEditable(
  308. mode: PromptInputMode,
  309. ): mode is EditablePromptInputMode {
  310. return !NON_EDITABLE_MODES.has(mode)
  311. }
  312. /**
  313. * Whether this queued command can be pulled into the input buffer via UP/ESC.
  314. * System-generated commands (proactive ticks, scheduled tasks, plan
  315. * verification, channel messages) contain raw XML and must not leak into
  316. * the user's input.
  317. */
  318. export function isQueuedCommandEditable(cmd: QueuedCommand): boolean {
  319. return isPromptInputModeEditable(cmd.mode) && !cmd.isMeta
  320. }
  321. /**
  322. * Whether this queued command should render in the queue preview under the
  323. * prompt. Superset of editable — channel messages show (so the keyboard user
  324. * sees what arrived) but stay non-editable (raw XML).
  325. */
  326. export function isQueuedCommandVisible(cmd: QueuedCommand): boolean {
  327. if (
  328. (feature('KAIROS') || feature('KAIROS_CHANNELS')) &&
  329. (cmd as any).origin?.kind === 'channel'
  330. )
  331. return true
  332. return isQueuedCommandEditable(cmd)
  333. }
  334. /**
  335. * Extract text from a queued command value.
  336. * For strings, returns the string.
  337. * For ContentBlockParam[], extracts text from text blocks.
  338. */
  339. function extractTextFromValue(value: string | ContentBlockParam[]): string {
  340. return typeof value === 'string' ? value : extractTextContent(value, '\n')
  341. }
  342. /**
  343. * Extract images from ContentBlockParam[] and convert to PastedContent format.
  344. * Returns empty array for string values or if no images found.
  345. */
  346. function extractImagesFromValue(
  347. value: string | ContentBlockParam[],
  348. startId: number,
  349. ): PastedContent[] {
  350. if (typeof value === 'string') {
  351. return []
  352. }
  353. const images: PastedContent[] = []
  354. let imageIndex = 0
  355. for (const block of value) {
  356. if (block.type === 'image' && block.source.type === 'base64') {
  357. images.push({
  358. id: startId + imageIndex,
  359. type: 'image',
  360. content: block.source.data,
  361. mediaType: block.source.media_type,
  362. filename: `image${imageIndex + 1}`,
  363. })
  364. imageIndex++
  365. }
  366. }
  367. return images
  368. }
  369. export type PopAllEditableResult = {
  370. text: string
  371. cursorOffset: number
  372. images: PastedContent[]
  373. }
  374. /**
  375. * Pop all editable commands and combine them with current input for editing.
  376. * Notification modes (task-notification) are left in the queue
  377. * to be auto-processed later.
  378. * Returns object with combined text, cursor offset, and images to restore.
  379. * Returns undefined if no editable commands in queue.
  380. */
  381. export function popAllEditable(
  382. currentInput: string,
  383. currentCursorOffset: number,
  384. ): PopAllEditableResult | undefined {
  385. if (commandQueue.length === 0) {
  386. return undefined
  387. }
  388. const { editable = [], nonEditable = [] } = objectGroupBy(
  389. [...commandQueue],
  390. cmd => (isQueuedCommandEditable(cmd) ? 'editable' : 'nonEditable'),
  391. )
  392. if (editable.length === 0) {
  393. return undefined
  394. }
  395. // Extract text from queued commands (handles both strings and ContentBlockParam[])
  396. const queuedTexts = editable.map(cmd => extractTextFromValue(cmd.value))
  397. const newInput = [...queuedTexts, currentInput].filter(Boolean).join('\n')
  398. // Calculate cursor offset: length of joined queued commands + 1 + current cursor offset
  399. const cursorOffset = queuedTexts.join('\n').length + 1 + currentCursorOffset
  400. // Extract images from queued commands
  401. const images: PastedContent[] = []
  402. let nextImageId = Date.now() // Use timestamp as base for unique IDs
  403. for (const cmd of editable) {
  404. // handlePromptSubmit queues images in pastedContents (value is a string).
  405. // Preserve the original PastedContent id so imageStore lookups still work.
  406. if (cmd.pastedContents) {
  407. for (const content of Object.values(cmd.pastedContents)) {
  408. if (content.type === 'image') {
  409. images.push(content)
  410. }
  411. }
  412. }
  413. // Bridge/remote commands may embed images directly in ContentBlockParam[].
  414. const cmdImages = extractImagesFromValue(cmd.value, nextImageId)
  415. images.push(...cmdImages)
  416. nextImageId += cmdImages.length
  417. }
  418. for (const command of editable) {
  419. logOperation(
  420. 'popAll',
  421. typeof command.value === 'string' ? command.value : undefined,
  422. )
  423. }
  424. // Replace queue contents with only the non-editable commands
  425. commandQueue.length = 0
  426. commandQueue.push(...nonEditable)
  427. notifySubscribers()
  428. return { text: newInput, cursorOffset, images }
  429. }
  430. // ============================================================================
  431. // Backward-compatible aliases (deprecated — prefer new names)
  432. // ============================================================================
  433. /** @deprecated Use subscribeToCommandQueue */
  434. export const subscribeToPendingNotifications = subscribeToCommandQueue
  435. /** @deprecated Use getCommandQueueSnapshot */
  436. export function getPendingNotificationsSnapshot(): readonly QueuedCommand[] {
  437. return snapshot
  438. }
  439. /** @deprecated Use hasCommandsInQueue */
  440. export const hasPendingNotifications = hasCommandsInQueue
  441. /** @deprecated Use getCommandQueueLength */
  442. export const getPendingNotificationsCount = getCommandQueueLength
  443. /** @deprecated Use recheckCommandQueue */
  444. export const recheckPendingNotifications = recheckCommandQueue
  445. /** @deprecated Use dequeue */
  446. export function dequeuePendingNotification(): QueuedCommand | undefined {
  447. return dequeue()
  448. }
  449. /** @deprecated Use resetCommandQueue */
  450. export const resetPendingNotifications = resetCommandQueue
  451. /** @deprecated Use clearCommandQueue */
  452. export const clearPendingNotifications = clearCommandQueue
  453. /**
  454. * Get commands at or above a given priority level without removing them.
  455. * Useful for mid-chain draining where only urgent items should be processed.
  456. *
  457. * Priority order: 'now' (0) > 'next' (1) > 'later' (2).
  458. * Passing 'now' returns only now-priority commands; 'later' returns everything.
  459. */
  460. export function getCommandsByMaxPriority(
  461. maxPriority: QueuePriority,
  462. ): QueuedCommand[] {
  463. const threshold = PRIORITY_ORDER[maxPriority]
  464. return commandQueue.filter(
  465. cmd => PRIORITY_ORDER[cmd.priority ?? 'next'] <= threshold,
  466. )
  467. }
  468. /**
  469. * Returns true if the command is a slash command that should be routed through
  470. * processSlashCommand rather than sent to the model as text.
  471. *
  472. * Commands with `skipSlashCommands` (e.g. bridge/CCR messages) are NOT treated
  473. * as slash commands — their text is meant for the model.
  474. */
  475. export function isSlashCommand(cmd: QueuedCommand): boolean {
  476. return (
  477. typeof cmd.value === 'string' &&
  478. cmd.value.trim().startsWith('/') &&
  479. !cmd.skipSlashCommands
  480. )
  481. }