| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547 |
- import { feature } from 'bun:bundle'
- import type { ContentBlockParam } from '@anthropic-ai/sdk/resources/messages.mjs'
- import type { Permutations } from 'src/types/utils.js'
- import { getSessionId } from '../bootstrap/state.js'
- import type { AppState } from '../state/AppState.js'
- import type {
- QueueOperation,
- QueueOperationMessage,
- } from '../types/messageQueueTypes.js'
- import type {
- EditablePromptInputMode,
- PromptInputMode,
- QueuedCommand,
- QueuePriority,
- } from '../types/textInputTypes.js'
- import type { PastedContent } from './config.js'
- import { extractTextContent } from './messages.js'
- import { objectGroupBy } from './objectGroupBy.js'
- import { recordQueueOperation } from './sessionStorage.js'
- import { createSignal } from './signal.js'
/**
 * Setter for application state: receives an updater that maps the previous
 * `AppState` to the next one.
 */
export type SetAppState = (
  updater: (previous: AppState) => AppState,
) => void
- // ============================================================================
- // Logging helper
- // ============================================================================
/**
 * Record a queue mutation for the current session.
 *
 * Builds a `queue-operation` message stamped with the current time and the
 * session id, then passes it to `recordQueueOperation` without awaiting it.
 *
 * @param operation - Kind of queue mutation being recorded.
 * @param content - Optional text payload; the `content` key is left out of
 *   the message entirely when this is `undefined`.
 */
function logOperation(operation: QueueOperation, content?: string): void {
  const sessionId = getSessionId()
  const timestamp = new Date().toISOString()
  const entry: QueueOperationMessage = {
    type: 'queue-operation',
    operation,
    timestamp,
    sessionId,
    ...(content === undefined ? {} : { content }),
  }
  // Fire-and-forget: the result of recording is intentionally not awaited.
  void recordQueueOperation(entry)
}
- // ============================================================================
- // Unified command queue (module-level, independent of React state)
- //
- // All commands — user input, task notifications, orphaned permissions — go
- // through this single queue. React components subscribe via
- // useSyncExternalStore (subscribeToCommandQueue / getCommandQueueSnapshot).
- // Non-React code (print.ts streaming loop) reads directly via
- // getCommandQueue() / getCommandQueueLength().
- //
- // Priority determines dequeue order: 'now' > 'next' > 'later'.
- // Within the same priority, commands are processed FIFO.
- // ============================================================================
/** Backing store for queued commands; mutated in place by this module. */
const commandQueue: QueuedCommand[] = []

/**
 * Frozen copy of `commandQueue` handed out to snapshot readers.
 * Replaced with a new array every time subscribers are notified.
 */
let snapshot: readonly QueuedCommand[] = Object.freeze([])

/** Signal emitted after the snapshot has been refreshed. */
const queueChanged = createSignal()

/**
 * Publish the current queue state: rebuild the frozen snapshot first, then
 * emit the change signal so listeners read the refreshed snapshot.
 */
function notifySubscribers(): void {
  snapshot = Object.freeze(commandQueue.slice())
  queueChanged.emit()
}
- // ============================================================================
- // useSyncExternalStore interface
- // ============================================================================
/**
 * Register a listener for command queue changes.
 *
 * This is the change signal's own `subscribe` function, exported directly so
 * it can be passed as the `subscribe` argument of React's
 * useSyncExternalStore.
 */
export const subscribeToCommandQueue = queueChanged.subscribe
/**
 * Return the latest frozen snapshot of the command queue.
 *
 * Suitable as the `getSnapshot` argument of React's useSyncExternalStore:
 * the same array reference is returned until subscribers are next notified.
 *
 * @returns The current read-only snapshot.
 */
export function getCommandQueueSnapshot(): readonly QueuedCommand[] {
  return snapshot
}
- // ============================================================================
- // Read operations (for non-React code)
- // ============================================================================
/**
 * Copy the current queue into a fresh, mutable array.
 *
 * The array is a shallow copy: changing it does not affect the queue, but
 * the command objects inside it are the same references the queue holds.
 *
 * @returns A new array containing every queued command in queue order.
 */
export function getCommandQueue(): QueuedCommand[] {
  return commandQueue.slice()
}
/**
 * Report how many commands are currently queued, without copying the queue.
 *
 * @returns The number of commands in the queue.
 */
export function getCommandQueueLength(): number {
  const { length } = commandQueue
  return length
}
/**
 * Tell whether the queue currently holds at least one command.
 *
 * @returns `true` when the queue is non-empty, otherwise `false`.
 */
export function hasCommandsInQueue(): boolean {
  return commandQueue.length !== 0 && commandQueue.length > 0
}
/**
 * Re-announce the queue to subscribers when it still holds commands.
 *
 * Call this after asynchronous processing finishes so that
 * useSyncExternalStore consumers pick up any remaining commands.
 * Does nothing when the queue is empty.
 */
export function recheckCommandQueue(): void {
  if (commandQueue.length === 0) {
    return
  }
  notifySubscribers()
}
- // ============================================================================
- // Write operations
- // ============================================================================
/**
 * Append a command to the queue.
 *
 * Used for user-initiated commands (prompt, bash, orphaned-permission).
 * When the command has no priority it is stored with priority 'next'. A
 * copy of the command is queued, so the caller's object is not retained.
 *
 * @param command - The command to queue.
 */
export function enqueue(command: QueuedCommand): void {
  const priority = command.priority ?? 'next'
  commandQueue.push({ ...command, priority })
  notifySubscribers()

  // Only string payloads are written to the operation log.
  const loggedText =
    typeof command.value === 'string' ? command.value : undefined
  logOperation('enqueue', loggedText)
}
/**
 * Append a task notification to the queue.
 *
 * Same as a regular enqueue except that a command without a priority is
 * stored with priority 'later', so user input is not starved by system
 * messages.
 *
 * @param command - The notification command to queue.
 */
export function enqueuePendingNotification(command: QueuedCommand): void {
  const priority = command.priority ?? 'later'
  commandQueue.push({ ...command, priority })
  notifySubscribers()

  // Only string payloads are written to the operation log.
  const loggedText =
    typeof command.value === 'string' ? command.value : undefined
  logOperation('enqueue', loggedText)
}
/** Numeric rank for each priority; a lower number is dequeued earlier. */
const PRIORITY_ORDER: Record<QueuePriority, number> = {
  now: 0,
  next: 1,
  later: 2,
}

/**
 * Remove and return the highest-priority command.
 *
 * Commands of equal priority leave the queue in FIFO order, and a command
 * without a priority is ranked as 'next'.
 *
 * @param filter - Optional predicate; only commands it accepts are
 *   candidates, and rejected commands stay in the queue untouched. This lets
 *   between-turn drains restrict themselves to main-thread commands
 *   (`cmd.agentId === undefined`).
 * @returns The removed command, or `undefined` when the queue is empty or no
 *   command passes the filter.
 */
export function dequeue(
  filter?: (cmd: QueuedCommand) => boolean,
): QueuedCommand | undefined {
  if (commandQueue.length === 0) {
    return undefined
  }

  // Strict "<" keeps the earliest command among equals, which gives FIFO.
  let winnerIndex = -1
  let winnerRank = Infinity
  for (const [index, candidate] of commandQueue.entries()) {
    if (filter && !filter(candidate)) {
      continue
    }
    const rank = PRIORITY_ORDER[candidate.priority ?? 'next']
    if (rank < winnerRank) {
      winnerIndex = index
      winnerRank = rank
    }
  }

  if (winnerIndex < 0) {
    return undefined
  }

  const removed = commandQueue.splice(winnerIndex, 1)[0]
  notifySubscribers()
  logOperation('dequeue')
  return removed
}
/**
 * Empty the queue and return everything it held.
 *
 * Subscribers are notified once, and one 'dequeue' operation is logged per
 * removed command. An already-empty queue yields `[]` with no notification.
 *
 * @returns The removed commands in their queue order.
 */
export function dequeueAll(): QueuedCommand[] {
  const drained = commandQueue.splice(0, commandQueue.length)
  if (drained.length === 0) {
    return []
  }
  notifySubscribers()
  drained.forEach(() => logOperation('dequeue'))
  return drained
}
/**
 * Look at the highest-priority command without removing it.
 *
 * Uses the same selection rule as dequeueing: lowest priority rank wins,
 * ties go to the earliest command, and a missing priority counts as 'next'.
 *
 * @param filter - Optional predicate; only commands it accepts are
 *   considered.
 * @returns The selected command, or `undefined` when the queue is empty or
 *   no command passes the filter.
 */
export function peek(
  filter?: (cmd: QueuedCommand) => boolean,
): QueuedCommand | undefined {
  if (commandQueue.length === 0) {
    return undefined
  }

  let winnerIndex = -1
  let winnerRank = Infinity
  for (const [index, candidate] of commandQueue.entries()) {
    if (filter && !filter(candidate)) {
      continue
    }
    const rank = PRIORITY_ORDER[candidate.priority ?? 'next']
    if (rank < winnerRank) {
      winnerIndex = index
      winnerRank = rank
    }
  }

  return winnerIndex === -1 ? undefined : commandQueue[winnerIndex]
}
/**
 * Remove and return every command accepted by `predicate`.
 *
 * Matching commands are returned in their existing queue order, and the
 * rejected commands stay queued in their original relative order. When
 * nothing matches, the queue is left untouched and nobody is notified.
 * One 'dequeue' operation is logged per removed command.
 *
 * @param predicate - Returns `true` for commands that should be removed.
 * @returns The removed commands, or `[]` when none matched.
 */
export function dequeueAllMatching(
  predicate: (cmd: QueuedCommand) => boolean,
): QueuedCommand[] {
  const taken: QueuedCommand[] = []
  const kept: QueuedCommand[] = []
  for (const cmd of commandQueue) {
    const bucket = predicate(cmd) ? taken : kept
    bucket.push(cmd)
  }

  if (taken.length === 0) {
    return []
  }

  // Swap the queue contents in place so the module-level array is reused.
  commandQueue.splice(0, commandQueue.length, ...kept)
  notifySubscribers()
  taken.forEach(() => logOperation('dequeue'))
  return taken
}
/**
 * Remove specific commands from the queue by reference identity.
 *
 * Callers must pass the same object references that are in the queue
 * (e.g. those returned by getCommandsByMaxPriority); structurally equal
 * copies will not match.
 *
 * Subscribers are notified only if the queue actually shrank. A 'remove'
 * operation is logged for every entry of `commandsToRemove`, whether or not
 * that entry was found in the queue.
 *
 * @param commandsToRemove - Commands to drop; an empty array is a no-op.
 */
export function remove(commandsToRemove: QueuedCommand[]): void {
  if (commandsToRemove.length === 0) {
    return
  }

  // A Set gives constant-time identity lookups, instead of rescanning
  // `commandsToRemove` for every queued command.
  const targets = new Set<QueuedCommand>(commandsToRemove)
  const before = commandQueue.length

  // Walk backwards so splicing never shifts an index we have yet to visit.
  for (let i = commandQueue.length - 1; i >= 0; i--) {
    if (targets.has(commandQueue[i]!)) {
      commandQueue.splice(i, 1)
    }
  }

  if (commandQueue.length !== before) {
    notifySubscribers()
  }

  for (const _cmd of commandsToRemove) {
    logOperation('remove')
  }
}
/**
 * Remove every command accepted by `predicate`.
 *
 * The queue is scanned from the end towards the start, so the predicate is
 * called on later commands first. The returned array is in original queue
 * order. Subscribers are notified and 'remove' operations are logged (one
 * per removed command) only when something was removed.
 *
 * @param predicate - Returns `true` for commands that should be removed.
 * @returns The removed commands in queue order; empty when none matched.
 */
export function removeByFilter(
  predicate: (cmd: QueuedCommand) => boolean,
): QueuedCommand[] {
  const removed: QueuedCommand[] = []

  let index = commandQueue.length
  while (index-- > 0) {
    if (!predicate(commandQueue[index]!)) {
      continue
    }
    const [taken] = commandQueue.splice(index, 1)
    removed.push(taken!)
  }
  // Collected back-to-front; flip to restore queue order.
  removed.reverse()

  if (removed.length === 0) {
    return removed
  }

  notifySubscribers()
  removed.forEach(() => logOperation('remove'))
  return removed
}
/**
 * Discard every queued command.
 *
 * Used by ESC cancellation to drop queued notifications. Subscribers are
 * notified only when the queue was non-empty; no operation is logged.
 */
export function clearCommandQueue(): void {
  if (commandQueue.length > 0) {
    commandQueue.length = 0
    notifySubscribers()
  }
}
/**
 * Restore the module to its initial state: empty queue, empty snapshot.
 *
 * Intended for test cleanup. Unlike clearing the queue, this does not emit
 * the change signal and does not log anything.
 */
export function resetCommandQueue(): void {
  commandQueue.splice(0, commandQueue.length)
  snapshot = Object.freeze([])
}
- // ============================================================================
- // Editable mode helpers
- // ============================================================================
/**
 * Prompt input modes whose queued commands cannot be edited.
 *
 * NOTE(review): the `satisfies Permutations<...>` clause presumably forces
 * this list to name every mode that is a `PromptInputMode` but not an
 * `EditablePromptInputMode` — confirm against `Permutations`.
 */
const NON_EDITABLE_MODES = new Set<PromptInputMode>([
  'task-notification',
] satisfies Permutations<Exclude<PromptInputMode, EditablePromptInputMode>>)

/**
 * Type guard: tell whether a prompt input mode is editable.
 *
 * @param mode - The mode to classify.
 * @returns `true` (narrowing `mode` to `EditablePromptInputMode`) unless the
 *   mode is listed in `NON_EDITABLE_MODES`.
 */
export function isPromptInputModeEditable(
  mode: PromptInputMode,
): mode is EditablePromptInputMode {
  const blocked = NON_EDITABLE_MODES.has(mode)
  return !blocked
}
/**
 * Tell whether a queued command may be pulled into the input buffer via
 * UP/ESC.
 *
 * A command is editable when its mode is editable and it is not flagged
 * `isMeta`. System-generated commands (proactive ticks, scheduled tasks,
 * plan verification, channel messages) contain raw XML and must not leak
 * into the user's input.
 *
 * @param cmd - The queued command to classify.
 * @returns `true` when the command can be edited by the user.
 */
export function isQueuedCommandEditable(cmd: QueuedCommand): boolean {
  if (!isPromptInputModeEditable(cmd.mode)) {
    return false
  }
  return !cmd.isMeta
}
/**
 * Tell whether a queued command should be rendered in the queue preview
 * under the prompt.
 *
 * Visibility is a superset of editability: when the 'KAIROS' or
 * 'KAIROS_CHANNELS' feature is enabled, commands whose `origin.kind` is
 * 'channel' are shown (so the keyboard user sees what arrived) even though
 * they stay non-editable (raw XML). Everything else is visible exactly when
 * it is editable.
 *
 * @param cmd - The queued command to classify.
 * @returns `true` when the command should appear in the preview.
 */
export function isQueuedCommandVisible(cmd: QueuedCommand): boolean {
  // The feature() calls stay directly in the `if` condition, as in the
  // original expression.
  if (feature('KAIROS') || feature('KAIROS_CHANNELS')) {
    if ((cmd as any).origin?.kind === 'channel') {
      return true
    }
  }
  return isQueuedCommandEditable(cmd)
}
/**
 * Produce the plain text of a queued command value.
 *
 * @param value - Either a raw string or an array of content blocks.
 * @returns The string itself, or for a block array the result of
 *   `extractTextContent(value, '\n')`.
 */
function extractTextFromValue(value: string | ContentBlockParam[]): string {
  if (typeof value === 'string') {
    return value
  }
  return extractTextContent(value, '\n')
}
/**
 * Collect base64 image blocks from a command value as `PastedContent`.
 *
 * Only blocks with `type === 'image'` and a `base64` source are converted;
 * all other blocks are skipped. Ids are assigned consecutively from
 * `startId`, and filenames are `image1`, `image2`, … in encounter order.
 *
 * @param value - Either a raw string or an array of content blocks.
 * @param startId - Id given to the first extracted image.
 * @returns The extracted images; empty for string values or when no base64
 *   image block is present.
 */
function extractImagesFromValue(
  value: string | ContentBlockParam[],
  startId: number,
): PastedContent[] {
  const images: PastedContent[] = []
  if (typeof value === 'string') {
    return images
  }

  for (const block of value) {
    if (block.type === 'image' && block.source.type === 'base64') {
      // The number of images collected so far is this image's ordinal.
      const ordinal = images.length
      images.push({
        id: startId + ordinal,
        type: 'image',
        content: block.source.data,
        mediaType: block.source.media_type,
        filename: `image${ordinal + 1}`,
      })
    }
  }
  return images
}
/** Result of pulling all editable queued commands back into the input. */
export type PopAllEditableResult = {
  /** Queued command texts followed by the current input, newline-joined. */
  text: string
  /** Cursor position to restore within `text`. */
  cursorOffset: number
  /** Images carried by the popped commands. */
  images: PastedContent[]
}
/**
 * Pop all editable commands and combine them with the current input for
 * editing.
 *
 * Non-editable commands (e.g. task-notification) are left in the queue to be
 * auto-processed later. A 'popAll' operation is logged for each popped
 * command and subscribers are notified once.
 *
 * @param currentInput - Text currently in the input buffer.
 * @param currentCursorOffset - Cursor position within `currentInput`.
 * @returns The combined text, the cursor offset mapped into that text, and
 *   the images to restore; `undefined` when the queue holds no editable
 *   command.
 */
export function popAllEditable(
  currentInput: string,
  currentCursorOffset: number,
): PopAllEditableResult | undefined {
  if (commandQueue.length === 0) {
    return undefined
  }
  const { editable = [], nonEditable = [] } = objectGroupBy(
    [...commandQueue],
    cmd => (isQueuedCommandEditable(cmd) ? 'editable' : 'nonEditable'),
  )
  if (editable.length === 0) {
    return undefined
  }

  // Extract text from queued commands (strings and ContentBlockParam[]).
  // Empty texts (e.g. image-only commands) are dropped so they contribute
  // neither a blank line to the text nor a separator to the cursor offset.
  const queuedTexts = editable
    .map(cmd => extractTextFromValue(cmd.value))
    .filter(Boolean)
  const queuedPrefix = queuedTexts.join('\n')
  const newInput = [queuedPrefix, currentInput].filter(Boolean).join('\n')

  // The cursor keeps its place inside the current input, shifted by the
  // queued prefix plus the joining newline. That newline only exists when
  // both parts are non-empty. Clamp so the offset never points past the
  // end of the combined text.
  const separatorLength = queuedPrefix && currentInput ? 1 : 0
  const cursorOffset = Math.min(
    queuedPrefix.length + separatorLength + currentCursorOffset,
    newInput.length,
  )

  // Extract images from queued commands
  const images: PastedContent[] = []
  let nextImageId = Date.now() // Use timestamp as base for unique IDs
  for (const cmd of editable) {
    // handlePromptSubmit queues images in pastedContents (value is a string).
    // Preserve the original PastedContent id so imageStore lookups still work.
    if (cmd.pastedContents) {
      for (const content of Object.values(cmd.pastedContents)) {
        if (content.type === 'image') {
          images.push(content)
        }
      }
    }
    // Bridge/remote commands may embed images directly in ContentBlockParam[].
    const cmdImages = extractImagesFromValue(cmd.value, nextImageId)
    images.push(...cmdImages)
    nextImageId += cmdImages.length
  }

  for (const command of editable) {
    logOperation(
      'popAll',
      typeof command.value === 'string' ? command.value : undefined,
    )
  }

  // Replace queue contents with only the non-editable commands
  commandQueue.length = 0
  commandQueue.push(...nonEditable)
  notifySubscribers()

  return { text: newInput, cursorOffset, images }
}
- // ============================================================================
- // Backward-compatible aliases (deprecated — prefer new names)
- // ============================================================================
/** @deprecated Use subscribeToCommandQueue */
export const subscribeToPendingNotifications = subscribeToCommandQueue

/**
 * Return the latest frozen snapshot of the command queue.
 * @deprecated Use getCommandQueueSnapshot
 */
export function getPendingNotificationsSnapshot(): readonly QueuedCommand[] {
  return getCommandQueueSnapshot()
}

/** @deprecated Use hasCommandsInQueue */
export const hasPendingNotifications = hasCommandsInQueue

/** @deprecated Use getCommandQueueLength */
export const getPendingNotificationsCount = getCommandQueueLength

/** @deprecated Use recheckCommandQueue */
export const recheckPendingNotifications = recheckCommandQueue

/**
 * Remove and return the highest-priority command, without any filter.
 * @deprecated Use dequeue
 */
export function dequeuePendingNotification(): QueuedCommand | undefined {
  // Takes no arguments on purpose, so the dequeue is always unfiltered.
  const next = dequeue()
  return next
}

/** @deprecated Use resetCommandQueue */
export const resetPendingNotifications = resetCommandQueue

/** @deprecated Use clearCommandQueue */
export const clearPendingNotifications = clearCommandQueue
/**
 * List the commands at or above a given priority level without removing
 * them. Useful for mid-chain draining where only urgent items should be
 * processed.
 *
 * Priority order: 'now' (0) > 'next' (1) > 'later' (2). Passing 'now'
 * returns only now-priority commands; 'later' returns everything. Commands
 * without a priority are ranked as 'next'.
 *
 * @param maxPriority - The lowest priority level still included.
 * @returns A new array of the matching commands in queue order; the
 *   elements are the same object references the queue holds.
 */
export function getCommandsByMaxPriority(
  maxPriority: QueuePriority,
): QueuedCommand[] {
  const ceiling = PRIORITY_ORDER[maxPriority]
  const selected: QueuedCommand[] = []
  for (const cmd of commandQueue) {
    const rank = PRIORITY_ORDER[cmd.priority ?? 'next']
    if (rank <= ceiling) {
      selected.push(cmd)
    }
  }
  return selected
}
/**
 * Tell whether a command is a slash command that should be routed through
 * processSlashCommand rather than sent to the model as text.
 *
 * A command qualifies when its value is a string that, after trimming,
 * starts with '/'. Commands with `skipSlashCommands` set (e.g. bridge/CCR
 * messages) never qualify — their text is meant for the model.
 *
 * @param cmd - The queued command to inspect.
 * @returns `true` when the command should be handled as a slash command.
 */
export function isSlashCommand(cmd: QueuedCommand): boolean {
  if (cmd.skipSlashCommands) {
    return false
  }
  const { value } = cmd
  if (typeof value !== 'string') {
    return false
  }
  return value.trim().startsWith('/')
}
|