| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589359035913592359335943595359635973598359936003601360236033604360536063607360836093610361136123613361436153616361736183619362036213622362336243625362636273628362936303631363236333634363536363637363836393640364136423643364436453646364736483649365036513652365336543655365636573658365936603661366236633664366536663667366836693670367136723673367436753676367736783679368036813682368336843685368636873688368936903691369236933694369536963697369836993700370137023703370437053706370737083709371037113712371337143715371637173718371937203721372237233724372537263727372837293730373137323733373437353736373737383739374037413742374337443745374637473748374937503751375237533754375537563757375837593760376137623763376437653766376737683769377037713772377337743775377637773778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483448444854486448744884489449044914492449344944495449644974498449945004501450245034504450545064507450845094510451145124513451445154516451745184519452045214522452345244525452645274528452945304531453245334534453545364537453845394540454145424543454445454546454745484549455045514552455345544555455645574558455945604561456245634564456545664567456845694570457145724573457445754576457745784579458045814582458345844585458645874588458945904591459245934594459545964597459845994600460146024603460446054606460746084609461046114612461346144615461646174618461946204621462246234624462546264627462846294630463146324633463446354636463746384639464046414642464346444645464646474648464946504651465246534654465546564657465846594660466146624663466446654666466746684669467046714672467346744675467646774678467946804681468246834684468546864687468846894690469146924693469446954696469746984699470047014702470347044705470647074708470947104711471247134714471547164717471847194720472147224723472447254726472747284729473047314732473347344735473647374738473947404741474247434744474547464747474847494750475147524753475447554756475747584759476047614762476347644765476647674768476947704771477247734774477547764777477847794780478147824783478447854786478747884789479047914792479347944795479647974798479948004801480248034804480548064807480848094810481148124813481448154816481748184819482048214822482348244825482648274828482948304831483248334834483548364837483848394840484148424843484448454846484748484849485048514852485348544855485648574858485948604861486248634864486548664867486848694870487148724873487448754876487748784879488048814882488348844885488648874888488948904891489248934894489548964897489848994900490149024903490449054906490749084909491049114912491349144915491649174918491949204921492249234924492549264927492849294930493149324933493449354936493749384939494049414942494349444945494649474948494949504951495249534954495549564957495849594960496149624963496449654966496749684969497049714972497349744975497649774978497949804981498249834984498549864987498849894990499149924993499449954996499749984999500050015002500350045005500650075008500950105011501250135014501550165017501850195020502150225023502450255026502750285029503050315032503350345035503650375038503950405041504250435044504550465047504850495050505150525053505450555056505750585059506050615062506350645065506650675068506950705071507250735074507550765077507850795080508150825083508450855086508750885089509050915092509350945095509650975098509951005101510251035104510551065107510851095110511151125113511451155116511751185119512051215122512351245125512651275128512951305131513251335134513551365137513851395140514151425143514451455146514751485149515051515152515351545155515651575158515951605161516251635164516551665167516851695170517151725173517451755176517751785179518051815182518351845185518651875188518951905191519251935194519551965197519851995200520152025203520452055206520752085209521052115212521352145215521652175218521952205221522252235224522552265227522852295230523152325233523452355236523752385239524052415242524352445245524652475248524952505251525252535254525552565257525852595260526152625263526452655266526752685269527052715272527352745275527652775278527952805281528252835284528552865287528852895290529152925293529452955296529752985299530053015302530353045305530653075308530953105311531253135314531553165317531853195320532153225323532453255326532753285329533053315332533353345335533653375338533953405341534253435344534553465347534853495350535153525353535453555356535753585359536053615362536353645365536653675368536953705371537253735374537553765377537853795380538153825383538453855386538753885389539053915392539353945395539653975398539954005401540254035404540554065407540854095410541154125413541454155416541754185419542054215422542354245425542654275428542954305431543254335434543554365437543854395440544154425443544454455446544754485449545054515452545354545455545654575458545954605461546254635464546554665467546854695470547154725473547454755476547754785479548054815482548354845485548654875488548954905491549254935494549554965497549854995500550155025503550455055506550755085509551055115512551355145515551655175518551955205521552255235524552555265527552855295530553155325533553455355536553755385539554055415542554355445545554655475548554955505551555255535554555555565557555855595560556155625563556455655566556755685569557055715572557355745575557655775578557955805581558255835584558555865587558855895590559155925593559455955596 |
- // biome-ignore-all assist/source/organizeImports: ANT-ONLY import markers must not be reordered
- import { feature } from 'bun:bundle'
- import { readFile, stat } from 'fs/promises'
- import { dirname } from 'path'
- import {
- downloadUserSettings,
- redownloadUserSettings,
- } from 'src/services/settingsSync/index.js'
- import { waitForRemoteManagedSettingsToLoad } from 'src/services/remoteManagedSettings/index.js'
- import { StructuredIO } from 'src/cli/structuredIO.js'
- import { RemoteIO } from 'src/cli/remoteIO.js'
- import {
- type Command,
- formatDescriptionWithSource,
- getCommandName,
- } from 'src/commands.js'
- import { createStreamlinedTransformer } from 'src/utils/streamlinedTransform.js'
- import { installStreamJsonStdoutGuard } from 'src/utils/streamJsonStdoutGuard.js'
- import type { ToolPermissionContext } from 'src/Tool.js'
- import type { ThinkingConfig } from 'src/utils/thinking.js'
- import { assembleToolPool, filterToolsByDenyRules } from 'src/tools.js'
- import uniqBy from 'lodash-es/uniqBy.js'
- import { uniq } from 'src/utils/array.js'
- import { mergeAndFilterTools } from 'src/utils/toolPool.js'
- import {
- logEvent,
- type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- } from 'src/services/analytics/index.js'
- import { getFeatureValue_CACHED_MAY_BE_STALE } from 'src/services/analytics/growthbook.js'
- import { logForDebugging } from 'src/utils/debug.js'
- import {
- logForDiagnosticsNoPII,
- withDiagnosticsTiming,
- } from 'src/utils/diagLogs.js'
- import { toolMatchesName, type Tool, type Tools } from 'src/Tool.js'
- import {
- type AgentDefinition,
- isBuiltInAgent,
- parseAgentsFromJson,
- } from 'src/tools/AgentTool/loadAgentsDir.js'
- import type { Message, NormalizedUserMessage } from 'src/types/message.js'
- import type { QueuedCommand } from 'src/types/textInputTypes.js'
- import {
- dequeue,
- dequeueAllMatching,
- enqueue,
- hasCommandsInQueue,
- peek,
- subscribeToCommandQueue,
- getCommandsByMaxPriority,
- } from 'src/utils/messageQueueManager.js'
- import { notifyCommandLifecycle } from 'src/utils/commandLifecycle.js'
- import {
- getSessionState,
- notifySessionStateChanged,
- notifySessionMetadataChanged,
- setPermissionModeChangedListener,
- type RequiresActionDetails,
- type SessionExternalMetadata,
- } from 'src/utils/sessionState.js'
- import { externalMetadataToAppState } from 'src/state/onChangeAppState.js'
- import { getInMemoryErrors, logError, logMCPDebug } from 'src/utils/log.js'
- import {
- writeToStdout,
- registerProcessOutputErrorHandlers,
- } from 'src/utils/process.js'
- import type { Stream } from 'src/utils/stream.js'
- import { EMPTY_USAGE } from 'src/services/api/logging.js'
- import {
- loadConversationForResume,
- type TurnInterruptionState,
- } from 'src/utils/conversationRecovery.js'
- import type {
- MCPServerConnection,
- McpSdkServerConfig,
- ScopedMcpServerConfig,
- } from 'src/services/mcp/types.js'
- import {
- ChannelMessageNotificationSchema,
- gateChannelServer,
- wrapChannelMessage,
- findChannelEntry,
- } from 'src/services/mcp/channelNotification.js'
- import {
- isChannelAllowlisted,
- isChannelsEnabled,
- } from 'src/services/mcp/channelAllowlist.js'
- import { parsePluginIdentifier } from 'src/utils/plugins/pluginIdentifier.js'
- import { validateUuid } from 'src/utils/uuid.js'
- import { fromArray } from 'src/utils/generators.js'
- import { ask } from 'src/QueryEngine.js'
- import type { PermissionPromptTool } from 'src/utils/queryHelpers.js'
- import {
- createFileStateCacheWithSizeLimit,
- mergeFileStateCaches,
- READ_FILE_STATE_CACHE_SIZE,
- } from 'src/utils/fileStateCache.js'
- import { expandPath } from 'src/utils/path.js'
- import { extractReadFilesFromMessages } from 'src/utils/queryHelpers.js'
- import { registerHookEventHandler } from 'src/utils/hooks/hookEvents.js'
- import { executeFilePersistence } from 'src/utils/filePersistence/filePersistence.js'
- import { finalizePendingAsyncHooks } from 'src/utils/hooks/AsyncHookRegistry.js'
- import {
- gracefulShutdown,
- gracefulShutdownSync,
- isShuttingDown,
- } from 'src/utils/gracefulShutdown.js'
- import { registerCleanup } from 'src/utils/cleanupRegistry.js'
- import { createIdleTimeoutManager } from 'src/utils/idleTimeout.js'
- import type {
- SDKStatus,
- ModelInfo,
- SDKMessage,
- SDKUserMessage,
- SDKUserMessageReplay,
- PermissionResult,
- McpServerConfigForProcessTransport,
- McpServerStatus,
- RewindFilesResult,
- } from 'src/entrypoints/agentSdkTypes.js'
- import type {
- StdoutMessage,
- SDKControlInitializeRequest,
- SDKControlInitializeResponse,
- SDKControlRequest,
- SDKControlResponse,
- SDKControlMcpSetServersResponse,
- SDKControlReloadPluginsResponse,
- } from 'src/entrypoints/sdk/controlTypes.js'
- import type { PermissionMode } from '@anthropic-ai/claude-agent-sdk'
- import type { PermissionMode as InternalPermissionMode } from 'src/types/permissions.js'
- import { cwd } from 'process'
- import { getCwd } from 'src/utils/cwd.js'
- import omit from 'lodash-es/omit.js'
- import reject from 'lodash-es/reject.js'
- import { isPolicyAllowed } from 'src/services/policyLimits/index.js'
- import type { ReplBridgeHandle } from 'src/bridge/replBridge.js'
- import { getRemoteSessionUrl } from 'src/constants/product.js'
- import { buildBridgeConnectUrl } from 'src/bridge/bridgeStatusUtil.js'
- import { extractInboundMessageFields } from 'src/bridge/inboundMessages.js'
- import { resolveAndPrepend } from 'src/bridge/inboundAttachments.js'
- import type { CanUseToolFn } from 'src/hooks/useCanUseTool.js'
- import { hasPermissionsToUseTool } from 'src/utils/permissions/permissions.js'
- import { safeParseJSON } from 'src/utils/json.js'
- import {
- outputSchema as permissionToolOutputSchema,
- permissionPromptToolResultToPermissionDecision,
- } from 'src/utils/permissions/PermissionPromptToolResultSchema.js'
- import { createAbortController } from 'src/utils/abortController.js'
- import { createCombinedAbortSignal } from 'src/utils/combinedAbortSignal.js'
- import { generateSessionTitle } from 'src/utils/sessionTitle.js'
- import { buildSideQuestionFallbackParams } from 'src/utils/queryContext.js'
- import { runSideQuestion } from 'src/utils/sideQuestion.js'
- import {
- processSessionStartHooks,
- processSetupHooks,
- takeInitialUserMessage,
- } from 'src/utils/sessionStart.js'
- import {
- DEFAULT_OUTPUT_STYLE_NAME,
- getAllOutputStyles,
- } from 'src/constants/outputStyles.js'
- import { TEAMMATE_MESSAGE_TAG, TICK_TAG } from 'src/constants/xml.js'
- import {
- getSettings_DEPRECATED,
- getSettingsWithSources,
- } from 'src/utils/settings/settings.js'
- import { settingsChangeDetector } from 'src/utils/settings/changeDetector.js'
- import { applySettingsChange } from 'src/utils/settings/applySettingsChange.js'
- import {
- isFastModeAvailable,
- isFastModeEnabled,
- isFastModeSupportedByModel,
- getFastModeState,
- } from 'src/utils/fastMode.js'
- import {
- isAutoModeGateEnabled,
- getAutoModeUnavailableNotification,
- getAutoModeUnavailableReason,
- isBypassPermissionsModeDisabled,
- transitionPermissionMode,
- } from 'src/utils/permissions/permissionSetup.js'
- import {
- tryGenerateSuggestion,
- logSuggestionOutcome,
- logSuggestionSuppressed,
- type PromptVariant,
- } from 'src/services/PromptSuggestion/promptSuggestion.js'
- import { getLastCacheSafeParams } from 'src/utils/forkedAgent.js'
- import { getAccountInformation } from 'src/utils/auth.js'
- import { OAuthService } from 'src/services/oauth/index.js'
- import { installOAuthTokens } from 'src/cli/handlers/auth.js'
- import { getAPIProvider } from 'src/utils/model/providers.js'
- import type { HookCallbackMatcher } from 'src/types/hooks.js'
- import { AwsAuthStatusManager } from 'src/utils/awsAuthStatusManager.js'
- import type { HookEvent } from 'src/entrypoints/agentSdkTypes.js'
- import {
- registerHookCallbacks,
- setInitJsonSchema,
- getInitJsonSchema,
- setSdkAgentProgressSummariesEnabled,
- } from 'src/bootstrap/state.js'
- import { createSyntheticOutputTool } from 'src/tools/SyntheticOutputTool/SyntheticOutputTool.js'
- import { parseSessionIdentifier } from 'src/utils/sessionUrl.js'
- import {
- hydrateRemoteSession,
- hydrateFromCCRv2InternalEvents,
- resetSessionFilePointer,
- doesMessageExistInSession,
- findUnresolvedToolUse,
- recordAttributionSnapshot,
- saveAgentSetting,
- saveMode,
- saveAiGeneratedTitle,
- restoreSessionMetadata,
- } from 'src/utils/sessionStorage.js'
- import { incrementPromptCount } from 'src/utils/commitAttribution.js'
- import {
- setupSdkMcpClients,
- connectToServer,
- clearServerCache,
- fetchToolsForClient,
- areMcpConfigsEqual,
- reconnectMcpServerImpl,
- } from 'src/services/mcp/client.js'
- import {
- filterMcpServersByPolicy,
- getMcpConfigByName,
- isMcpServerDisabled,
- setMcpServerEnabled,
- } from 'src/services/mcp/config.js'
- import {
- performMCPOAuthFlow,
- revokeServerTokens,
- } from 'src/services/mcp/auth.js'
- import {
- runElicitationHooks,
- runElicitationResultHooks,
- } from 'src/services/mcp/elicitationHandler.js'
- import { executeNotificationHooks } from 'src/utils/hooks.js'
- import {
- ElicitRequestSchema,
- ElicitationCompleteNotificationSchema,
- } from '@modelcontextprotocol/sdk/types.js'
- import { getMcpPrefix } from 'src/services/mcp/mcpStringUtils.js'
- import {
- commandBelongsToServer,
- filterToolsByServer,
- } from 'src/services/mcp/utils.js'
- import { setupVscodeSdkMcp } from 'src/services/mcp/vscodeSdkMcp.js'
- import { getAllMcpConfigs } from 'src/services/mcp/config.js'
- import {
- isQualifiedForGrove,
- checkGroveForNonInteractive,
- } from 'src/services/api/grove.js'
- import {
- toInternalMessages,
- toSDKRateLimitInfo,
- } from 'src/utils/messages/mappers.js'
- import { createModelSwitchBreadcrumbs } from 'src/utils/messages.js'
- import { collectContextData } from 'src/commands/context/context-noninteractive.js'
- import { LOCAL_COMMAND_STDOUT_TAG } from 'src/constants/xml.js'
- import {
- statusListeners,
- type ClaudeAILimits,
- } from 'src/services/claudeAiLimits.js'
- import {
- getDefaultMainLoopModel,
- getMainLoopModel,
- modelDisplayString,
- parseUserSpecifiedModel,
- } from 'src/utils/model/model.js'
- import { getModelOptions } from 'src/utils/model/modelOptions.js'
- import {
- modelSupportsEffort,
- modelSupportsMaxEffort,
- EFFORT_LEVELS,
- resolveAppliedEffort,
- } from 'src/utils/effort.js'
- import { modelSupportsAdaptiveThinking } from 'src/utils/thinking.js'
- import { modelSupportsAutoMode } from 'src/utils/betas.js'
- import { ensureModelStringsInitialized } from 'src/utils/model/modelStrings.js'
- import {
- getSessionId,
- setMainLoopModelOverride,
- setMainThreadAgentType,
- switchSession,
- isSessionPersistenceDisabled,
- getIsRemoteMode,
- getFlagSettingsInline,
- setFlagSettingsInline,
- getMainThreadAgentType,
- getAllowedChannels,
- setAllowedChannels,
- type ChannelEntry,
- } from 'src/bootstrap/state.js'
- import { runWithWorkload, WORKLOAD_CRON } from 'src/utils/workloadContext.js'
- import type { UUID } from 'crypto'
- import { randomUUID } from 'crypto'
- import type { ContentBlockParam } from '@anthropic-ai/sdk/resources/messages.mjs'
- import type { AppState } from 'src/state/AppStateStore.js'
- import {
- fileHistoryRewind,
- fileHistoryCanRestore,
- fileHistoryEnabled,
- fileHistoryGetDiffStats,
- } from 'src/utils/fileHistory.js'
- import {
- restoreAgentFromSession,
- restoreSessionStateFromLog,
- } from 'src/utils/sessionRestore.js'
- import { SandboxManager } from 'src/utils/sandbox/sandbox-adapter.js'
- import {
- headlessProfilerStartTurn,
- headlessProfilerCheckpoint,
- logHeadlessProfilerTurn,
- } from 'src/utils/headlessProfiler.js'
- import {
- startQueryProfile,
- logQueryProfileReport,
- } from 'src/utils/queryProfiler.js'
- import { asSessionId } from 'src/types/ids.js'
- import { jsonStringify } from '../utils/slowOperations.js'
- import { skillChangeDetector } from '../utils/skills/skillChangeDetector.js'
- import { getCommands, clearCommandsCache } from '../commands.js'
- import {
- isBareMode,
- isEnvTruthy,
- isEnvDefinedFalsy,
- } from '../utils/envUtils.js'
- import { installPluginsForHeadless } from '../utils/plugins/headlessPluginInstall.js'
- import { refreshActivePlugins } from '../utils/plugins/refresh.js'
- import { loadAllPluginsCacheOnly } from '../utils/plugins/pluginLoader.js'
- import {
- isTeamLead,
- hasActiveInProcessTeammates,
- hasWorkingInProcessTeammates,
- waitForTeammatesToBecomeIdle,
- } from '../utils/teammate.js'
- import {
- readUnreadMessages,
- markMessagesAsRead,
- isShutdownApproved,
- } from '../utils/teammateMailbox.js'
- import { removeTeammateFromTeamFile } from '../utils/swarm/teamHelpers.js'
- import { unassignTeammateTasks } from '../utils/tasks.js'
- import { getRunningTasks } from '../utils/task/framework.js'
- import { isBackgroundTask } from '../tasks/types.js'
- import { stopTask } from '../tasks/stopTask.js'
- import { drainSdkEvents } from '../utils/sdkEventQueue.js'
- import { initializeGrowthBook } from '../services/analytics/growthbook.js'
- import { errorMessage, toError } from '../utils/errors.js'
- import { sleep } from '../utils/sleep.js'
- import { isExtractModeActive } from '../memdir/paths.js'
- // Dead code elimination: conditional imports
- /* eslint-disable @typescript-eslint/no-require-imports */
- const coordinatorModeModule = feature('COORDINATOR_MODE')
- ? (require('../coordinator/coordinatorMode.js') as typeof import('../coordinator/coordinatorMode.js'))
- : null
- const proactiveModule =
- feature('PROACTIVE') || feature('KAIROS')
- ? (require('../proactive/index.js') as typeof import('../proactive/index.js'))
- : null
- const cronSchedulerModule = require('../utils/cronScheduler.js') as typeof import('../utils/cronScheduler.js')
- const cronJitterConfigModule = require('../utils/cronJitterConfig.js') as typeof import('../utils/cronJitterConfig.js')
- const cronGate = require('../tools/ScheduleCronTool/prompt.js') as typeof import('../tools/ScheduleCronTool/prompt.js')
- const extractMemoriesModule = feature('EXTRACT_MEMORIES')
- ? (require('../services/extractMemories/extractMemories.js') as typeof import('../services/extractMemories/extractMemories.js'))
- : null
- /* eslint-enable @typescript-eslint/no-require-imports */
- const SHUTDOWN_TEAM_PROMPT = `<system-reminder>
- You are running in non-interactive mode and cannot return a response to the user until your team is shut down.
- You MUST shut down your team before preparing your final response:
- 1. Use requestShutdown to ask each team member to shut down gracefully
- 2. Wait for shutdown approvals
- 3. Use the cleanup operation to clean up the team
- 4. Only then provide your final response to the user
- The user cannot receive your response until the team is completely shut down.
- </system-reminder>
- Shut down your team and prepare your final response for the user.`
- // Track message UUIDs received during the current session runtime
- const MAX_RECEIVED_UUIDS = 10_000
- const receivedMessageUuids = new Set<UUID>()
- const receivedMessageUuidsOrder: UUID[] = []
- function trackReceivedMessageUuid(uuid: UUID): boolean {
- if (receivedMessageUuids.has(uuid)) {
- return false // duplicate
- }
- receivedMessageUuids.add(uuid)
- receivedMessageUuidsOrder.push(uuid)
- // Evict oldest entries when at capacity
- if (receivedMessageUuidsOrder.length > MAX_RECEIVED_UUIDS) {
- const toEvict = receivedMessageUuidsOrder.splice(
- 0,
- receivedMessageUuidsOrder.length - MAX_RECEIVED_UUIDS,
- )
- for (const old of toEvict) {
- receivedMessageUuids.delete(old)
- }
- }
- return true // new UUID
- }
- type PromptValue = string | ContentBlockParam[]
- function toBlocks(v: PromptValue): ContentBlockParam[] {
- return typeof v === 'string' ? [{ type: 'text', text: v }] : v
- }
- /**
- * Join prompt values from multiple queued commands into one. Strings are
- * newline-joined; if any value is a block array, all values are normalized
- * to blocks and concatenated.
- */
- export function joinPromptValues(values: PromptValue[]): PromptValue {
- if (values.length === 1) return values[0]!
- if (values.every(v => typeof v === 'string')) {
- return values.join('\n')
- }
- return values.flatMap(toBlocks)
- }
- /**
- * Whether `next` can be batched into the same ask() call as `head`. Only
- * prompt-mode commands batch, and only when the workload tag matches (so the
- * combined turn is attributed correctly) and the isMeta flag matches (so a
- * proactive tick can't merge into a user prompt and lose its hidden-in-
- * transcript marking when the head is spread over the merged command).
- */
- export function canBatchWith(
- head: QueuedCommand,
- next: QueuedCommand | undefined,
- ): boolean {
- return (
- next !== undefined &&
- next.mode === 'prompt' &&
- next.workload === head.workload &&
- next.isMeta === head.isMeta
- )
- }
- export async function runHeadless(
- inputPrompt: string | AsyncIterable<string>,
- getAppState: () => AppState,
- setAppState: (f: (prev: AppState) => AppState) => void,
- commands: Command[],
- tools: Tools,
- sdkMcpConfigs: Record<string, McpSdkServerConfig>,
- agents: AgentDefinition[],
- options: {
- continue: boolean | undefined
- resume: string | boolean | undefined
- resumeSessionAt: string | undefined
- verbose: boolean | undefined
- outputFormat: string | undefined
- jsonSchema: Record<string, unknown> | undefined
- permissionPromptToolName: string | undefined
- allowedTools: string[] | undefined
- thinkingConfig: ThinkingConfig | undefined
- maxTurns: number | undefined
- maxBudgetUsd: number | undefined
- taskBudget: { total: number } | undefined
- systemPrompt: string | undefined
- appendSystemPrompt: string | undefined
- userSpecifiedModel: string | undefined
- fallbackModel: string | undefined
- teleport: string | true | null | undefined
- sdkUrl: string | undefined
- replayUserMessages: boolean | undefined
- includePartialMessages: boolean | undefined
- forkSession: boolean | undefined
- rewindFiles: string | undefined
- enableAuthStatus: boolean | undefined
- agent: string | undefined
- workload: string | undefined
- setupTrigger?: 'init' | 'maintenance' | undefined
- sessionStartHooksPromise?: ReturnType<typeof processSessionStartHooks>
- setSDKStatus?: (status: SDKStatus) => void
- },
- ): Promise<void> {
- if (
- process.env.USER_TYPE === 'ant' &&
- isEnvTruthy(process.env.CLAUDE_CODE_EXIT_AFTER_FIRST_RENDER)
- ) {
- process.stderr.write(
- `\nStartup time: ${Math.round(process.uptime() * 1000)}ms\n`,
- )
- // eslint-disable-next-line custom-rules/no-process-exit
- process.exit(0)
- }
- // Fire user settings download now so it overlaps with the MCP/tool setup
- // below. Managed settings already started in main.tsx preAction; this gives
- // user settings a similar head start. The cached promise is joined in
- // installPluginsAndApplyMcpInBackground before plugin install reads
- // enabledPlugins.
- if (
- feature('DOWNLOAD_USER_SETTINGS') &&
- (isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) || getIsRemoteMode())
- ) {
- void downloadUserSettings()
- }
- // In headless mode there is no React tree, so the useSettingsChange hook
- // never runs. Subscribe directly so that settings changes (including
- // managed-settings / policy updates) are fully applied.
- settingsChangeDetector.subscribe(source => {
- applySettingsChange(source, setAppState)
- // In headless mode, also sync the denormalized fastMode field from
- // settings. The TUI manages fastMode via the UI so it skips this.
- if (isFastModeEnabled()) {
- setAppState(prev => {
- const s = prev.settings as Record<string, unknown>
- const fastMode = s.fastMode === true && !s.fastModePerSessionOptIn
- return { ...prev, fastMode }
- })
- }
- })
- // Proactive activation is now handled in main.tsx before getTools() so
- // SleepTool passes isEnabled() filtering. This fallback covers the case
- // where CLAUDE_CODE_PROACTIVE is set but main.tsx's check didn't fire
- // (e.g. env was injected by the SDK transport after argv parsing).
- if (
- (feature('PROACTIVE') || feature('KAIROS')) &&
- proactiveModule &&
- !proactiveModule.isProactiveActive() &&
- isEnvTruthy(process.env.CLAUDE_CODE_PROACTIVE)
- ) {
- proactiveModule.activateProactive('command')
- }
- // Periodically force a full GC to keep memory usage in check
- if (typeof Bun !== 'undefined') {
- const gcTimer = setInterval(Bun.gc, 1000)
- gcTimer.unref()
- }
- // Start headless profiler for first turn
- headlessProfilerStartTurn()
- headlessProfilerCheckpoint('runHeadless_entry')
- // Check Grove requirements for non-interactive consumer subscribers
- if (await isQualifiedForGrove()) {
- await checkGroveForNonInteractive()
- }
- headlessProfilerCheckpoint('after_grove_check')
- // Initialize GrowthBook so feature flags take effect in headless mode.
- // Without this, the disk cache is empty and all flags fall back to defaults.
- void initializeGrowthBook()
- if (options.resumeSessionAt && !options.resume) {
- process.stderr.write(`Error: --resume-session-at requires --resume\n`)
- gracefulShutdownSync(1)
- return
- }
- if (options.rewindFiles && !options.resume) {
- process.stderr.write(`Error: --rewind-files requires --resume\n`)
- gracefulShutdownSync(1)
- return
- }
- if (options.rewindFiles && inputPrompt) {
- process.stderr.write(
- `Error: --rewind-files is a standalone operation and cannot be used with a prompt\n`,
- )
- gracefulShutdownSync(1)
- return
- }
- const structuredIO = getStructuredIO(inputPrompt, options)
- // When emitting NDJSON for SDK clients, any stray write to stdout (debug
- // prints, dependency console.log, library banners) breaks the client's
- // line-by-line JSON parser. Install a guard that diverts non-JSON lines to
- // stderr so the stream stays clean. Must run before the first
- // structuredIO.write below.
- if (options.outputFormat === 'stream-json') {
- installStreamJsonStdoutGuard()
- }
- // #34044: if user explicitly set sandbox.enabled=true but deps are missing,
- // isSandboxingEnabled() returns false silently. Surface the reason so users
- // know their security config isn't being enforced.
- const sandboxUnavailableReason = SandboxManager.getSandboxUnavailableReason()
- if (sandboxUnavailableReason) {
- if (SandboxManager.isSandboxRequired()) {
- process.stderr.write(
- `\nError: sandbox required but unavailable: ${sandboxUnavailableReason}\n` +
- ` sandbox.failIfUnavailable is set — refusing to start without a working sandbox.\n\n`,
- )
- gracefulShutdownSync(1)
- return
- }
- process.stderr.write(
- `\n⚠ Sandbox disabled: ${sandboxUnavailableReason}\n` +
- ` Commands will run WITHOUT sandboxing. Network and filesystem restrictions will NOT be enforced.\n\n`,
- )
- } else if (SandboxManager.isSandboxingEnabled()) {
- // Initialize sandbox with a callback that forwards network permission
- // requests to the SDK host via the can_use_tool control_request protocol.
- // This must happen after structuredIO is created so we can send requests.
- try {
- await SandboxManager.initialize(structuredIO.createSandboxAskCallback())
- } catch (err) {
- process.stderr.write(`\n❌ Sandbox Error: ${errorMessage(err)}\n`)
- gracefulShutdownSync(1, 'other')
- return
- }
- }
- if (options.outputFormat === 'stream-json' && options.verbose) {
- registerHookEventHandler(event => {
- const message: StdoutMessage = (() => {
- switch (event.type) {
- case 'started':
- return {
- type: 'system' as const,
- subtype: 'hook_started' as const,
- hook_id: event.hookId,
- hook_name: event.hookName,
- hook_event: event.hookEvent,
- uuid: randomUUID(),
- session_id: getSessionId(),
- }
- case 'progress':
- return {
- type: 'system' as const,
- subtype: 'hook_progress' as const,
- hook_id: event.hookId,
- hook_name: event.hookName,
- hook_event: event.hookEvent,
- stdout: event.stdout,
- stderr: event.stderr,
- output: event.output,
- uuid: randomUUID(),
- session_id: getSessionId(),
- }
- case 'response':
- return {
- type: 'system' as const,
- subtype: 'hook_response' as const,
- hook_id: event.hookId,
- hook_name: event.hookName,
- hook_event: event.hookEvent,
- output: event.output,
- stdout: event.stdout,
- stderr: event.stderr,
- exit_code: event.exitCode,
- outcome: event.outcome,
- uuid: randomUUID(),
- session_id: getSessionId(),
- }
- }
- })()
- void structuredIO.write(message)
- })
- }
- if (options.setupTrigger) {
- await processSetupHooks(options.setupTrigger)
- }
- headlessProfilerCheckpoint('before_loadInitialMessages')
- const appState = getAppState()
- const {
- messages: initialMessages,
- turnInterruptionState,
- agentSetting: resumedAgentSetting,
- } = await loadInitialMessages(setAppState, {
- continue: options.continue,
- teleport: options.teleport,
- resume: options.resume,
- resumeSessionAt: options.resumeSessionAt,
- forkSession: options.forkSession,
- outputFormat: options.outputFormat,
- sessionStartHooksPromise: options.sessionStartHooksPromise,
- restoredWorkerState: structuredIO.restoredWorkerState,
- })
- // SessionStart hooks can emit initialUserMessage — the first user turn for
- // headless orchestrator sessions where stdin is empty and additionalContext
- // alone (an attachment, not a turn) would leave the REPL with nothing to
- // respond to. The hook promise is awaited inside loadInitialMessages, so the
- // module-level pending value is set by the time we get here.
- const hookInitialUserMessage = takeInitialUserMessage()
- if (hookInitialUserMessage) {
- structuredIO.prependUserMessage(hookInitialUserMessage)
- }
- // Restore agent setting from the resumed session (if not overridden by current --agent flag
- // or settings-based agent, which would already have set mainThreadAgentType in main.tsx)
- if (!options.agent && !getMainThreadAgentType() && resumedAgentSetting) {
- const { agentDefinition: restoredAgent } = restoreAgentFromSession(
- resumedAgentSetting,
- undefined,
- { activeAgents: agents, allAgents: agents },
- )
- if (restoredAgent) {
- setAppState(prev => ({ ...prev, agent: restoredAgent.agentType }))
- // Apply the agent's system prompt for non-built-in agents (mirrors main.tsx initial --agent path)
- if (!options.systemPrompt && !isBuiltInAgent(restoredAgent)) {
- const agentSystemPrompt = restoredAgent.getSystemPrompt()
- if (agentSystemPrompt) {
- options.systemPrompt = agentSystemPrompt
- }
- }
- // Re-persist agent setting so future resumes maintain the agent
- saveAgentSetting(restoredAgent.agentType)
- }
- }
- // gracefulShutdownSync schedules an async shutdown and sets process.exitCode.
- // If a loadInitialMessages error path triggered it, bail early to avoid
- // unnecessary work while the process winds down.
- if (initialMessages.length === 0 && process.exitCode !== undefined) {
- return
- }
- // Handle --rewind-files: restore filesystem and exit immediately
- if (options.rewindFiles) {
- // File history snapshots are only created for user messages,
- // so we require the target to be a user message
- const targetMessage = initialMessages.find(
- m => m.uuid === options.rewindFiles,
- )
- if (!targetMessage || targetMessage.type !== 'user') {
- process.stderr.write(
- `Error: --rewind-files requires a user message UUID, but ${options.rewindFiles} is not a user message in this session\n`,
- )
- gracefulShutdownSync(1)
- return
- }
- const currentAppState = getAppState()
- const result = await handleRewindFiles(
- options.rewindFiles as UUID,
- currentAppState,
- setAppState,
- false,
- )
- if (!result.canRewind) {
- process.stderr.write(`Error: ${result.error || 'Unexpected error'}\n`)
- gracefulShutdownSync(1)
- return
- }
- // Rewind complete - exit successfully
- process.stdout.write(
- `Files rewound to state at message ${options.rewindFiles}\n`,
- )
- gracefulShutdownSync(0)
- return
- }
- // Check if we need input prompt - skip if we're resuming with a valid session ID/JSONL file or using SDK URL
- const hasValidResumeSessionId =
- typeof options.resume === 'string' &&
- (Boolean(validateUuid(options.resume)) || options.resume.endsWith('.jsonl'))
- const isUsingSdkUrl = Boolean(options.sdkUrl)
- if (!inputPrompt && !hasValidResumeSessionId && !isUsingSdkUrl) {
- process.stderr.write(
- `Error: Input must be provided either through stdin or as a prompt argument when using --print\n`,
- )
- gracefulShutdownSync(1)
- return
- }
- if (options.outputFormat === 'stream-json' && !options.verbose) {
- process.stderr.write(
- 'Error: When using --print, --output-format=stream-json requires --verbose\n',
- )
- gracefulShutdownSync(1)
- return
- }
- // Filter out MCP tools that are in the deny list
- const allowedMcpTools = filterToolsByDenyRules(
- appState.mcp.tools,
- appState.toolPermissionContext,
- )
- let filteredTools = [...tools, ...allowedMcpTools]
- // When using SDK URL, always use stdio permission prompting to delegate to the SDK
- const effectivePermissionPromptToolName = options.sdkUrl
- ? 'stdio'
- : options.permissionPromptToolName
- // Callback for when a permission prompt is shown
- const onPermissionPrompt = (details: RequiresActionDetails) => {
- if (feature('COMMIT_ATTRIBUTION')) {
- setAppState(prev => ({
- ...prev,
- attribution: {
- ...prev.attribution,
- permissionPromptCount: prev.attribution.permissionPromptCount + 1,
- },
- }))
- }
- notifySessionStateChanged('requires_action', details)
- }
- const canUseTool = getCanUseToolFn(
- effectivePermissionPromptToolName,
- structuredIO,
- () => getAppState().mcp.tools,
- onPermissionPrompt,
- )
- if (options.permissionPromptToolName) {
- // Remove the permission prompt tool from the list of available tools.
- filteredTools = filteredTools.filter(
- tool => !toolMatchesName(tool, options.permissionPromptToolName!),
- )
- }
- // Install errors handlers to gracefully handle broken pipes (e.g., when parent process dies)
- registerProcessOutputErrorHandlers()
- headlessProfilerCheckpoint('after_loadInitialMessages')
- // Ensure model strings are initialized before generating model options.
- // For Bedrock users, this waits for the profile fetch to get correct region strings.
- await ensureModelStringsInitialized()
- headlessProfilerCheckpoint('after_modelStrings')
- // UDS inbox store registration is deferred until after `run` is defined
- // so we can pass `run` as the onEnqueue callback (see below).
- // Only `json` + `verbose` needs the full array (jsonStringify(messages) below).
- // For stream-json (SDK/CCR) and default text output, only the last message is
- // read for the exit code / final result. Avoid accumulating every message in
- // memory for the entire session.
- const needsFullArray = options.outputFormat === 'json' && options.verbose
- const messages: SDKMessage[] = []
- let lastMessage: SDKMessage | undefined
- // Streamlined mode transforms messages when CLAUDE_CODE_STREAMLINED_OUTPUT=true and using stream-json
- // Build flag gates this out of external builds; env var is the runtime opt-in for ant builds
- const transformToStreamlined =
- feature('STREAMLINED_OUTPUT') &&
- isEnvTruthy(process.env.CLAUDE_CODE_STREAMLINED_OUTPUT) &&
- options.outputFormat === 'stream-json'
- ? createStreamlinedTransformer()
- : null
- headlessProfilerCheckpoint('before_runHeadlessStreaming')
- for await (const message of runHeadlessStreaming(
- structuredIO,
- appState.mcp.clients,
- [...commands, ...appState.mcp.commands],
- filteredTools,
- initialMessages,
- canUseTool,
- sdkMcpConfigs,
- getAppState,
- setAppState,
- agents,
- options,
- turnInterruptionState,
- )) {
- if (transformToStreamlined) {
- // Streamlined mode: transform messages and stream immediately
- const transformed = transformToStreamlined(message)
- if (transformed) {
- await structuredIO.write(transformed)
- }
- } else if (options.outputFormat === 'stream-json' && options.verbose) {
- await structuredIO.write(message)
- }
- // Should not be getting control messages or stream events in non-stream mode.
- // Also filter out streamlined types since they're only produced by the transformer.
- // SDK-only system events are excluded so lastMessage stays at the result
- // (session_state_changed(idle) and any late task_notification drain after
- // result in the finally block).
- if (
- message.type !== 'control_response' &&
- message.type !== 'control_request' &&
- message.type !== 'control_cancel_request' &&
- !(
- message.type === 'system' &&
- (message.subtype === 'session_state_changed' ||
- message.subtype === 'task_notification' ||
- message.subtype === 'task_started' ||
- message.subtype === 'task_progress' ||
- message.subtype === 'post_turn_summary')
- ) &&
- message.type !== 'stream_event' &&
- message.type !== 'keep_alive' &&
- message.type !== 'streamlined_text' &&
- message.type !== 'streamlined_tool_use_summary' &&
- message.type !== 'prompt_suggestion'
- ) {
- if (needsFullArray) {
- messages.push(message)
- }
- lastMessage = message
- }
- }
- switch (options.outputFormat) {
- case 'json':
- if (!lastMessage || lastMessage.type !== 'result') {
- throw new Error('No messages returned')
- }
- if (options.verbose) {
- writeToStdout(jsonStringify(messages) + '\n')
- break
- }
- writeToStdout(jsonStringify(lastMessage) + '\n')
- break
- case 'stream-json':
- // already logged above
- break
- default:
- if (!lastMessage || lastMessage.type !== 'result') {
- throw new Error('No messages returned')
- }
- switch (lastMessage.subtype) {
- case 'success':
- writeToStdout(
- (lastMessage.result as string).endsWith('\n')
- ? (lastMessage.result as string)
- : (lastMessage.result as string) + '\n',
- )
- break
- case 'error_during_execution':
- writeToStdout(`Execution error`)
- break
- case 'error_max_turns':
- writeToStdout(`Error: Reached max turns (${options.maxTurns})`)
- break
- case 'error_max_budget_usd':
- writeToStdout(`Error: Exceeded USD budget (${options.maxBudgetUsd})`)
- break
- case 'error_max_structured_output_retries':
- writeToStdout(
- `Error: Failed to provide valid structured output after maximum retries`,
- )
- }
- }
- // Log headless latency metrics for the final turn
- logHeadlessProfilerTurn()
- // Drain any in-flight memory extraction before shutdown. The response is
- // already flushed above, so this adds no user-visible latency — it just
- // delays process exit so gracefulShutdownSync's 5s failsafe doesn't kill
- // the forked agent mid-flight. Gated by isExtractModeActive so the
- // tengu_slate_thimble flag controls non-interactive extraction end-to-end.
- if (feature('EXTRACT_MEMORIES') && isExtractModeActive()) {
- await extractMemoriesModule!.drainPendingExtraction()
- }
- gracefulShutdownSync(
- lastMessage?.type === 'result' && lastMessage?.is_error ? 1 : 0,
- )
- }
- function runHeadlessStreaming(
- structuredIO: StructuredIO,
- mcpClients: MCPServerConnection[],
- commands: Command[],
- tools: Tools,
- initialMessages: Message[],
- canUseTool: CanUseToolFn,
- sdkMcpConfigs: Record<string, McpSdkServerConfig>,
- getAppState: () => AppState,
- setAppState: (f: (prev: AppState) => AppState) => void,
- agents: AgentDefinition[],
- options: {
- verbose: boolean | undefined
- jsonSchema: Record<string, unknown> | undefined
- permissionPromptToolName: string | undefined
- allowedTools: string[] | undefined
- thinkingConfig: ThinkingConfig | undefined
- maxTurns: number | undefined
- maxBudgetUsd: number | undefined
- taskBudget: { total: number } | undefined
- systemPrompt: string | undefined
- appendSystemPrompt: string | undefined
- userSpecifiedModel: string | undefined
- fallbackModel: string | undefined
- replayUserMessages?: boolean | undefined
- includePartialMessages?: boolean | undefined
- enableAuthStatus?: boolean | undefined
- agent?: string | undefined
- setSDKStatus?: (status: SDKStatus) => void
- promptSuggestions?: boolean | undefined
- workload?: string | undefined
- },
- turnInterruptionState?: TurnInterruptionState,
- ): AsyncIterable<StdoutMessage> {
- let running = false
- let runPhase:
- | 'draining_commands'
- | 'waiting_for_agents'
- | 'finally_flush'
- | 'finally_post_flush'
- | undefined
- let inputClosed = false
- let shutdownPromptInjected = false
- let heldBackResult: StdoutMessage | null = null
- let abortController: AbortController | undefined
- // Same queue sendRequest() enqueues to — one FIFO for everything.
- const output = structuredIO.outbound
- // Ctrl+C in -p mode: abort the in-flight query, then shut down gracefully.
- // gracefulShutdown persists session state and flushes analytics, with a
- // failsafe timer that force-exits if cleanup hangs.
- const sigintHandler = () => {
- logForDiagnosticsNoPII('info', 'shutdown_signal', { signal: 'SIGINT' })
- if (abortController && !abortController.signal.aborted) {
- abortController.abort()
- }
- void gracefulShutdown(0)
- }
- process.on('SIGINT', sigintHandler)
- // Dump run()'s state at SIGTERM so a stuck session's healthsweep can name
- // the do/while(waitingForAgents) poll without reading the transcript.
- registerCleanup(async () => {
- const bg: Record<string, number> = {}
- for (const t of getRunningTasks(getAppState())) {
- if (isBackgroundTask(t)) bg[t.type] = (bg[t.type] ?? 0) + 1
- }
- logForDiagnosticsNoPII('info', 'run_state_at_shutdown', {
- run_active: running,
- run_phase: runPhase,
- worker_status: getSessionState(),
- internal_events_pending: structuredIO.internalEventsPending,
- bg_tasks: bg,
- })
- })
- // Wire the central onChangeAppState mode-diff hook to the SDK output stream.
- // This fires whenever ANY code path mutates toolPermissionContext.mode —
- // Shift+Tab, ExitPlanMode dialog, /plan slash command, rewind, bridge
- // set_permission_mode, the query loop, stop_task — rather than the two
- // paths that previously went through a bespoke wrapper.
- // The wrapper's body was fully redundant (it enqueued here AND called
- // notifySessionMetadataChanged, both of which onChangeAppState now covers);
- // keeping it would double-emit status messages.
- setPermissionModeChangedListener(newMode => {
- // Only emit for SDK-exposed modes.
- if (
- newMode === 'default' ||
- newMode === 'acceptEdits' ||
- newMode === 'bypassPermissions' ||
- newMode === 'plan' ||
- newMode === (feature('TRANSCRIPT_CLASSIFIER') && 'auto') ||
- newMode === 'dontAsk'
- ) {
- output.enqueue({
- type: 'system',
- subtype: 'status',
- status: null,
- permissionMode: newMode as PermissionMode,
- uuid: randomUUID(),
- session_id: getSessionId(),
- })
- }
- })
- // Prompt suggestion tracking (push model)
- const suggestionState: {
- abortController: AbortController | null
- inflightPromise: Promise<void> | null
- lastEmitted: {
- text: string
- emittedAt: number
- promptId: PromptVariant
- generationRequestId: string | null
- } | null
- pendingSuggestion: {
- type: 'prompt_suggestion'
- suggestion: string
- uuid: UUID
- session_id: string
- } | null
- pendingLastEmittedEntry: {
- text: string
- promptId: PromptVariant
- generationRequestId: string | null
- } | null
- } = {
- abortController: null,
- inflightPromise: null,
- lastEmitted: null,
- pendingSuggestion: null,
- pendingLastEmittedEntry: null,
- }
- // Set up AWS auth status listener if enabled
- let unsubscribeAuthStatus: (() => void) | undefined
- if (options.enableAuthStatus) {
- const authStatusManager = AwsAuthStatusManager.getInstance()
- unsubscribeAuthStatus = authStatusManager.subscribe(status => {
- output.enqueue({
- type: 'auth_status',
- isAuthenticating: status.isAuthenticating,
- output: status.output,
- error: status.error,
- uuid: randomUUID(),
- session_id: getSessionId(),
- })
- })
- }
- // Set up rate limit status listener to emit SDKRateLimitEvent for all status changes.
- // Emitting for all statuses (including 'allowed') ensures consumers can clear warnings
- // when rate limits reset. The upstream emitStatusChange already deduplicates via isEqual.
- const rateLimitListener = (limits: ClaudeAILimits) => {
- const rateLimitInfo = toSDKRateLimitInfo(limits)
- if (rateLimitInfo) {
- output.enqueue({
- type: 'rate_limit_event',
- rate_limit_info: rateLimitInfo,
- uuid: randomUUID(),
- session_id: getSessionId(),
- })
- }
- }
- statusListeners.add(rateLimitListener)
- // Messages for internal tracking, directly mutated by ask(). These messages
- // include Assistant, User, Attachment, and Progress messages.
- // TODO: Clean up this code to avoid passing around a mutable array.
- const mutableMessages: Message[] = initialMessages
- // Seed the readFileState cache from the transcript (content the model saw,
- // with message timestamps) so getChangedFiles can detect external edits.
- // This cache instance must persist across ask() calls, since the edit tool
- // relies on this as a global state.
- let readFileState = extractReadFilesFromMessages(
- initialMessages,
- cwd(),
- READ_FILE_STATE_CACHE_SIZE,
- )
- // Client-supplied readFileState seeds (via seed_read_state control request).
- // The stdin IIFE runs concurrently with ask() — a seed arriving mid-turn
- // would be lost to ask()'s clone-then-replace (QueryEngine.ts finally block)
- // if written directly into readFileState. Instead, seeds land here, merge
- // into getReadFileCache's view (readFileState-wins-ties: seeds fill gaps),
- // and are re-applied then CLEARED in setReadFileCache. One-shot: each seed
- // survives exactly one clone-replace cycle, then becomes a regular
- // readFileState entry subject to compact's clear like everything else.
- const pendingSeeds = createFileStateCacheWithSizeLimit(
- READ_FILE_STATE_CACHE_SIZE,
- )
- // Auto-resume interrupted turns on restart so CC continues from where it
- // left off without requiring the SDK to re-send the prompt.
- const resumeInterruptedTurnEnv =
- process.env.CLAUDE_CODE_RESUME_INTERRUPTED_TURN
- if (
- turnInterruptionState &&
- turnInterruptionState.kind !== 'none' &&
- resumeInterruptedTurnEnv
- ) {
- logForDebugging(
- `[print.ts] Auto-resuming interrupted turn (kind: ${turnInterruptionState.kind})`,
- )
- // Remove the interrupted message and its sentinel, then re-enqueue so
- // the model sees it exactly once. For mid-turn interruptions, the
- // deserialization layer transforms them into interrupted_prompt by
- // appending a synthetic "Continue from where you left off." message.
- removeInterruptedMessage(mutableMessages, turnInterruptionState.message)
- enqueue({
- mode: 'prompt',
- value: turnInterruptionState.message.message.content,
- uuid: randomUUID(),
- })
- }
- const modelOptions = getModelOptions()
- const modelInfos = modelOptions.map(option => {
- const modelId = option.value === null ? 'default' : option.value
- const resolvedModel =
- modelId === 'default'
- ? getDefaultMainLoopModel()
- : parseUserSpecifiedModel(modelId)
- const hasEffort = modelSupportsEffort(resolvedModel)
- const hasAdaptiveThinking = modelSupportsAdaptiveThinking(resolvedModel)
- const hasFastMode = isFastModeSupportedByModel(option.value)
- const hasAutoMode = modelSupportsAutoMode(resolvedModel)
- return {
- name: modelId,
- value: modelId,
- displayName: option.label,
- description: option.description,
- ...(hasEffort && {
- supportsEffort: true,
- supportedEffortLevels: modelSupportsMaxEffort(resolvedModel)
- ? [...EFFORT_LEVELS]
- : EFFORT_LEVELS.filter(l => l !== 'max'),
- }),
- ...(hasAdaptiveThinking && { supportsAdaptiveThinking: true }),
- ...(hasFastMode && { supportsFastMode: true }),
- ...(hasAutoMode && { supportsAutoMode: true }),
- }
- })
- let activeUserSpecifiedModel = options.userSpecifiedModel
- function injectModelSwitchBreadcrumbs(
- modelArg: string,
- resolvedModel: string,
- ): void {
- const breadcrumbs = createModelSwitchBreadcrumbs(
- modelArg,
- modelDisplayString(resolvedModel),
- )
- mutableMessages.push(...breadcrumbs)
- for (const crumb of breadcrumbs) {
- if (
- typeof crumb.message.content === 'string' &&
- crumb.message.content.includes(`<${LOCAL_COMMAND_STDOUT_TAG}>`)
- ) {
- output.enqueue({
- type: 'user',
- content: crumb.message.content,
- message: crumb.message,
- session_id: getSessionId(),
- parent_tool_use_id: null,
- uuid: crumb.uuid,
- timestamp: crumb.timestamp,
- isReplay: true,
- } as SDKUserMessageReplay)
- }
- }
- }
- // Cache SDK MCP clients to avoid reconnecting on each run
- let sdkClients: MCPServerConnection[] = []
- let sdkTools: Tools = []
- // Track which MCP clients have had elicitation handlers registered
- const elicitationRegistered = new Set<string>()
- /**
- * Register elicitation request/completion handlers on connected MCP clients
- * that haven't been registered yet. SDK MCP servers are excluded because they
- * route through SdkControlClientTransport. Hooks run first (matching REPL
- * behavior); if no hook responds, the request is forwarded to the SDK
- * consumer via the control protocol.
- */
- function registerElicitationHandlers(clients: MCPServerConnection[]): void {
- for (const connection of clients) {
- if (
- connection.type !== 'connected' ||
- elicitationRegistered.has(connection.name)
- ) {
- continue
- }
- // Skip SDK MCP servers — elicitation flows through SdkControlClientTransport
- if (connection.config.type === 'sdk') {
- continue
- }
- const serverName = connection.name
- // Wrapped in try/catch because setRequestHandler throws if the client wasn't
- // created with elicitation capability declared (e.g., SDK-created clients).
- try {
- connection.client.setRequestHandler(
- ElicitRequestSchema,
- async (request, extra) => {
- logMCPDebug(
- serverName,
- `Elicitation request received in print mode: ${jsonStringify(request)}`,
- )
- const mode = request.params.mode === 'url' ? 'url' : 'form'
- logEvent('tengu_mcp_elicitation_shown', {
- mode: mode as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- // Run elicitation hooks first — they can provide a response programmatically
- const hookResponse = await runElicitationHooks(
- serverName,
- request.params,
- extra.signal,
- )
- if (hookResponse) {
- logMCPDebug(
- serverName,
- `Elicitation resolved by hook: ${jsonStringify(hookResponse)}`,
- )
- logEvent('tengu_mcp_elicitation_response', {
- mode: mode as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- action:
- hookResponse.action as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- return hookResponse
- }
- // Delegate to SDK consumer via control protocol
- const url =
- 'url' in request.params
- ? (request.params.url as string)
- : undefined
- const requestedSchema =
- 'requestedSchema' in request.params
- ? (request.params.requestedSchema as
- | Record<string, unknown>
- | undefined)
- : undefined
- const elicitationId =
- 'elicitationId' in request.params
- ? (request.params.elicitationId as string | undefined)
- : undefined
- const rawResult = await structuredIO.handleElicitation(
- serverName,
- request.params.message,
- requestedSchema,
- extra.signal,
- mode,
- url,
- elicitationId,
- )
- const result = await runElicitationResultHooks(
- serverName,
- rawResult,
- extra.signal,
- mode,
- elicitationId,
- )
- logEvent('tengu_mcp_elicitation_response', {
- mode: mode as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- action:
- result.action as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- })
- return result
- },
- )
- // Surface completion notifications to SDK consumers (URL mode)
- connection.client.setNotificationHandler(
- ElicitationCompleteNotificationSchema,
- notification => {
- const { elicitationId } = notification.params
- logMCPDebug(
- serverName,
- `Elicitation completion notification: ${elicitationId}`,
- )
- void executeNotificationHooks({
- message: `MCP server "${serverName}" confirmed elicitation ${elicitationId} complete`,
- notificationType: 'elicitation_complete',
- })
- output.enqueue({
- type: 'system',
- subtype: 'elicitation_complete',
- mcp_server_name: serverName,
- elicitation_id: elicitationId,
- uuid: randomUUID(),
- session_id: getSessionId(),
- })
- },
- )
- elicitationRegistered.add(serverName)
- } catch {
- // setRequestHandler throws if the client wasn't created with
- // elicitation capability — skip silently
- }
- }
- }
- async function updateSdkMcp() {
- // Check if SDK MCP servers need to be updated (new servers added or removed)
- const currentServerNames = new Set(Object.keys(sdkMcpConfigs))
- const connectedServerNames = new Set(sdkClients.map(c => c.name))
- // Check if there are any differences (additions or removals)
- const hasNewServers = Array.from(currentServerNames).some(
- name => !connectedServerNames.has(name),
- )
- const hasRemovedServers = Array.from(connectedServerNames).some(
- name => !currentServerNames.has(name),
- )
- // Check if any SDK clients are pending and need to be upgraded
- const hasPendingSdkClients = sdkClients.some(c => c.type === 'pending')
- // Check if any SDK clients failed their handshake and need to be retried.
- // Without this, a client that lands in 'failed' (e.g. handshake timeout on
- // a WS reconnect race) stays failed forever — its name satisfies the
- // connectedServerNames diff but it contributes zero tools.
- const hasFailedSdkClients = sdkClients.some(c => c.type === 'failed')
- const haveServersChanged =
- hasNewServers ||
- hasRemovedServers ||
- hasPendingSdkClients ||
- hasFailedSdkClients
- if (haveServersChanged) {
- // Clean up removed servers
- for (const client of sdkClients) {
- if (!currentServerNames.has(client.name)) {
- if (client.type === 'connected') {
- await client.cleanup()
- }
- }
- }
- // Re-initialize all SDK MCP servers with current config
- const sdkSetup = await setupSdkMcpClients(
- sdkMcpConfigs,
- (serverName, message) =>
- structuredIO.sendMcpMessage(serverName, message),
- )
- sdkClients = sdkSetup.clients
- sdkTools = sdkSetup.tools
- // Store SDK MCP tools in appState so subagents can access them via
- // assembleToolPool. Only tools are stored here — SDK clients are already
- // merged separately in the query loop (allMcpClients) and mcp_status handler.
- // Use both old (connectedServerNames) and new (currentServerNames) to remove
- // stale SDK tools when servers are added or removed.
- const allSdkNames = uniq([...connectedServerNames, ...currentServerNames])
- setAppState(prev => ({
- ...prev,
- mcp: {
- ...prev.mcp,
- tools: [
- ...prev.mcp.tools.filter(
- t =>
- !allSdkNames.some(name =>
- t.name.startsWith(getMcpPrefix(name)),
- ),
- ),
- ...sdkTools,
- ],
- },
- }))
- // Set up the special internal VSCode MCP server if necessary.
- setupVscodeSdkMcp(sdkClients)
- }
- }
- void updateSdkMcp()
- // State for dynamically added MCP servers (via mcp_set_servers control message)
- // These are separate from SDK MCP servers and support all transport types
- let dynamicMcpState: DynamicMcpState = {
- clients: [],
- tools: [],
- configs: {},
- }
- // Shared tool assembly for ask() and the get_context_usage control request.
- // Closes over the mutable sdkTools/dynamicMcpState bindings so both call
- // sites see late-connecting servers.
- const buildAllTools = (appState: AppState): Tools => {
- const assembledTools = assembleToolPool(
- appState.toolPermissionContext,
- appState.mcp.tools,
- )
- let allTools = uniqBy(
- mergeAndFilterTools(
- [...tools, ...sdkTools, ...dynamicMcpState.tools],
- assembledTools,
- appState.toolPermissionContext.mode,
- ),
- 'name',
- )
- if (options.permissionPromptToolName) {
- allTools = allTools.filter(
- tool => !toolMatchesName(tool, options.permissionPromptToolName!),
- )
- }
- const initJsonSchema = getInitJsonSchema()
- if (initJsonSchema && !options.jsonSchema) {
- const syntheticOutputResult = createSyntheticOutputTool(initJsonSchema)
- if ('tool' in syntheticOutputResult) {
- allTools = [...allTools, syntheticOutputResult.tool]
- }
- }
- return allTools
- }
- // Bridge handle for remote-control (SDK control message).
- // Mirrors the REPL's useReplBridge hook: the handle is created when
- // `remote_control` is enabled and torn down when disabled.
- let bridgeHandle: ReplBridgeHandle | null = null
- // Cursor into mutableMessages — tracks how far we've forwarded.
- // Same index-based diff as useReplBridge's lastWrittenIndexRef.
- let bridgeLastForwardedIndex = 0
- // Forward new messages from mutableMessages to the bridge.
- // Called incrementally during each turn (so claude.ai sees progress
- // and stays alive during permission waits) and again after the turn.
- //
- // writeMessages has its own UUID-based dedup (initialMessageUUIDs,
- // recentPostedUUIDs) — the index cursor here is a pre-filter to avoid
- // O(n) re-scanning of already-sent messages on every call.
- function forwardMessagesToBridge(): void {
- if (!bridgeHandle) return
- // Guard against mutableMessages shrinking (compaction truncates it).
- const startIndex = Math.min(
- bridgeLastForwardedIndex,
- mutableMessages.length,
- )
- const newMessages = mutableMessages
- .slice(startIndex)
- .filter(m => m.type === 'user' || m.type === 'assistant')
- bridgeLastForwardedIndex = mutableMessages.length
- if (newMessages.length > 0) {
- bridgeHandle.writeMessages(newMessages)
- }
- }
- // Helper to apply MCP server changes - used by both mcp_set_servers control message
- // and background plugin installation.
- // NOTE: Nested function required - mutates closure state (sdkMcpConfigs, sdkClients, etc.)
- let mcpChangesPromise: Promise<{
- response: SDKControlMcpSetServersResponse
- sdkServersChanged: boolean
- }> = Promise.resolve({
- response: {
- added: [] as string[],
- removed: [] as string[],
- errors: {} as Record<string, string>,
- },
- sdkServersChanged: false,
- })
- function applyMcpServerChanges(
- servers: Record<string, McpServerConfigForProcessTransport>,
- ): Promise<{
- response: SDKControlMcpSetServersResponse
- sdkServersChanged: boolean
- }> {
- // Serialize calls to prevent race conditions between concurrent callers
- // (background plugin install and mcp_set_servers control messages)
- const doWork = async (): Promise<{
- response: SDKControlMcpSetServersResponse
- sdkServersChanged: boolean
- }> => {
- const oldSdkClientNames = new Set(sdkClients.map(c => c.name))
- const result = await handleMcpSetServers(
- servers,
- { configs: sdkMcpConfigs, clients: sdkClients, tools: sdkTools },
- dynamicMcpState,
- setAppState,
- )
- // Update SDK state (need to mutate sdkMcpConfigs since it's shared)
- for (const key of Object.keys(sdkMcpConfigs)) {
- delete sdkMcpConfigs[key]
- }
- Object.assign(sdkMcpConfigs, result.newSdkState.configs)
- sdkClients = result.newSdkState.clients
- sdkTools = result.newSdkState.tools
- dynamicMcpState = result.newDynamicState
- // Keep appState.mcp.tools in sync so subagents can see SDK MCP tools.
- // Use both old and new SDK client names to remove stale tools.
- if (result.sdkServersChanged) {
- const newSdkClientNames = new Set(sdkClients.map(c => c.name))
- const allSdkNames = uniq([...oldSdkClientNames, ...newSdkClientNames])
- setAppState(prev => ({
- ...prev,
- mcp: {
- ...prev.mcp,
- tools: [
- ...prev.mcp.tools.filter(
- t =>
- !allSdkNames.some(name =>
- t.name.startsWith(getMcpPrefix(name)),
- ),
- ),
- ...sdkTools,
- ],
- },
- }))
- }
- return {
- response: result.response,
- sdkServersChanged: result.sdkServersChanged,
- }
- }
- mcpChangesPromise = mcpChangesPromise.then(doWork, doWork)
- return mcpChangesPromise
- }
- // Build McpServerStatus[] for control responses. Shared by mcp_status and
- // reload_plugins handlers. Reads closure state: sdkClients, dynamicMcpState.
- function buildMcpServerStatuses(): McpServerStatus[] {
- const currentAppState = getAppState()
- const currentMcpClients = currentAppState.mcp.clients
- const allMcpTools = uniqBy(
- [...currentAppState.mcp.tools, ...dynamicMcpState.tools],
- 'name',
- )
- const existingNames = new Set([
- ...currentMcpClients.map(c => c.name),
- ...sdkClients.map(c => c.name),
- ])
- return [
- ...currentMcpClients,
- ...sdkClients,
- ...dynamicMcpState.clients.filter(c => !existingNames.has(c.name)),
- ].map(connection => {
- let config
- if (
- connection.config.type === 'sse' ||
- connection.config.type === 'http'
- ) {
- config = {
- type: connection.config.type,
- url: connection.config.url,
- headers: connection.config.headers,
- oauth: connection.config.oauth,
- }
- } else if (connection.config.type === 'claudeai-proxy') {
- config = {
- type: 'claudeai-proxy' as const,
- url: connection.config.url,
- id: connection.config.id,
- }
- } else if (
- connection.config.type === 'stdio' ||
- connection.config.type === undefined
- ) {
- const stdioConfig = connection.config as { command: string; args: string[] }
- config = {
- type: 'stdio' as const,
- command: stdioConfig.command,
- args: stdioConfig.args,
- }
- }
- const serverTools =
- connection.type === 'connected'
- ? filterToolsByServer(allMcpTools, connection.name).map(tool => ({
- name: tool.mcpInfo?.toolName ?? tool.name,
- annotations: {
- readOnly: tool.isReadOnly({}) || undefined,
- destructive: tool.isDestructive?.({}) || undefined,
- openWorld: tool.isOpenWorld?.({}) || undefined,
- },
- }))
- : undefined
- // Capabilities passthrough with allowlist pre-filter. The IDE reads
- // experimental['claude/channel'] to decide whether to show the
- // Enable-channel prompt — only echo it if channel_enable would
- // actually pass the allowlist. Not a security boundary (the
- // handler re-runs the full gate); just avoids dead buttons.
- let capabilities: { experimental?: Record<string, unknown> } | undefined
- if (
- (feature('KAIROS') || feature('KAIROS_CHANNELS')) &&
- connection.type === 'connected' &&
- connection.capabilities.experimental
- ) {
- const exp = { ...connection.capabilities.experimental }
- if (
- exp['claude/channel'] &&
- (!isChannelsEnabled() ||
- !isChannelAllowlisted(connection.config.pluginSource))
- ) {
- delete exp['claude/channel']
- }
- if (Object.keys(exp).length > 0) {
- capabilities = { experimental: exp }
- }
- }
- return {
- name: connection.name,
- status: connection.type as McpServerStatus['status'],
- serverInfo:
- connection.type === 'connected' ? connection.serverInfo : undefined,
- error: connection.type === 'failed' ? connection.error : undefined,
- config,
- scope: connection.config.scope,
- tools: serverTools,
- capabilities,
- }
- }) as McpServerStatus[]
- }
- // NOTE: Nested function required - needs closure access to applyMcpServerChanges and updateSdkMcp
- async function installPluginsAndApplyMcpInBackground(): Promise<void> {
- try {
- // Join point for user settings (fired at runHeadless entry) and managed
- // settings (fired in main.tsx preAction). downloadUserSettings() caches
- // its promise so this awaits the same in-flight request.
- await Promise.all([
- feature('DOWNLOAD_USER_SETTINGS') &&
- (isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) || getIsRemoteMode())
- ? withDiagnosticsTiming('headless_user_settings_download', () =>
- downloadUserSettings(),
- )
- : Promise.resolve(),
- withDiagnosticsTiming('headless_managed_settings_wait', () =>
- waitForRemoteManagedSettingsToLoad(),
- ),
- ])
- const pluginsInstalled = await installPluginsForHeadless()
- if (pluginsInstalled) {
- await applyPluginMcpDiff()
- }
- } catch (error) {
- logError(error)
- }
- }
- // Background plugin installation for all headless users
- // Installs marketplaces from extraKnownMarketplaces and missing enabled plugins
- // CLAUDE_CODE_SYNC_PLUGIN_INSTALL=true: resolved in run() before the first
- // query so plugins are guaranteed available on the first ask().
- let pluginInstallPromise: Promise<void> | null = null
- // --bare / SIMPLE: skip plugin install. Scripted calls don't add plugins
- // mid-session; the next interactive run reconciles.
- if (!isBareMode()) {
- if (isEnvTruthy(process.env.CLAUDE_CODE_SYNC_PLUGIN_INSTALL)) {
- pluginInstallPromise = installPluginsAndApplyMcpInBackground()
- } else {
- void installPluginsAndApplyMcpInBackground()
- }
- }
- // Idle timeout management
- const idleTimeout = createIdleTimeoutManager(() => !running)
- // Mutable commands and agents for hot reloading
- let currentCommands = commands
- let currentAgents = agents
- // Clear all plugin-related caches, reload commands/agents/hooks.
- // Called after CLAUDE_CODE_SYNC_PLUGIN_INSTALL completes (before first query)
- // and after non-sync background install finishes.
- // refreshActivePlugins calls clearAllCaches() which is required because
- // loadAllPlugins() may have run during main.tsx startup BEFORE managed
- // settings were fetched. Without clearing, getCommands() would rebuild
- // from a stale plugin list.
- async function refreshPluginState(): Promise<void> {
- // refreshActivePlugins handles the full cache sweep (clearAllCaches),
- // reloads all plugin component loaders, writes AppState.plugins +
- // AppState.agentDefinitions, registers hooks, and bumps mcp.pluginReconnectKey.
- const { agentDefinitions: freshAgentDefs } =
- await refreshActivePlugins(setAppState)
- // Headless-specific: currentCommands/currentAgents are local mutable refs
- // captured by the query loop (REPL uses AppState instead). getCommands is
- // fresh because refreshActivePlugins cleared its cache.
- currentCommands = await getCommands(cwd())
- // Preserve SDK-provided agents (--agents CLI flag or SDK initialize
- // control_request) — both inject via parseAgentsFromJson with
- // source='flagSettings'. loadMarkdownFilesForSubdir never assigns this
- // source, so it cleanly discriminates "injected, not disk-loadable".
- //
- // The previous filter used a negative set-diff (!freshAgentTypes.has(a))
- // which also matched plugin agents that were in the poisoned initial
- // currentAgents but correctly excluded from freshAgentDefs after managed
- // settings applied — leaking policy-blocked agents into the init message.
- // See gh-23085: isBridgeEnabled() at Commander-definition time poisoned
- // the settings cache before setEligibility(true) ran.
- const sdkAgents = currentAgents.filter(a => a.source === 'flagSettings')
- currentAgents = [...freshAgentDefs.allAgents, ...sdkAgents]
- }
- // Re-diff MCP configs after plugin state changes. Filters to
- // process-transport-supported types and carries SDK-mode servers through
- // so applyMcpServerChanges' diff doesn't close their transports.
- // Nested: needs closure access to sdkMcpConfigs, applyMcpServerChanges,
- // updateSdkMcp.
- async function applyPluginMcpDiff(): Promise<void> {
- const { servers: newConfigs } = await getAllMcpConfigs()
- const supportedConfigs: Record<string, McpServerConfigForProcessTransport> =
- {}
- for (const [name, config] of Object.entries(newConfigs)) {
- const type = config.type
- if (
- type === undefined ||
- type === 'stdio' ||
- type === 'sse' ||
- type === 'http' ||
- type === 'sdk'
- ) {
- supportedConfigs[name] = config as McpServerConfigForProcessTransport
- }
- }
- for (const [name, config] of Object.entries(sdkMcpConfigs)) {
- if (config.type === 'sdk' && !(name in supportedConfigs)) {
- supportedConfigs[name] = config as unknown as McpServerConfigForProcessTransport
- }
- }
- const { response, sdkServersChanged } =
- await applyMcpServerChanges(supportedConfigs)
- if (sdkServersChanged) {
- void updateSdkMcp()
- }
- logForDebugging(
- `Headless MCP refresh: added=${response.added.length}, removed=${response.removed.length}`,
- )
- }
- // Subscribe to skill changes for hot reloading
- const unsubscribeSkillChanges = skillChangeDetector.subscribe(() => {
- clearCommandsCache()
- void getCommands(cwd()).then(newCommands => {
- currentCommands = newCommands
- })
- })
- // Proactive mode: schedule a tick to keep the model looping autonomously.
- // setTimeout(0) yields to the event loop so pending stdin messages
- // (interrupts, user messages) are processed before the tick fires.
- const scheduleProactiveTick =
- feature('PROACTIVE') || feature('KAIROS')
- ? () => {
- setTimeout(() => {
- if (
- !proactiveModule?.isProactiveActive() ||
- proactiveModule.isProactivePaused() ||
- inputClosed
- ) {
- return
- }
- const tickContent = `<${TICK_TAG}>${new Date().toLocaleTimeString()}</${TICK_TAG}>`
- enqueue({
- mode: 'prompt' as const,
- value: tickContent,
- uuid: randomUUID(),
- priority: 'later',
- isMeta: true,
- })
- void run()
- }, 0)
- }
- : undefined
- // Abort the current operation when a 'now' priority message arrives.
- subscribeToCommandQueue(() => {
- if (abortController && getCommandsByMaxPriority('now').length > 0) {
- abortController.abort('interrupt')
- }
- })
- const run = async () => {
- if (running) {
- return
- }
- running = true
- runPhase = undefined
- notifySessionStateChanged('running')
- idleTimeout.stop()
- headlessProfilerCheckpoint('run_entry')
- // TODO(custom-tool-refactor): Should move to the init message, like browser
- await updateSdkMcp()
- headlessProfilerCheckpoint('after_updateSdkMcp')
- // Resolve deferred plugin installation (CLAUDE_CODE_SYNC_PLUGIN_INSTALL).
- // The promise was started eagerly so installation overlaps with other init.
- // Awaiting here guarantees plugins are available before the first ask().
- // If CLAUDE_CODE_SYNC_PLUGIN_INSTALL_TIMEOUT_MS is set, races against that
- // deadline and proceeds without plugins on timeout (logging an error).
- if (pluginInstallPromise) {
- const timeoutMs = parseInt(
- process.env.CLAUDE_CODE_SYNC_PLUGIN_INSTALL_TIMEOUT_MS || '',
- 10,
- )
- if (timeoutMs > 0) {
- const timeout = sleep(timeoutMs).then(() => 'timeout' as const)
- const result = await Promise.race([pluginInstallPromise, timeout])
- if (result === 'timeout') {
- logError(
- new Error(
- `CLAUDE_CODE_SYNC_PLUGIN_INSTALL: plugin installation timed out after ${timeoutMs}ms`,
- ),
- )
- logEvent('tengu_sync_plugin_install_timeout', {
- timeout_ms: timeoutMs,
- })
- }
- } else {
- await pluginInstallPromise
- }
- pluginInstallPromise = null
- // Refresh commands, agents, and hooks now that plugins are installed
- await refreshPluginState()
- // Set up hot-reload for plugin hooks now that the initial install is done.
- // In sync-install mode, setup.ts skips this to avoid racing with the install.
- const { setupPluginHookHotReload } = await import(
- '../utils/plugins/loadPluginHooks.js'
- )
- setupPluginHookHotReload()
- }
- // Only main-thread commands (agentId===undefined) — subagent
- // notifications are drained by the subagent's mid-turn gate in query.ts.
- // Defined outside the try block so it's accessible in the post-finally
- // queue re-checks at the bottom of run().
- const isMainThread = (cmd: QueuedCommand) => cmd.agentId === undefined
- try {
- let command: QueuedCommand | undefined
- let waitingForAgents = false
- // Extract command processing into a named function for the do-while pattern.
- // Drains the queue, batching consecutive prompt-mode commands into one
- // ask() call so messages that queued up during a long turn coalesce
- // into a single follow-up turn instead of N separate turns.
- const drainCommandQueue = async () => {
- while ((command = dequeue(isMainThread))) {
- if (
- command.mode !== 'prompt' &&
- command.mode !== 'orphaned-permission' &&
- command.mode !== 'task-notification'
- ) {
- throw new Error(
- 'only prompt commands are supported in streaming mode',
- )
- }
- // Non-prompt commands (task-notification, orphaned-permission) carry
- // side effects or orphanedPermission state, so they process singly.
- // Prompt commands greedily collect followers with matching workload.
- const batch: QueuedCommand[] = [command]
- if (command.mode === 'prompt') {
- while (canBatchWith(command, peek(isMainThread))) {
- batch.push(dequeue(isMainThread)!)
- }
- if (batch.length > 1) {
- command = {
- ...command,
- value: joinPromptValues(batch.map(c => c.value)),
- uuid: batch.findLast(c => c.uuid)?.uuid ?? command.uuid,
- }
- }
- }
- const batchUuids = batch.map(c => c.uuid).filter(u => u !== undefined)
- // QueryEngine will emit a replay for command.uuid (the last uuid in
- // the batch) via its messagesToAck path. Emit replays here for the
- // rest so consumers that track per-uuid delivery (clank's
- // asyncMessages footer, CCR) see an ack for every message they sent,
- // not just the one that survived the merge.
- if (options.replayUserMessages && batch.length > 1) {
- for (const c of batch) {
- if (c.uuid && c.uuid !== command.uuid) {
- output.enqueue({
- type: 'user',
- content: c.value,
- message: { role: 'user', content: c.value },
- session_id: getSessionId(),
- parent_tool_use_id: null,
- uuid: c.uuid as string,
- isReplay: true,
- } as SDKUserMessageReplay)
- }
- }
- }
- // Combine all MCP clients. appState.mcp is populated incrementally
- // per-server by main.tsx (mirrors useManageMCPConnections). Reading
- // fresh per-command means late-connecting servers are visible on the
- // next turn. registerElicitationHandlers is idempotent (tracking set).
- const appState = getAppState()
- const allMcpClients = [
- ...appState.mcp.clients,
- ...sdkClients,
- ...dynamicMcpState.clients,
- ]
- registerElicitationHandlers(allMcpClients)
- // Channel handlers for servers allowlisted via --channels at
- // construction time (or enableChannel() mid-session). Runs every
- // turn like registerElicitationHandlers — idempotent per-client
- // (setNotificationHandler replaces, not stacks) and no-ops for
- // non-allowlisted servers (one feature-flag check).
- for (const client of allMcpClients) {
- reregisterChannelHandlerAfterReconnect(client)
- }
- const allTools = buildAllTools(appState)
- for (const uuid of batchUuids) {
- notifyCommandLifecycle(uuid, 'started')
- }
- // Task notifications arrive when background agents complete.
- // Emit an SDK system event for SDK consumers, then fall through
- // to ask() so the model sees the agent result and can act on it.
- // This matches TUI behavior where useQueueProcessor always feeds
- // notifications to the model regardless of coordinator mode.
- if (command.mode === 'task-notification') {
- const notificationText =
- typeof command.value === 'string' ? command.value : ''
- // Parse the XML-formatted notification
- const taskIdMatch = notificationText.match(
- /<task-id>([^<]+)<\/task-id>/,
- )
- const toolUseIdMatch = notificationText.match(
- /<tool-use-id>([^<]+)<\/tool-use-id>/,
- )
- const outputFileMatch = notificationText.match(
- /<output-file>([^<]+)<\/output-file>/,
- )
- const statusMatch = notificationText.match(
- /<status>([^<]+)<\/status>/,
- )
- const summaryMatch = notificationText.match(
- /<summary>([^<]+)<\/summary>/,
- )
- const isValidStatus = (
- s: string | undefined,
- ): s is 'completed' | 'failed' | 'stopped' | 'killed' =>
- s === 'completed' ||
- s === 'failed' ||
- s === 'stopped' ||
- s === 'killed'
- const rawStatus = statusMatch?.[1]
- const status = isValidStatus(rawStatus)
- ? rawStatus === 'killed'
- ? 'stopped'
- : rawStatus
- : 'completed'
- const usageMatch = notificationText.match(
- /<usage>([\s\S]*?)<\/usage>/,
- )
- const usageContent = usageMatch?.[1] ?? ''
- const totalTokensMatch = usageContent.match(
- /<total_tokens>(\d+)<\/total_tokens>/,
- )
- const toolUsesMatch = usageContent.match(
- /<tool_uses>(\d+)<\/tool_uses>/,
- )
- const durationMsMatch = usageContent.match(
- /<duration_ms>(\d+)<\/duration_ms>/,
- )
- // Only emit a task_notification SDK event when a <status> tag is
- // present — that means this is a terminal notification (completed/
- // failed/stopped). Stream events from enqueueStreamEvent carry no
- // <status> (they're progress pings); emitting them here would
- // default to 'completed' and falsely close the task for SDK
- // consumers. Terminal bookends are now emitted directly via
- // emitTaskTerminatedSdk, so skipping statusless events is safe.
- if (statusMatch) {
- output.enqueue({
- type: 'system',
- subtype: 'task_notification',
- task_id: taskIdMatch?.[1] ?? '',
- tool_use_id: toolUseIdMatch?.[1],
- status,
- output_file: outputFileMatch?.[1] ?? '',
- summary: summaryMatch?.[1] ?? '',
- usage:
- totalTokensMatch && toolUsesMatch
- ? {
- total_tokens: parseInt(totalTokensMatch[1]!, 10),
- tool_uses: parseInt(toolUsesMatch[1]!, 10),
- duration_ms: durationMsMatch
- ? parseInt(durationMsMatch[1]!, 10)
- : 0,
- }
- : undefined,
- session_id: getSessionId(),
- uuid: randomUUID(),
- })
- }
- // No continue -- fall through to ask() so the model processes the result
- }
- const input = command.value
- if (structuredIO instanceof RemoteIO && command.mode === 'prompt') {
- logEvent('tengu_bridge_message_received', {
- is_repl: false,
- })
- }
- // Abort any in-flight suggestion generation and track acceptance
- suggestionState.abortController?.abort()
- suggestionState.abortController = null
- suggestionState.pendingSuggestion = null
- suggestionState.pendingLastEmittedEntry = null
- if (suggestionState.lastEmitted) {
- if (command.mode === 'prompt') {
- // SDK user messages enqueue ContentBlockParam[], not a plain string
- const inputText =
- typeof input === 'string'
- ? input
- : (
- input.find(b => b.type === 'text') as
- | { type: 'text'; text: string }
- | undefined
- )?.text
- if (typeof inputText === 'string') {
- logSuggestionOutcome(
- suggestionState.lastEmitted.text,
- inputText,
- suggestionState.lastEmitted.emittedAt,
- suggestionState.lastEmitted.promptId,
- suggestionState.lastEmitted.generationRequestId,
- )
- }
- suggestionState.lastEmitted = null
- }
- }
- abortController = createAbortController()
- const turnStartTime = feature('FILE_PERSISTENCE')
- ? Date.now()
- : undefined
- headlessProfilerCheckpoint('before_ask')
- startQueryProfile()
- // Per-iteration ALS context so bg agents spawned inside ask()
- // inherit workload across their detached awaits. In-process cron
- // stamps cmd.workload; the SDK --workload flag is options.workload.
- // const-capture: TS loses `while ((command = dequeue()))` narrowing
- // inside the closure.
- const cmd = command
- await runWithWorkload(cmd.workload ?? options.workload, async () => {
- for await (const message of ask({
- commands: uniqBy(
- [...currentCommands, ...appState.mcp.commands],
- 'name',
- ),
- prompt: input,
- promptUuid: cmd.uuid,
- isMeta: cmd.isMeta,
- cwd: cwd(),
- tools: allTools,
- verbose: options.verbose,
- mcpClients: allMcpClients,
- thinkingConfig: options.thinkingConfig,
- maxTurns: options.maxTurns,
- maxBudgetUsd: options.maxBudgetUsd,
- taskBudget: options.taskBudget,
- canUseTool,
- userSpecifiedModel: activeUserSpecifiedModel,
- fallbackModel: options.fallbackModel,
- jsonSchema: getInitJsonSchema() ?? options.jsonSchema,
- mutableMessages,
- getReadFileCache: () =>
- pendingSeeds.size === 0
- ? readFileState
- : mergeFileStateCaches(readFileState, pendingSeeds),
- setReadFileCache: cache => {
- readFileState = cache
- for (const [path, seed] of pendingSeeds.entries()) {
- const existing = readFileState.get(path)
- if (!existing || seed.timestamp > existing.timestamp) {
- readFileState.set(path, seed)
- }
- }
- pendingSeeds.clear()
- },
- customSystemPrompt: options.systemPrompt,
- appendSystemPrompt: options.appendSystemPrompt,
- getAppState,
- setAppState,
- abortController,
- replayUserMessages: options.replayUserMessages,
- includePartialMessages: options.includePartialMessages,
- handleElicitation: (serverName, params, elicitSignal) =>
- structuredIO.handleElicitation(
- serverName,
- params.message,
- undefined,
- elicitSignal,
- params.mode,
- params.url,
- 'elicitationId' in params ? params.elicitationId : undefined,
- ),
- agents: currentAgents,
- orphanedPermission: cmd.orphanedPermission,
- setSDKStatus: status => {
- output.enqueue({
- type: 'system',
- subtype: 'status',
- status,
- session_id: getSessionId(),
- uuid: randomUUID(),
- })
- },
- })) {
- // Forward messages to bridge incrementally (mid-turn) so
- // claude.ai sees progress and the connection stays alive
- // while blocked on permission requests.
- forwardMessagesToBridge()
- if (message.type === 'result') {
- // Flush pending SDK events so they appear before result on the stream.
- for (const event of drainSdkEvents()) {
- output.enqueue(event)
- }
- // Hold-back: don't emit result while background agents are running
- const currentState = getAppState()
- if (
- getRunningTasks(currentState).some(
- t =>
- (t.type === 'local_agent' ||
- t.type === 'local_workflow') &&
- isBackgroundTask(t),
- )
- ) {
- heldBackResult = message
- } else {
- heldBackResult = null
- output.enqueue(message)
- }
- } else {
- // Flush SDK events (task_started, task_progress) so background
- // agent progress is streamed in real-time, not batched until result.
- for (const event of drainSdkEvents()) {
- output.enqueue(event)
- }
- output.enqueue(message)
- }
- }
- }) // end runWithWorkload
- for (const uuid of batchUuids) {
- notifyCommandLifecycle(uuid, 'completed')
- }
- // Forward messages to bridge after each turn
- forwardMessagesToBridge()
- bridgeHandle?.sendResult()
- if (feature('FILE_PERSISTENCE') && turnStartTime !== undefined) {
- void executeFilePersistence(
- { turnStartTime } as import('src/utils/filePersistence/types.js').TurnStartTime,
- abortController.signal,
- result => {
- output.enqueue({
- type: 'system' as const,
- subtype: 'files_persisted' as const,
- files: (result as any).persistedFiles,
- failed: (result as any).failedFiles,
- processed_at: new Date().toISOString(),
- uuid: randomUUID(),
- session_id: getSessionId(),
- })
- },
- )
- }
- // Generate and emit prompt suggestion for SDK consumers
- if (
- options.promptSuggestions &&
- !isEnvDefinedFalsy(process.env.CLAUDE_CODE_ENABLE_PROMPT_SUGGESTION)
- ) {
- // TS narrows suggestionState to never in the while loop body;
- // cast via unknown to reset narrowing.
- const state = suggestionState as unknown as typeof suggestionState
- state.abortController?.abort()
- const localAbort = new AbortController()
- suggestionState.abortController = localAbort
- const cacheSafeParams = getLastCacheSafeParams()
- if (!cacheSafeParams) {
- logSuggestionSuppressed(
- 'sdk_no_params',
- undefined,
- undefined,
- 'sdk',
- )
- } else {
- // Use a ref object so the IIFE's finally can compare against its own
- // promise without a self-reference (which upsets TypeScript's flow analysis).
- const ref: { promise: Promise<void> | null } = { promise: null }
- ref.promise = (async () => {
- try {
- const result = await tryGenerateSuggestion(
- localAbort,
- mutableMessages,
- getAppState,
- cacheSafeParams,
- 'sdk',
- )
- if (!result || localAbort.signal.aborted) return
- const suggestionMsg = {
- type: 'prompt_suggestion' as const,
- suggestion: result.suggestion,
- uuid: randomUUID(),
- session_id: getSessionId(),
- }
- const lastEmittedEntry = {
- text: result.suggestion,
- emittedAt: Date.now(),
- promptId: result.promptId,
- generationRequestId: result.generationRequestId,
- }
- // Defer emission if the result is being held for background agents,
- // so that prompt_suggestion always arrives after result.
- // Only set lastEmitted when the suggestion is actually delivered
- // to the consumer; deferred suggestions may be discarded before
- // delivery if a new command arrives first.
- if (heldBackResult) {
- suggestionState.pendingSuggestion = suggestionMsg
- suggestionState.pendingLastEmittedEntry = {
- text: lastEmittedEntry.text,
- promptId: lastEmittedEntry.promptId,
- generationRequestId: lastEmittedEntry.generationRequestId,
- }
- } else {
- suggestionState.lastEmitted = lastEmittedEntry
- output.enqueue(suggestionMsg)
- }
- } catch (error) {
- if (
- error instanceof Error &&
- (error.name === 'AbortError' ||
- error.name === 'APIUserAbortError')
- ) {
- logSuggestionSuppressed(
- 'aborted',
- undefined,
- undefined,
- 'sdk',
- )
- return
- }
- logError(toError(error))
- } finally {
- if (suggestionState.inflightPromise === ref.promise) {
- suggestionState.inflightPromise = null
- }
- }
- })()
- suggestionState.inflightPromise = ref.promise
- }
- }
- // Log headless profiler metrics for this turn and start next turn
- logHeadlessProfilerTurn()
- logQueryProfileReport()
- headlessProfilerStartTurn()
- }
- }
- // Use a do-while loop to drain commands and then wait for any
- // background agents that are still running. When agents complete,
- // their notifications are enqueued and the loop re-drains.
- do {
- // Drain SDK events (task_started, task_progress) before command queue
- // so progress events precede task_notification on the stream.
- for (const event of drainSdkEvents()) {
- output.enqueue(event)
- }
- runPhase = 'draining_commands'
- await drainCommandQueue()
- // Check for running background tasks before exiting.
- // Exclude in_process_teammate — teammates are long-lived by design
- // (status: 'running' for their whole lifetime, cleaned up by the
- // shutdown protocol, not by transitioning to 'completed'). Waiting
- // on them here loops forever (gh-30008). Same exclusion already
- // exists at useBackgroundTaskNavigation.ts:55 for the same reason;
- // L1839 above is already narrower (type === 'local_agent') so it
- // doesn't hit this.
- waitingForAgents = false
- {
- const state = getAppState()
- const hasRunningBg = getRunningTasks(state).some(
- t => isBackgroundTask(t) && t.type !== 'in_process_teammate',
- )
- const hasMainThreadQueued = peek(isMainThread) !== undefined
- if (hasRunningBg || hasMainThreadQueued) {
- waitingForAgents = true
- if (!hasMainThreadQueued) {
- runPhase = 'waiting_for_agents'
- // No commands ready yet, wait for tasks to complete
- await sleep(100)
- }
- // Loop back to drain any newly queued commands
- }
- }
- } while (waitingForAgents)
- if (heldBackResult) {
- output.enqueue(heldBackResult)
- heldBackResult = null
- if (suggestionState.pendingSuggestion) {
- output.enqueue(suggestionState.pendingSuggestion)
- // Now that the suggestion is actually delivered, record it for acceptance tracking
- if (suggestionState.pendingLastEmittedEntry) {
- suggestionState.lastEmitted = {
- ...suggestionState.pendingLastEmittedEntry,
- emittedAt: Date.now(),
- }
- suggestionState.pendingLastEmittedEntry = null
- }
- suggestionState.pendingSuggestion = null
- }
- }
- } catch (error) {
- // Emit error result message before shutting down
- // Write directly to structuredIO to ensure immediate delivery
- try {
- await structuredIO.write({
- type: 'result',
- subtype: 'error_during_execution',
- duration_ms: 0,
- duration_api_ms: 0,
- is_error: true,
- num_turns: 0,
- stop_reason: null,
- session_id: getSessionId(),
- total_cost_usd: 0,
- usage: EMPTY_USAGE,
- modelUsage: {},
- permission_denials: [],
- uuid: randomUUID(),
- errors: [
- errorMessage(error),
- ...getInMemoryErrors().map(_ => _.error),
- ],
- })
- } catch {
- // If we can't emit the error result, continue with shutdown anyway
- }
- suggestionState.abortController?.abort()
- gracefulShutdownSync(1)
- return
- } finally {
- runPhase = 'finally_flush'
- // Flush pending internal events before going idle
- await structuredIO.flushInternalEvents()
- runPhase = 'finally_post_flush'
- if (!isShuttingDown()) {
- notifySessionStateChanged('idle')
- // Drain so the idle session_state_changed SDK event (plus any
- // terminal task_notification bookends emitted during bg-agent
- // teardown) reach the output stream before we block on the next
- // command. The do-while drain above only runs while
- // waitingForAgents; once we're here the next drain would be the
- // top of the next run(), which won't come if input is idle.
- for (const event of drainSdkEvents()) {
- output.enqueue(event)
- }
- }
- running = false
- // Start idle timer when we finish processing and are waiting for input
- idleTimeout.start()
- }
- // Proactive tick: if proactive is active and queue is empty, inject a tick
- if (
- (feature('PROACTIVE') || feature('KAIROS')) &&
- proactiveModule?.isProactiveActive() &&
- !proactiveModule.isProactivePaused()
- ) {
- if (peek(isMainThread) === undefined && !inputClosed) {
- scheduleProactiveTick!()
- return
- }
- }
- // Re-check the queue after releasing the mutex. A message may have
- // arrived (and called run()) between the last dequeue() returning
- // undefined and `running = false` above. In that case the caller
- // saw `running === true` and returned immediately, leaving the
- // message stranded in the queue with no one to process it.
- if (peek(isMainThread) !== undefined) {
- void run()
- return
- }
- // Check for unread teammate messages and process them
- // This mirrors what useInboxPoller does in interactive REPL mode
- // Poll until no more messages (teammates may still be working)
- {
- const currentAppState = getAppState()
- const teamContext = currentAppState.teamContext
- if (teamContext && isTeamLead(teamContext)) {
- const agentName = 'team-lead'
- // Poll for messages while teammates are active
- // This is needed because teammates may send messages while we're waiting
- // Keep polling until the team is shut down
- const POLL_INTERVAL_MS = 500
- while (true) {
- // Check if teammates are still active
- const refreshedState = getAppState()
- const hasActiveTeammates =
- hasActiveInProcessTeammates(refreshedState) ||
- (refreshedState.teamContext &&
- Object.keys(refreshedState.teamContext.teammates).length > 0)
- if (!hasActiveTeammates) {
- logForDebugging(
- '[print.ts] No more active teammates, stopping poll',
- )
- break
- }
- const unread = await readUnreadMessages(
- agentName,
- refreshedState.teamContext?.teamName,
- )
- if (unread.length > 0) {
- logForDebugging(
- `[print.ts] Team-lead found ${unread.length} unread messages`,
- )
- // Mark as read immediately to avoid duplicate processing
- await markMessagesAsRead(
- agentName,
- refreshedState.teamContext?.teamName,
- )
- // Process shutdown_approved messages - remove teammates from team file
- // This mirrors what useInboxPoller does in interactive mode (lines 546-606)
- const teamName = refreshedState.teamContext?.teamName
- for (const m of unread) {
- const shutdownApproval = isShutdownApproved(m.text)
- if (shutdownApproval && teamName) {
- const teammateToRemove = shutdownApproval.from
- logForDebugging(
- `[print.ts] Processing shutdown_approved from ${teammateToRemove}`,
- )
- // Find the teammate ID by name
- const teammateId = refreshedState.teamContext?.teammates
- ? Object.entries(refreshedState.teamContext.teammates).find(
- ([, t]) => t.name === teammateToRemove,
- )?.[0]
- : undefined
- if (teammateId) {
- // Remove from team file
- removeTeammateFromTeamFile(teamName, {
- agentId: teammateId,
- name: teammateToRemove,
- })
- logForDebugging(
- `[print.ts] Removed ${teammateToRemove} from team file`,
- )
- // Unassign tasks owned by this teammate
- await unassignTeammateTasks(
- teamName,
- teammateId,
- teammateToRemove,
- 'shutdown',
- )
- // Remove from teamContext in AppState
- setAppState(prev => {
- if (!prev.teamContext?.teammates) return prev
- if (!(teammateId in prev.teamContext.teammates)) return prev
- const { [teammateId]: _, ...remainingTeammates } =
- prev.teamContext.teammates
- return {
- ...prev,
- teamContext: {
- ...prev.teamContext,
- teammates: remainingTeammates,
- },
- }
- })
- }
- }
- }
- // Format messages same as useInboxPoller
- const formatted = unread
- .map(
- (m: { from: string; text: string; color?: string }) =>
- `<${TEAMMATE_MESSAGE_TAG} teammate_id="${m.from}"${m.color ? ` color="${m.color}"` : ''}>\n${m.text}\n</${TEAMMATE_MESSAGE_TAG}>`,
- )
- .join('\n\n')
- // Enqueue and process
- enqueue({
- mode: 'prompt',
- value: formatted,
- uuid: randomUUID(),
- })
- void run()
- return // run() will come back here after processing
- }
- // No messages - check if we need to prompt for shutdown
- // If input is closed and teammates are active, inject shutdown prompt once
- if (inputClosed && !shutdownPromptInjected) {
- shutdownPromptInjected = true
- logForDebugging(
- '[print.ts] Input closed with active teammates, injecting shutdown prompt',
- )
- enqueue({
- mode: 'prompt',
- value: SHUTDOWN_TEAM_PROMPT,
- uuid: randomUUID(),
- })
- void run()
- return // run() will come back here after processing
- }
- // Wait and check again
- await sleep(POLL_INTERVAL_MS)
- }
- }
- }
- if (inputClosed) {
- // Check for active swarm that needs shutdown
- const hasActiveSwarm = await (async () => {
- // Wait for any working in-process team members to finish
- const currentAppState = getAppState()
- if (hasWorkingInProcessTeammates(currentAppState)) {
- await waitForTeammatesToBecomeIdle(setAppState, currentAppState)
- }
- // Re-fetch state after potential wait
- const refreshedAppState = getAppState()
- const refreshedTeamContext = refreshedAppState.teamContext
- const hasTeamMembersNotCleanedUp =
- refreshedTeamContext &&
- Object.keys(refreshedTeamContext.teammates).length > 0
- return (
- hasTeamMembersNotCleanedUp ||
- hasActiveInProcessTeammates(refreshedAppState)
- )
- })()
- if (hasActiveSwarm) {
- // Team members are idle or pane-based - inject prompt to shut down team
- enqueue({
- mode: 'prompt',
- value: SHUTDOWN_TEAM_PROMPT,
- uuid: randomUUID(),
- })
- void run()
- } else {
- // Wait for any in-flight push suggestion before closing the output stream.
- if (suggestionState.inflightPromise) {
- await Promise.race([suggestionState.inflightPromise, sleep(5000)])
- }
- suggestionState.abortController?.abort()
- suggestionState.abortController = null
- await finalizePendingAsyncHooks()
- unsubscribeSkillChanges()
- unsubscribeAuthStatus?.()
- statusListeners.delete(rateLimitListener)
- output.done()
- }
- }
- }
- // Set up UDS inbox callback so the query loop is kicked off
- // when a message arrives via the UDS socket in headless mode.
- if (feature('UDS_INBOX')) {
- /* eslint-disable @typescript-eslint/no-require-imports */
- const { setOnEnqueue } = require('../utils/udsMessaging.js')
- /* eslint-enable @typescript-eslint/no-require-imports */
- setOnEnqueue(() => {
- if (!inputClosed) {
- void run()
- }
- })
- }
- // Cron scheduler: runs scheduled_tasks.json tasks in SDK/-p mode.
- // Mirrors REPL's useScheduledTasks hook. Fired prompts enqueue + kick
- // off run() directly — unlike REPL, there's no queue subscriber here
- // that drains on enqueue while idle. The run() mutex makes this safe
- // during an active turn: the call no-ops and the post-run recheck at
- // the end of run() picks up the queued command.
- let cronScheduler: import('../utils/cronScheduler.js').CronScheduler | null =
- null
- if (
- cronGate.isKairosCronEnabled()
- ) {
- cronScheduler = cronSchedulerModule.createCronScheduler({
- onFire: prompt => {
- if (inputClosed) return
- enqueue({
- mode: 'prompt',
- value: prompt,
- uuid: randomUUID(),
- priority: 'later',
- // System-generated — matches useScheduledTasks.ts REPL equivalent.
- // Without this, messages.ts metaProp eval is {} → prompt leaks
- // into visible transcript when cron fires mid-turn in -p mode.
- isMeta: true,
- // Threaded to cc_workload= in the billing-header attribution block
- // so the API can serve cron requests at lower QoS. drainCommandQueue
- // reads this per-iteration and hoists it into bootstrap state for
- // the ask() call.
- workload: WORKLOAD_CRON,
- })
- void run()
- },
- isLoading: () => running || inputClosed,
- getJitterConfig: cronJitterConfigModule?.getCronJitterConfig,
- isKilled: () => !cronGate?.isKairosCronEnabled(),
- })
- cronScheduler.start()
- }
- const sendControlResponseSuccess = function (
- message: SDKControlRequest,
- response?: Record<string, unknown>,
- ) {
- output.enqueue({
- type: 'control_response',
- response: {
- subtype: 'success',
- request_id: message.request_id,
- response: response,
- },
- })
- }
- const sendControlResponseError = function (
- message: SDKControlRequest,
- errorMessage: string,
- ) {
- output.enqueue({
- type: 'control_response',
- response: {
- subtype: 'error',
- request_id: message.request_id,
- error: errorMessage,
- },
- })
- }
- // Handle unexpected permission responses by looking up the unresolved tool
- // call in the transcript and executing it
- const handledOrphanedToolUseIds = new Set<string>()
- structuredIO.setUnexpectedResponseCallback(async message => {
- await handleOrphanedPermissionResponse({
- message,
- setAppState,
- handledToolUseIds: handledOrphanedToolUseIds,
- onEnqueued: () => {
- // The first message of a session might be the orphaned permission
- // check rather than a user prompt, so kick off the loop.
- void run()
- },
- })
- })
- // Track active OAuth flows per server so we can abort a previous flow
- // when a new mcp_authenticate request arrives for the same server.
- const activeOAuthFlows = new Map<string, AbortController>()
- // Track manual callback URL submit functions for active OAuth flows.
- // Used when localhost is not reachable (e.g., browser-based IDEs).
- const oauthCallbackSubmitters = new Map<
- string,
- (callbackUrl: string) => void
- >()
- // Track servers where the manual callback was actually invoked (so the
- // automatic reconnect path knows to skip — the extension will reconnect).
- const oauthManualCallbackUsed = new Set<string>()
- // Track OAuth auth-only promises so mcp_oauth_callback_url can await
- // token exchange completion. Reconnect is handled separately by the
- // extension via handleAuthDone → mcp_reconnect.
- const oauthAuthPromises = new Map<string, Promise<void>>()
- // In-flight Anthropic OAuth flow (claude_authenticate). Single-slot: a
- // second authenticate request cleans up the first. The service holds the
- // PKCE verifier + localhost listener; the promise settles after
- // installOAuthTokens — after it resolves, the in-process memoized token
- // cache is already cleared and the next API call picks up the new creds.
- let claudeOAuth: {
- service: OAuthService
- flow: Promise<void>
- } | null = null
- // This is essentially spawning a parallel async task- we have two
- // running in parallel- one reading from stdin and adding to the
- // queue to be processed and another reading from the queue,
- // processing and returning the result of the generation.
- // The process is complete when the input stream completes and
- // the last generation of the queue has complete.
- void (async () => {
- let initialized = false
- logForDiagnosticsNoPII('info', 'cli_message_loop_started')
- for await (const message of structuredIO.structuredInput) {
- // Non-user events are handled inline (no queue). started→completed in
- // the same tick carries no information, so only fire completed.
- // control_response is reported by StructuredIO.processLine (which also
- // sees orphans that never yield here).
- const eventId = 'uuid' in message ? message.uuid : undefined
- if (
- eventId &&
- message.type !== 'user' &&
- message.type !== 'control_response'
- ) {
- notifyCommandLifecycle(eventId, 'completed')
- }
- if (message.type === 'control_request') {
- if (message.request.subtype === 'interrupt') {
- // Track escapes for attribution (ant-only feature)
- if (feature('COMMIT_ATTRIBUTION')) {
- setAppState(prev => ({
- ...prev,
- attribution: {
- ...prev.attribution,
- escapeCount: prev.attribution.escapeCount + 1,
- },
- }))
- }
- if (abortController) {
- abortController.abort()
- }
- suggestionState.abortController?.abort()
- suggestionState.abortController = null
- suggestionState.lastEmitted = null
- suggestionState.pendingSuggestion = null
- sendControlResponseSuccess(message)
- } else if (message.request.subtype === 'end_session') {
- logForDebugging(
- `[print.ts] end_session received, reason=${message.request.reason ?? 'unspecified'}`,
- )
- if (abortController) {
- abortController.abort()
- }
- suggestionState.abortController?.abort()
- suggestionState.abortController = null
- suggestionState.lastEmitted = null
- suggestionState.pendingSuggestion = null
- sendControlResponseSuccess(message)
- break // exits for-await → falls through to inputClosed=true drain below
- } else if (message.request.subtype === 'initialize') {
- // SDK MCP server names from the initialize message
- // Populated by both browser and ProcessTransport sessions
- if (
- message.request.sdkMcpServers &&
- message.request.sdkMcpServers.length > 0
- ) {
- for (const serverName of message.request.sdkMcpServers) {
- // Create placeholder config for SDK MCP servers
- // The actual server connection is managed by the SDK Query class
- sdkMcpConfigs[serverName] = {
- type: 'sdk',
- name: serverName,
- }
- }
- }
- await handleInitializeRequest(
- message.request,
- message.request_id,
- initialized,
- output,
- commands,
- modelInfos,
- structuredIO,
- !!options.enableAuthStatus,
- options,
- agents,
- getAppState,
- )
- // Enable prompt suggestions in AppState when SDK consumer opts in.
- // shouldEnablePromptSuggestion() returns false for non-interactive
- // sessions, but the SDK consumer explicitly requested suggestions.
- if (message.request.promptSuggestions) {
- setAppState(prev => {
- if (prev.promptSuggestionEnabled) return prev
- return { ...prev, promptSuggestionEnabled: true }
- })
- }
- if (
- message.request.agentProgressSummaries &&
- getFeatureValue_CACHED_MAY_BE_STALE('tengu_slate_prism', true)
- ) {
- setSdkAgentProgressSummariesEnabled(true)
- }
- initialized = true
- // If the auto-resume logic pre-enqueued a command, drain it now
- // that initialize has set up systemPrompt, agents, hooks, etc.
- if (hasCommandsInQueue()) {
- void run()
- }
- } else if (message.request.subtype === 'set_permission_mode') {
- const m = message.request // for typescript (TODO: use readonly types to avoid this)
- setAppState(prev => ({
- ...prev,
- toolPermissionContext: handleSetPermissionMode(
- m,
- message.request_id,
- prev.toolPermissionContext,
- output,
- ),
- isUltraplanMode: m.ultraplan ?? prev.isUltraplanMode,
- }))
- // handleSetPermissionMode sends the control_response; the
- // notifySessionMetadataChanged that used to follow here is
- // now fired by onChangeAppState (with externalized mode name).
- } else if (message.request.subtype === 'set_model') {
- const requestedModel = message.request.model ?? 'default'
- const model =
- requestedModel === 'default'
- ? getDefaultMainLoopModel()
- : requestedModel
- activeUserSpecifiedModel = model
- setMainLoopModelOverride(model)
- notifySessionMetadataChanged({ model })
- injectModelSwitchBreadcrumbs(requestedModel, model)
- sendControlResponseSuccess(message)
- } else if (message.request.subtype === 'set_max_thinking_tokens') {
- if (message.request.max_thinking_tokens === null) {
- options.thinkingConfig = undefined
- } else if (message.request.max_thinking_tokens === 0) {
- options.thinkingConfig = { type: 'disabled' }
- } else {
- options.thinkingConfig = {
- type: 'enabled',
- budgetTokens: message.request.max_thinking_tokens,
- }
- }
- sendControlResponseSuccess(message)
- } else if (message.request.subtype === 'mcp_status') {
- sendControlResponseSuccess(message, {
- mcpServers: buildMcpServerStatuses(),
- })
- } else if (message.request.subtype === 'get_context_usage') {
- try {
- const appState = getAppState()
- const data = await collectContextData({
- messages: mutableMessages,
- getAppState,
- options: {
- mainLoopModel: getMainLoopModel(),
- tools: buildAllTools(appState),
- agentDefinitions: appState.agentDefinitions,
- customSystemPrompt: options.systemPrompt,
- appendSystemPrompt: options.appendSystemPrompt,
- },
- })
- sendControlResponseSuccess(message, { ...data })
- } catch (error) {
- sendControlResponseError(message, errorMessage(error))
- }
- } else if (message.request.subtype === 'mcp_message') {
- // Handle MCP notifications from SDK servers
- const mcpRequest = message.request
- const sdkClient = sdkClients.find(
- client => client.name === mcpRequest.server_name,
- )
- // Check client exists - dynamically added SDK servers may have
- // placeholder clients with null client until updateSdkMcp() runs
- if (
- sdkClient &&
- sdkClient.type === 'connected' &&
- sdkClient.client?.transport?.onmessage
- ) {
- sdkClient.client.transport.onmessage(mcpRequest.message)
- }
- sendControlResponseSuccess(message)
- } else if (message.request.subtype === 'rewind_files') {
- const appState = getAppState()
- const result = await handleRewindFiles(
- message.request.user_message_id as UUID,
- appState,
- setAppState,
- message.request.dry_run ?? false,
- )
- if (result.canRewind || message.request.dry_run) {
- sendControlResponseSuccess(message, result)
- } else {
- sendControlResponseError(
- message,
- (result.error as string) ?? 'Unexpected error',
- )
- }
- } else if (message.request.subtype === 'cancel_async_message') {
- const targetUuid = message.request.message_uuid
- const removed = dequeueAllMatching(cmd => cmd.uuid === targetUuid)
- sendControlResponseSuccess(message, {
- cancelled: removed.length > 0,
- })
- } else if (message.request.subtype === 'seed_read_state') {
- // Client observed a Read that was later removed from context (e.g.
- // by snip), so transcript-based seeding missed it. Queued into
- // pendingSeeds; applied at the next clone-replace boundary.
- try {
- // expandPath: all other readFileState writers normalize (~, relative,
- // session cwd vs process cwd). FileEditTool looks up by expandPath'd
- // key — a verbatim client path would miss.
- const normalizedPath = expandPath(message.request.path)
- // Check disk mtime before reading content. If the file changed
- // since the client's observation, readFile would return C_current
- // but we'd store it with the client's M_observed — getChangedFiles
- // then sees disk > cache.timestamp, re-reads, diffs C_current vs
- // C_current = empty, emits no attachment, and the model is never
- // told about the C_observed → C_current change. Skipping the seed
- // makes Edit fail "file not read yet" → forces a fresh Read.
- // Math.floor matches FileReadTool and getFileModificationTime.
- const diskMtime = Math.floor((await stat(normalizedPath)).mtimeMs)
- if (diskMtime <= message.request.mtime) {
- const raw = await readFile(normalizedPath, 'utf-8')
- // Strip BOM + normalize CRLF→LF to match readFileInRange and
- // readFileSyncWithMetadata. FileEditTool's content-compare
- // fallback (for Windows mtime bumps without content change)
- // compares against LF-normalized disk reads.
- const content = (
- raw.charCodeAt(0) === 0xfeff ? raw.slice(1) : raw
- ).replaceAll('\r\n', '\n')
- pendingSeeds.set(normalizedPath, {
- content,
- timestamp: diskMtime,
- offset: undefined,
- limit: undefined,
- })
- }
- } catch {
- // ENOENT etc — skip seeding but still succeed
- }
- sendControlResponseSuccess(message)
- } else if (message.request.subtype === 'mcp_set_servers') {
- const { response, sdkServersChanged } = await applyMcpServerChanges(
- message.request.servers,
- )
- sendControlResponseSuccess(message, response)
- // Connect SDK servers AFTER response to avoid deadlock
- if (sdkServersChanged) {
- void updateSdkMcp()
- }
- } else if (message.request.subtype === 'reload_plugins') {
- try {
- if (
- feature('DOWNLOAD_USER_SETTINGS') &&
- (isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) || getIsRemoteMode())
- ) {
- // Re-pull user settings so enabledPlugins pushed from the
- // user's local CLI take effect before the cache sweep.
- const applied = await redownloadUserSettings()
- if (applied) {
- settingsChangeDetector.notifyChange('userSettings')
- }
- }
- const r = await refreshActivePlugins(setAppState)
- const sdkAgents = currentAgents.filter(
- a => a.source === 'flagSettings',
- )
- currentAgents = [...r.agentDefinitions.allAgents, ...sdkAgents]
- // Reload succeeded — gather response data best-effort so a
- // read failure doesn't mask the successful state change.
- // allSettled so one failure doesn't discard the others.
- let plugins: SDKControlReloadPluginsResponse['plugins'] = []
- const [cmdsR, mcpR, pluginsR] = await Promise.allSettled([
- getCommands(cwd()),
- applyPluginMcpDiff(),
- loadAllPluginsCacheOnly(),
- ])
- if (cmdsR.status === 'fulfilled') {
- currentCommands = cmdsR.value
- } else {
- logError(cmdsR.reason)
- }
- if (mcpR.status === 'rejected') {
- logError(mcpR.reason)
- }
- if (pluginsR.status === 'fulfilled') {
- plugins = pluginsR.value.enabled.map(p => ({
- name: p.name,
- path: p.path,
- source: p.source,
- }))
- } else {
- logError(pluginsR.reason)
- }
- sendControlResponseSuccess(message, {
- commands: currentCommands
- .filter(cmd => cmd.userInvocable !== false)
- .map(cmd => ({
- name: getCommandName(cmd),
- description: formatDescriptionWithSource(cmd),
- argumentHint: cmd.argumentHint || '',
- })),
- agents: currentAgents.map(a => ({
- name: a.agentType,
- description: a.whenToUse,
- model: a.model === 'inherit' ? undefined : a.model,
- })),
- plugins,
- mcpServers: buildMcpServerStatuses(),
- error_count: r.error_count,
- } satisfies SDKControlReloadPluginsResponse)
- } catch (error) {
- sendControlResponseError(message, errorMessage(error))
- }
- } else if (message.request.subtype === 'mcp_reconnect') {
- const currentAppState = getAppState()
- const { serverName } = message.request
- elicitationRegistered.delete(serverName)
- // Config-existence gate must cover the SAME sources as the
- // operations below. SDK-injected servers (query({mcpServers:{...}}))
- // and dynamically-added servers were missing here, so
- // toggleMcpServer/reconnect returned "Server not found" even though
- // the disconnect/reconnect would have worked (gh-31339 / CC-314).
- const config =
- getMcpConfigByName(serverName) ??
- mcpClients.find(c => c.name === serverName)?.config ??
- sdkClients.find(c => c.name === serverName)?.config ??
- dynamicMcpState.clients.find(c => c.name === serverName)?.config ??
- currentAppState.mcp.clients.find(c => c.name === serverName)
- ?.config ??
- null
- if (!config) {
- sendControlResponseError(message, `Server not found: ${serverName}`)
- } else {
- const result = await reconnectMcpServerImpl(serverName, config)
- // Update appState.mcp with the new client, tools, commands, and resources
- const prefix = getMcpPrefix(serverName)
- setAppState(prev => ({
- ...prev,
- mcp: {
- ...prev.mcp,
- clients: prev.mcp.clients.map(c =>
- c.name === serverName ? result.client : c,
- ),
- tools: [
- ...reject(prev.mcp.tools, t => t.name?.startsWith(prefix)),
- ...result.tools,
- ],
- commands: [
- ...reject(prev.mcp.commands, c =>
- commandBelongsToServer(c, serverName),
- ),
- ...result.commands,
- ],
- resources:
- result.resources && result.resources.length > 0
- ? { ...prev.mcp.resources, [serverName]: result.resources }
- : omit(prev.mcp.resources, serverName),
- },
- }))
- // Also update dynamicMcpState so run() picks up the new tools
- // on the next turn (run() reads dynamicMcpState, not appState)
- dynamicMcpState = {
- ...dynamicMcpState,
- clients: [
- ...dynamicMcpState.clients.filter(c => c.name !== serverName),
- result.client,
- ],
- tools: [
- ...dynamicMcpState.tools.filter(
- t => !t.name?.startsWith(prefix),
- ),
- ...result.tools,
- ],
- }
- if (result.client.type === 'connected') {
- registerElicitationHandlers([result.client])
- reregisterChannelHandlerAfterReconnect(result.client)
- sendControlResponseSuccess(message)
- } else {
- const errorMessage =
- result.client.type === 'failed'
- ? (result.client.error ?? 'Connection failed')
- : `Server status: ${result.client.type}`
- sendControlResponseError(message, errorMessage)
- }
- }
- } else if (message.request.subtype === 'mcp_toggle') {
- const currentAppState = getAppState()
- const { serverName, enabled } = message.request
- elicitationRegistered.delete(serverName)
- // Gate must match the client-lookup spread below (which
- // includes sdkClients and dynamicMcpState.clients). Same fix as
- // mcp_reconnect above (gh-31339 / CC-314).
- const config =
- getMcpConfigByName(serverName) ??
- mcpClients.find(c => c.name === serverName)?.config ??
- sdkClients.find(c => c.name === serverName)?.config ??
- dynamicMcpState.clients.find(c => c.name === serverName)?.config ??
- currentAppState.mcp.clients.find(c => c.name === serverName)
- ?.config ??
- null
- if (!config) {
- sendControlResponseError(message, `Server not found: ${serverName}`)
- } else if (!enabled) {
- // Disabling: persist + disconnect (matches TUI toggleMcpServer behavior)
- setMcpServerEnabled(serverName, false)
- const client = [
- ...mcpClients,
- ...sdkClients,
- ...dynamicMcpState.clients,
- ...currentAppState.mcp.clients,
- ].find(c => c.name === serverName)
- if (client && client.type === 'connected') {
- await clearServerCache(serverName, config)
- }
- // Update appState.mcp to reflect disabled status and remove tools/commands/resources
- const prefix = getMcpPrefix(serverName)
- setAppState(prev => ({
- ...prev,
- mcp: {
- ...prev.mcp,
- clients: prev.mcp.clients.map(c =>
- c.name === serverName
- ? { name: serverName, type: 'disabled' as const, config }
- : c,
- ),
- tools: reject(prev.mcp.tools, t => t.name?.startsWith(prefix)),
- commands: reject(prev.mcp.commands, c =>
- commandBelongsToServer(c, serverName),
- ),
- resources: omit(prev.mcp.resources, serverName),
- },
- }))
- sendControlResponseSuccess(message)
- } else {
- // Enabling: persist + reconnect
- setMcpServerEnabled(serverName, true)
- const result = await reconnectMcpServerImpl(serverName, config)
- // Update appState.mcp with the new client, tools, commands, and resources
- // This ensures the LLM sees updated tools after enabling the server
- const prefix = getMcpPrefix(serverName)
- setAppState(prev => ({
- ...prev,
- mcp: {
- ...prev.mcp,
- clients: prev.mcp.clients.map(c =>
- c.name === serverName ? result.client : c,
- ),
- tools: [
- ...reject(prev.mcp.tools, t => t.name?.startsWith(prefix)),
- ...result.tools,
- ],
- commands: [
- ...reject(prev.mcp.commands, c =>
- commandBelongsToServer(c, serverName),
- ),
- ...result.commands,
- ],
- resources:
- result.resources && result.resources.length > 0
- ? { ...prev.mcp.resources, [serverName]: result.resources }
- : omit(prev.mcp.resources, serverName),
- },
- }))
- if (result.client.type === 'connected') {
- registerElicitationHandlers([result.client])
- reregisterChannelHandlerAfterReconnect(result.client)
- sendControlResponseSuccess(message)
- } else {
- const errorMessage =
- result.client.type === 'failed'
- ? (result.client.error ?? 'Connection failed')
- : `Server status: ${result.client.type}`
- sendControlResponseError(message, errorMessage)
- }
- }
- } else if (message.request.subtype === 'channel_enable') {
- const currentAppState = getAppState()
- handleChannelEnable(
- message.request_id,
- message.request.serverName,
- // Pool spread matches mcp_status — all three client sources.
- [
- ...currentAppState.mcp.clients,
- ...sdkClients,
- ...dynamicMcpState.clients,
- ],
- output,
- )
- } else if (message.request.subtype === 'mcp_authenticate') {
- const { serverName } = message.request
- const currentAppState = getAppState()
- const config =
- getMcpConfigByName(serverName) ??
- mcpClients.find(c => c.name === serverName)?.config ??
- currentAppState.mcp.clients.find(c => c.name === serverName)
- ?.config ??
- null
- if (!config) {
- sendControlResponseError(message, `Server not found: ${serverName}`)
- } else if (config.type !== 'sse' && config.type !== 'http') {
- sendControlResponseError(
- message,
- `Server type "${config.type}" does not support OAuth authentication`,
- )
- } else {
- try {
- // Abort any previous in-flight OAuth flow for this server
- activeOAuthFlows.get(serverName)?.abort()
- const controller = new AbortController()
- activeOAuthFlows.set(serverName, controller)
- // Capture the auth URL from the callback
- let resolveAuthUrl: (url: string) => void
- const authUrlPromise = new Promise<string>(resolve => {
- resolveAuthUrl = resolve
- })
- // Start the OAuth flow in the background
- const oauthPromise = performMCPOAuthFlow(
- serverName,
- config,
- url => resolveAuthUrl!(url),
- controller.signal,
- {
- skipBrowserOpen: true,
- onWaitingForCallback: submit => {
- oauthCallbackSubmitters.set(serverName, submit)
- },
- },
- )
- // Wait for the auth URL (or the flow to complete without needing redirect)
- const authUrl = await Promise.race([
- authUrlPromise,
- oauthPromise.then(() => null as string | null),
- ])
- if (authUrl) {
- sendControlResponseSuccess(message, {
- authUrl,
- requiresUserAction: true,
- })
- } else {
- sendControlResponseSuccess(message, {
- requiresUserAction: false,
- })
- }
- // Store auth-only promise for mcp_oauth_callback_url handler.
- // Don't swallow errors — the callback handler needs to detect
- // auth failures and report them to the caller.
- oauthAuthPromises.set(serverName, oauthPromise)
- // Handle background completion — reconnect after auth.
- // When manual callback is used, skip the reconnect here;
- // the extension's handleAuthDone → mcp_reconnect handles it
- // (which also updates dynamicMcpState for tool registration).
- const fullFlowPromise = oauthPromise
- .then(async () => {
- // Don't reconnect if the server was disabled during the OAuth flow
- if (isMcpServerDisabled(serverName)) {
- return
- }
- // Skip reconnect if the manual callback path was used —
- // handleAuthDone will do it via mcp_reconnect (which
- // updates dynamicMcpState for tool registration).
- if (oauthManualCallbackUsed.has(serverName)) {
- return
- }
- // Reconnect the server after successful auth
- const result = await reconnectMcpServerImpl(
- serverName,
- config,
- )
- const prefix = getMcpPrefix(serverName)
- setAppState(prev => ({
- ...prev,
- mcp: {
- ...prev.mcp,
- clients: prev.mcp.clients.map(c =>
- c.name === serverName ? result.client : c,
- ),
- tools: [
- ...reject(prev.mcp.tools, t =>
- t.name?.startsWith(prefix),
- ),
- ...result.tools,
- ],
- commands: [
- ...reject(prev.mcp.commands, c =>
- commandBelongsToServer(c, serverName),
- ),
- ...result.commands,
- ],
- resources:
- result.resources && result.resources.length > 0
- ? {
- ...prev.mcp.resources,
- [serverName]: result.resources,
- }
- : omit(prev.mcp.resources, serverName),
- },
- }))
- // Also update dynamicMcpState so run() picks up the new tools
- // on the next turn (run() reads dynamicMcpState, not appState)
- dynamicMcpState = {
- ...dynamicMcpState,
- clients: [
- ...dynamicMcpState.clients.filter(
- c => c.name !== serverName,
- ),
- result.client,
- ],
- tools: [
- ...dynamicMcpState.tools.filter(
- t => !t.name?.startsWith(prefix),
- ),
- ...result.tools,
- ],
- }
- })
- .catch(error => {
- logForDebugging(
- `MCP OAuth failed for ${serverName}: ${error}`,
- { level: 'error' },
- )
- })
- .finally(() => {
- // Clean up only if this is still the active flow
- if (activeOAuthFlows.get(serverName) === controller) {
- activeOAuthFlows.delete(serverName)
- oauthCallbackSubmitters.delete(serverName)
- oauthManualCallbackUsed.delete(serverName)
- oauthAuthPromises.delete(serverName)
- }
- })
- void fullFlowPromise
- } catch (error) {
- sendControlResponseError(message, errorMessage(error))
- }
- }
- } else if (message.request.subtype === 'mcp_oauth_callback_url') {
- const { serverName, callbackUrl } = message.request
- const submit = oauthCallbackSubmitters.get(serverName)
- if (submit) {
- // Validate the callback URL before submitting. The submit
- // callback in auth.ts silently ignores URLs missing a code
- // param, which would leave the auth promise unresolved and
- // block the control message loop until timeout.
- let hasCodeOrError = false
- try {
- const parsed = new URL(callbackUrl)
- hasCodeOrError =
- parsed.searchParams.has('code') ||
- parsed.searchParams.has('error')
- } catch {
- // Invalid URL
- }
- if (!hasCodeOrError) {
- sendControlResponseError(
- message,
- 'Invalid callback URL: missing authorization code. Please paste the full redirect URL including the code parameter.',
- )
- } else {
- oauthManualCallbackUsed.add(serverName)
- submit(callbackUrl)
- // Wait for auth (token exchange) to complete before responding.
- // Reconnect is handled by the extension via handleAuthDone →
- // mcp_reconnect (which updates dynamicMcpState for tools).
- const authPromise = oauthAuthPromises.get(serverName)
- if (authPromise) {
- try {
- await authPromise
- sendControlResponseSuccess(message)
- } catch (error) {
- sendControlResponseError(
- message,
- error instanceof Error
- ? error.message
- : 'OAuth authentication failed',
- )
- }
- } else {
- sendControlResponseSuccess(message)
- }
- }
- } else {
- sendControlResponseError(
- message,
- `No active OAuth flow for server: ${serverName}`,
- )
- }
- } else if (message.request.subtype === 'claude_authenticate') {
- // Anthropic OAuth over the control channel. The SDK client owns
- // the user's browser (we're headless in -p mode); we hand back
- // both URLs and wait. Automatic URL → localhost listener catches
- // the redirect if the browser is on this host; manual URL → the
- // success page shows "code#state" for claude_oauth_callback.
- const { loginWithClaudeAi } = message.request
- // Clean up any prior flow. cleanup() closes the localhost listener
- // and nulls the manual resolver. The prior `flow` promise is left
- // pending (AuthCodeListener.close() does not reject) but its object
- // graph becomes unreachable once the server handle is released and
- // is GC'd — no fd or port is held.
- claudeOAuth?.service.cleanup()
- logEvent('tengu_oauth_flow_start', {
- loginWithClaudeAi: loginWithClaudeAi ?? true,
- })
- const service = new OAuthService()
- let urlResolver!: (urls: {
- manualUrl: string
- automaticUrl: string
- }) => void
- const urlPromise = new Promise<{
- manualUrl: string
- automaticUrl: string
- }>(resolve => {
- urlResolver = resolve
- })
- const flow = service
- .startOAuthFlow(
- async (manualUrl, automaticUrl) => {
- // automaticUrl is always defined when skipBrowserOpen is set;
- // the signature is optional only for the existing single-arg callers.
- urlResolver({ manualUrl, automaticUrl: automaticUrl! })
- },
- {
- loginWithClaudeAi: loginWithClaudeAi ?? true,
- skipBrowserOpen: true,
- },
- )
- .then(async tokens => {
- // installOAuthTokens: performLogout (clear stale state) →
- // store profile → saveOAuthTokensIfNeeded → clearOAuthTokenCache
- // → clearAuthRelatedCaches. After this resolves, the memoized
- // getClaudeAIOAuthTokens in this process is invalidated; the
- // next API call re-reads keychain/file and works. No respawn.
- await installOAuthTokens(tokens)
- logEvent('tengu_oauth_success', {
- loginWithClaudeAi: loginWithClaudeAi ?? true,
- })
- })
- .finally(() => {
- service.cleanup()
- if (claudeOAuth?.service === service) {
- claudeOAuth = null
- }
- })
- claudeOAuth = { service, flow }
- // Attach the rejection handler before awaiting so a synchronous
- // startOAuthFlow failure doesn't surface as an unhandled rejection.
- // The claude_oauth_callback handler re-awaits flow for the manual
- // path and surfaces the real error to the client.
- void flow.catch(err =>
- logForDebugging(`claude_authenticate flow ended: ${err}`, {
- level: 'info',
- }),
- )
- try {
- // Race against flow: if startOAuthFlow rejects before calling
- // the authURLHandler (e.g. AuthCodeListener.start() fails with
- // EACCES or fd exhaustion), urlPromise would pend forever and
- // wedge the stdin loop. flow resolving first is unreachable in
- // practice (it's suspended on the same urls we're waiting for).
- const { manualUrl, automaticUrl } = await Promise.race([
- urlPromise,
- flow.then(() => {
- throw new Error(
- 'OAuth flow completed without producing auth URLs',
- )
- }),
- ])
- sendControlResponseSuccess(message, {
- manualUrl,
- automaticUrl,
- })
- } catch (error) {
- sendControlResponseError(message, errorMessage(error))
- }
- } else if (
- message.request.subtype === 'claude_oauth_callback' ||
- message.request.subtype === 'claude_oauth_wait_for_completion'
- ) {
- if (!claudeOAuth) {
- sendControlResponseError(
- message,
- 'No active claude_authenticate flow',
- )
- } else {
- // Inject the manual code synchronously — must happen in stdin
- // message order so a subsequent claude_authenticate doesn't
- // replace the service before this code lands.
- if (message.request.subtype === 'claude_oauth_callback') {
- claudeOAuth.service.handleManualAuthCodeInput({
- authorizationCode: message.request.authorizationCode,
- state: message.request.state,
- })
- }
- // Detach the await — the stdin reader is serial and blocking
- // here deadlocks claude_oauth_wait_for_completion: flow may
- // only resolve via a future claude_oauth_callback on stdin,
- // which can't be read while we're parked. Capture the binding;
- // claudeOAuth is nulled in flow's own .finally.
- const { flow } = claudeOAuth
- void flow.then(
- () => {
- const accountInfo = getAccountInformation()
- sendControlResponseSuccess(message, {
- account: {
- email: accountInfo?.email,
- organization: accountInfo?.organization,
- subscriptionType: accountInfo?.subscription,
- tokenSource: accountInfo?.tokenSource,
- apiKeySource: accountInfo?.apiKeySource,
- apiProvider: getAPIProvider(),
- },
- })
- },
- (error: unknown) =>
- sendControlResponseError(message, errorMessage(error)),
- )
- }
- } else if (message.request.subtype === 'mcp_clear_auth') {
- const { serverName } = message.request
- const currentAppState = getAppState()
- const config =
- getMcpConfigByName(serverName) ??
- mcpClients.find(c => c.name === serverName)?.config ??
- currentAppState.mcp.clients.find(c => c.name === serverName)
- ?.config ??
- null
- if (!config) {
- sendControlResponseError(message, `Server not found: ${serverName}`)
- } else if (config.type !== 'sse' && config.type !== 'http') {
- sendControlResponseError(
- message,
- `Cannot clear auth for server type "${config.type}"`,
- )
- } else {
- await revokeServerTokens(serverName, config)
- const result = await reconnectMcpServerImpl(serverName, config)
- const prefix = getMcpPrefix(serverName)
- setAppState(prev => ({
- ...prev,
- mcp: {
- ...prev.mcp,
- clients: prev.mcp.clients.map(c =>
- c.name === serverName ? result.client : c,
- ),
- tools: [
- ...reject(prev.mcp.tools, t => t.name?.startsWith(prefix)),
- ...result.tools,
- ],
- commands: [
- ...reject(prev.mcp.commands, c =>
- commandBelongsToServer(c, serverName),
- ),
- ...result.commands,
- ],
- resources:
- result.resources && result.resources.length > 0
- ? {
- ...prev.mcp.resources,
- [serverName]: result.resources,
- }
- : omit(prev.mcp.resources, serverName),
- },
- }))
- sendControlResponseSuccess(message, {})
- }
- } else if (message.request.subtype === 'apply_flag_settings') {
- // Snapshot the current model before applying — we need to detect
- // model switches so we can inject breadcrumbs and notify listeners.
- const prevModel = getMainLoopModel()
- // Merge the provided settings into the in-memory flag settings
- const existing = getFlagSettingsInline() ?? {}
- const incoming = message.request.settings
- // Shallow-merge top-level keys; getSettingsForSource handles
- // the deep merge with file-based flag settings via mergeWith.
- // JSON serialization drops `undefined`, so callers use `null`
- // to signal "clear this key". Convert nulls to deletions so
- // SettingsSchema().safeParse() doesn't reject the whole object
- // (z.string().optional() accepts string | undefined, not null).
- const merged = { ...existing, ...incoming }
- for (const key of Object.keys(merged)) {
- if (merged[key as keyof typeof merged] === null) {
- delete merged[key as keyof typeof merged]
- }
- }
- setFlagSettingsInline(merged)
- // Route through notifyChange so fanOut() resets the settings cache
- // before listeners run. The subscriber at :392 calls
- // applySettingsChange for us. Pre-#20625 this was a direct
- // applySettingsChange() call that relied on its own internal reset —
- // now that the reset is centralized in fanOut, a direct call here
- // would read stale cached settings and silently drop the update.
- // Bonus: going through notifyChange also tells the other subscribers
- // (loadPluginHooks, sandbox-adapter) about the change, which the
- // previous direct call skipped.
- settingsChangeDetector.notifyChange('flagSettings')
- // If the incoming settings include a model change, update the
- // override so getMainLoopModel() reflects it. The override has
- // higher priority than the settings cascade in
- // getUserSpecifiedModelSetting(), so without this update,
- // getMainLoopModel() returns the stale override and the model
- // change is silently ignored (matching set_model at :2811).
- if ('model' in incoming) {
- if (incoming.model != null) {
- setMainLoopModelOverride(String(incoming.model))
- } else {
- setMainLoopModelOverride(undefined)
- }
- }
- // If the model changed, inject breadcrumbs so the model sees the
- // mid-conversation switch, and notify metadata listeners (CCR).
- const newModel = getMainLoopModel()
- if (newModel !== prevModel) {
- activeUserSpecifiedModel = newModel
- const modelArg = incoming.model ? String(incoming.model) : 'default'
- notifySessionMetadataChanged({ model: newModel })
- injectModelSwitchBreadcrumbs(modelArg, newModel)
- }
- sendControlResponseSuccess(message)
- } else if (message.request.subtype === 'get_settings') {
- const currentAppState = getAppState()
- const model = getMainLoopModel()
- // modelSupportsEffort gate matches claude.ts — applied.effort must
- // mirror what actually goes to the API, not just what's configured.
- const effort = modelSupportsEffort(model)
- ? resolveAppliedEffort(model, currentAppState.effortValue)
- : undefined
- sendControlResponseSuccess(message, {
- ...getSettingsWithSources(),
- applied: {
- model,
- // Numeric effort (ant-only) → null; SDK schema is string-level only.
- effort: typeof effort === 'string' ? effort : null,
- },
- })
- } else if (message.request.subtype === 'stop_task') {
- const { task_id: taskId } = message.request
- try {
- await stopTask(taskId, {
- getAppState,
- setAppState,
- })
- sendControlResponseSuccess(message, {})
- } catch (error) {
- sendControlResponseError(message, errorMessage(error))
- }
- } else if (message.request.subtype === 'generate_session_title') {
- // Fire-and-forget so the Haiku call does not block the stdin loop
- // (which would delay processing of subsequent user messages /
- // interrupts for the duration of the API roundtrip).
- const { description, persist } = message.request
- // Reuse the live controller only if it has not already been aborted
- // (e.g. by interrupt()); an aborted signal would cause queryHaiku to
- // immediately throw APIUserAbortError → {title: null}.
- const titleSignal = (
- abortController && !abortController.signal.aborted
- ? abortController
- : createAbortController()
- ).signal
- void (async () => {
- try {
- const title = await generateSessionTitle(description, titleSignal)
- if (title && persist) {
- try {
- saveAiGeneratedTitle(getSessionId() as UUID, title)
- } catch (e) {
- logError(e)
- }
- }
- sendControlResponseSuccess(message, { title })
- } catch (e) {
- // Unreachable in practice — generateSessionTitle wraps its
- // own body and returns null, saveAiGeneratedTitle is wrapped
- // above. Propagate (not swallow) so unexpected failures are
- // visible to the SDK caller (hostComms.ts catches and logs).
- sendControlResponseError(message, errorMessage(e))
- }
- })()
- } else if (message.request.subtype === 'side_question') {
- // Same fire-and-forget pattern as generate_session_title above —
- // the forked agent's API roundtrip must not block the stdin loop.
- //
- // The snapshot captured by stopHooks (for querySource === 'sdk')
- // holds the exact systemPrompt/userContext/systemContext/messages
- // sent on the last main-thread turn. Reusing them gives a byte-
- // identical prefix → prompt cache hit.
- //
- // Fallback (resume before first turn completes — no snapshot yet):
- // rebuild from scratch. buildSideQuestionFallbackParams mirrors
- // QueryEngine.ts:ask()'s system prompt assembly (including
- // --system-prompt / --append-system-prompt) so the rebuilt prefix
- // matches in the common case. May still miss the cache for
- // coordinator mode or memory-mechanics extras — acceptable, the
- // alternative is the side question failing entirely.
- const { question } = message.request
- void (async () => {
- try {
- const saved = getLastCacheSafeParams()
- const cacheSafeParams = saved
- ? {
- ...saved,
- // If the last turn was interrupted, the snapshot holds an
- // already-aborted controller; createChildAbortController in
- // createSubagentContext would propagate it and the fork
- // would die before sending a request. The controller is
- // not part of the cache key — swapping in a fresh one is
- // safe. Same guard as generate_session_title above.
- toolUseContext: {
- ...saved.toolUseContext,
- abortController: createAbortController(),
- },
- }
- : await buildSideQuestionFallbackParams({
- tools: buildAllTools(getAppState()),
- commands: currentCommands,
- mcpClients: [
- ...getAppState().mcp.clients,
- ...sdkClients,
- ...dynamicMcpState.clients,
- ],
- messages: mutableMessages,
- readFileState,
- getAppState,
- setAppState,
- customSystemPrompt: options.systemPrompt,
- appendSystemPrompt: options.appendSystemPrompt,
- thinkingConfig: options.thinkingConfig,
- agents: currentAgents,
- })
- const result = await runSideQuestion({
- question,
- cacheSafeParams,
- })
- sendControlResponseSuccess(message, { response: result.response })
- } catch (e) {
- sendControlResponseError(message, errorMessage(e))
- }
- })()
- } else if (
- (feature('PROACTIVE') || feature('KAIROS')) &&
- (message.request as { subtype: string }).subtype === 'set_proactive'
- ) {
- const req = message.request as unknown as {
- subtype: string
- enabled: boolean
- }
- if (req.enabled) {
- if (!proactiveModule!.isProactiveActive()) {
- proactiveModule!.activateProactive('command')
- scheduleProactiveTick!()
- }
- } else {
- proactiveModule!.deactivateProactive()
- }
- sendControlResponseSuccess(message)
- } else if (message.request.subtype === 'remote_control') {
- if (message.request.enabled) {
- if (bridgeHandle) {
- // Already connected
- sendControlResponseSuccess(message, {
- session_url: getRemoteSessionUrl(
- bridgeHandle.bridgeSessionId,
- bridgeHandle.sessionIngressUrl,
- ),
- connect_url: buildBridgeConnectUrl(
- bridgeHandle.environmentId,
- bridgeHandle.sessionIngressUrl,
- ),
- environment_id: bridgeHandle.environmentId,
- })
- } else {
- // initReplBridge surfaces gate-failure reasons via
- // onStateChange('failed', detail) before returning null.
- // Capture so the control-response error is actionable
- // ("/login", "disabled by your organization's policy", etc.)
- // instead of a generic "initialization failed".
- let bridgeFailureDetail: string | undefined
- try {
- const { initReplBridge } = await import(
- 'src/bridge/initReplBridge.js'
- )
- const handle = await initReplBridge({
- onInboundMessage(msg) {
- const fields = extractInboundMessageFields(msg)
- if (!fields) return
- const { content, uuid } = fields
- enqueue({
- value: content,
- mode: 'prompt' as const,
- uuid,
- skipSlashCommands: true,
- })
- void run()
- },
- onPermissionResponse(response) {
- // Forward bridge permission responses into the
- // stdin processing loop so they resolve pending
- // permission requests from the SDK consumer.
- structuredIO.injectControlResponse(response)
- },
- onInterrupt() {
- abortController?.abort()
- },
- onSetModel(model) {
- const resolved =
- model === 'default' ? getDefaultMainLoopModel() : model
- activeUserSpecifiedModel = resolved
- setMainLoopModelOverride(resolved)
- },
- onSetMaxThinkingTokens(maxTokens) {
- if (maxTokens === null) {
- options.thinkingConfig = undefined
- } else if (maxTokens === 0) {
- options.thinkingConfig = { type: 'disabled' }
- } else {
- options.thinkingConfig = {
- type: 'enabled',
- budgetTokens: maxTokens,
- }
- }
- },
- onStateChange(state, detail) {
- if (state === 'failed') {
- bridgeFailureDetail = detail
- }
- logForDebugging(
- `[bridge:sdk] State change: ${state}${detail ? ` — ${detail}` : ''}`,
- )
- output.enqueue({
- type: 'system' as StdoutMessage['type'],
- subtype: 'bridge_state' as string,
- state,
- detail,
- uuid: randomUUID(),
- session_id: getSessionId(),
- } as StdoutMessage)
- },
- initialMessages:
- mutableMessages.length > 0 ? mutableMessages : undefined,
- })
- if (!handle) {
- sendControlResponseError(
- message,
- bridgeFailureDetail ??
- 'Remote Control initialization failed',
- )
- } else {
- bridgeHandle = handle
- bridgeLastForwardedIndex = mutableMessages.length
- // Forward permission requests to the bridge
- structuredIO.setOnControlRequestSent(request => {
- handle.sendControlRequest(request)
- })
- // Cancel stale bridge permission prompts when the SDK
- // consumer resolves a can_use_tool request first.
- structuredIO.setOnControlRequestResolved(requestId => {
- handle.sendControlCancelRequest(requestId)
- })
- sendControlResponseSuccess(message, {
- session_url: getRemoteSessionUrl(
- handle.bridgeSessionId,
- handle.sessionIngressUrl,
- ),
- connect_url: buildBridgeConnectUrl(
- handle.environmentId,
- handle.sessionIngressUrl,
- ),
- environment_id: handle.environmentId,
- })
- }
- } catch (err) {
- sendControlResponseError(message, errorMessage(err))
- }
- }
- } else {
- // Disable
- if (bridgeHandle) {
- structuredIO.setOnControlRequestSent(undefined)
- structuredIO.setOnControlRequestResolved(undefined)
- await bridgeHandle.teardown()
- bridgeHandle = null
- }
- sendControlResponseSuccess(message)
- }
- } else {
- // Unknown control request subtype — send an error response so
- // the caller doesn't hang waiting for a reply that never comes.
- sendControlResponseError(
- message,
- `Unsupported control request subtype: ${(message.request as { subtype: string }).subtype}`,
- )
- }
- continue
- } else if (message.type === 'control_response') {
- // Replay control_response messages when replay mode is enabled
- if (options.replayUserMessages) {
- output.enqueue(message)
- }
- continue
- } else if (message.type === 'keep_alive') {
- // Silently ignore keep-alive messages
- continue
- } else if (message.type === 'update_environment_variables') {
- // Handled in structuredIO.ts, but TypeScript needs the type guard
- continue
- } else if (message.type === 'assistant' || message.type === 'system') {
- // History replay from bridge: inject into mutableMessages as
- // conversation context so the model sees prior turns.
- const internalMsgs = toInternalMessages([message])
- mutableMessages.push(...internalMsgs)
- // Echo assistant messages back so CCR displays them
- if (message.type === 'assistant' && options.replayUserMessages) {
- output.enqueue(message)
- }
- continue
- }
- // After handling control, keep-alive, env-var, assistant, and system
- // messages above, only user messages should remain.
- if (message.type !== 'user') {
- continue
- }
- // First prompt message implicitly initializes if not already done.
- initialized = true
- // Check for duplicate user message - skip if already processed
- if (message.uuid) {
- const sessionId = getSessionId() as UUID
- const existsInSession = await doesMessageExistInSession(
- sessionId,
- message.uuid,
- )
- // Check both historical duplicates (from file) and runtime duplicates (this session)
- if (existsInSession || receivedMessageUuids.has(message.uuid)) {
- logForDebugging(`Skipping duplicate user message: ${message.uuid}`)
- // Send acknowledgment for duplicate message if replay mode is enabled
- if (options.replayUserMessages) {
- logForDebugging(
- `Sending acknowledgment for duplicate user message: ${message.uuid}`,
- )
- output.enqueue({
- type: 'user',
- content: message.message?.content ?? '',
- message: message.message,
- session_id: sessionId,
- parent_tool_use_id: null,
- uuid: message.uuid,
- timestamp: message.timestamp,
- isReplay: true,
- } as unknown as SDKUserMessageReplay)
- }
- // Historical dup = transcript already has this turn's output, so it
- // ran but its lifecycle was never closed (interrupted before ack).
- // Runtime dups don't need this — the original enqueue path closes them.
- if (existsInSession) {
- notifyCommandLifecycle(message.uuid, 'completed')
- }
- // Don't enqueue duplicate messages for execution
- continue
- }
- // Track this UUID to prevent runtime duplicates
- trackReceivedMessageUuid(message.uuid)
- }
- enqueue({
- mode: 'prompt' as const,
- // file_attachments rides the protobuf catchall from the web composer.
- // Same-ref no-op when absent (no 'file_attachments' key).
- value: await resolveAndPrepend(message, message.message.content),
- uuid: message.uuid,
- priority: message.priority,
- })
- // Increment prompt count for attribution tracking and save snapshot
- // The snapshot persists promptCount so it survives compaction
- if (feature('COMMIT_ATTRIBUTION')) {
- setAppState(prev => ({
- ...prev,
- attribution: incrementPromptCount(prev.attribution, snapshot => {
- void recordAttributionSnapshot(snapshot).catch(error => {
- logForDebugging(`Attribution: Failed to save snapshot: ${error}`)
- })
- }),
- }))
- }
- void run()
- }
- inputClosed = true
- cronScheduler?.stop()
- if (!running) {
- // If a push-suggestion is in-flight, wait for it to emit before closing
- // the output stream (5 s safety timeout to prevent hanging).
- if (suggestionState.inflightPromise) {
- await Promise.race([suggestionState.inflightPromise, sleep(5000)])
- }
- suggestionState.abortController?.abort()
- suggestionState.abortController = null
- await finalizePendingAsyncHooks()
- unsubscribeSkillChanges()
- unsubscribeAuthStatus?.()
- statusListeners.delete(rateLimitListener)
- output.done()
- }
- })()
- return output
- }
- /**
- * Creates a CanUseToolFn that incorporates a custom permission prompt tool.
- * This function converts the permissionPromptTool into a CanUseToolFn that can be used in ask.tsx
- */
- export function createCanUseToolWithPermissionPrompt(
- permissionPromptTool: PermissionPromptTool,
- ): CanUseToolFn {
- const canUseTool: CanUseToolFn = async (
- tool,
- input,
- toolUseContext,
- assistantMessage,
- toolUseId,
- forceDecision,
- ) => {
- const mainPermissionResult =
- forceDecision ??
- (await hasPermissionsToUseTool(
- tool,
- input,
- toolUseContext,
- assistantMessage,
- toolUseId,
- ))
- // If the tool is allowed or denied, return the result
- if (
- mainPermissionResult.behavior === 'allow' ||
- mainPermissionResult.behavior === 'deny'
- ) {
- return mainPermissionResult
- }
- // Race the permission prompt tool against the abort signal.
- //
- // Why we need this: The permission prompt tool may block indefinitely waiting
- // for user input (e.g., via stdin or a UI dialog). If the user triggers an
- // interrupt (Ctrl+C), we need to detect it even while the tool is blocked.
- // Without this race, the abort check would only run AFTER the tool completes,
- // which may never happen if the tool is waiting for input that will never come.
- //
- // The second check (combinedSignal.aborted) handles a race condition where
- // abort fires after Promise.race resolves but before we reach this check.
- const { signal: combinedSignal, cleanup: cleanupAbortListener } =
- createCombinedAbortSignal(toolUseContext.abortController.signal)
- // Check if already aborted before starting the race
- if (combinedSignal.aborted) {
- cleanupAbortListener()
- return {
- behavior: 'deny',
- message: 'Permission prompt was aborted.',
- decisionReason: {
- type: 'permissionPromptTool' as const,
- permissionPromptToolName: tool.name,
- toolResult: undefined,
- },
- }
- }
- const abortPromise = new Promise<'aborted'>(resolve => {
- combinedSignal.addEventListener('abort', () => resolve('aborted'), {
- once: true,
- })
- })
- const toolCallPromise = permissionPromptTool.call(
- {
- tool_name: tool.name,
- input,
- tool_use_id: toolUseId,
- },
- toolUseContext,
- canUseTool,
- assistantMessage,
- )
- const raceResult = await Promise.race([toolCallPromise, abortPromise])
- cleanupAbortListener()
- if (raceResult === 'aborted' || combinedSignal.aborted) {
- return {
- behavior: 'deny',
- message: 'Permission prompt was aborted.',
- decisionReason: {
- type: 'permissionPromptTool' as const,
- permissionPromptToolName: tool.name,
- toolResult: undefined,
- },
- }
- }
- // TypeScript narrowing: after the abort check, raceResult must be ToolResult
- const result = raceResult as Awaited<typeof toolCallPromise>
- const permissionToolResultBlockParam =
- permissionPromptTool.mapToolResultToToolResultBlockParam(result.data, '1')
- if (
- !permissionToolResultBlockParam.content ||
- !Array.isArray(permissionToolResultBlockParam.content) ||
- !permissionToolResultBlockParam.content[0] ||
- permissionToolResultBlockParam.content[0].type !== 'text' ||
- typeof permissionToolResultBlockParam.content[0].text !== 'string'
- ) {
- throw new Error(
- 'Permission prompt tool returned an invalid result. Expected a single text block param with type="text" and a string text value.',
- )
- }
- return permissionPromptToolResultToPermissionDecision(
- permissionToolOutputSchema().parse(
- safeParseJSON(permissionToolResultBlockParam.content[0].text),
- ),
- permissionPromptTool,
- input,
- toolUseContext,
- )
- }
- return canUseTool
- }
- // Exported for testing — regression: this used to crash at construction when
- // getMcpTools() was empty (before per-server connects populated appState).
- export function getCanUseToolFn(
- permissionPromptToolName: string | undefined,
- structuredIO: StructuredIO,
- getMcpTools: () => Tool[],
- onPermissionPrompt?: (details: RequiresActionDetails) => void,
- ): CanUseToolFn {
- if (permissionPromptToolName === 'stdio') {
- return structuredIO.createCanUseTool(onPermissionPrompt)
- }
- if (!permissionPromptToolName) {
- return async (
- tool,
- input,
- toolUseContext,
- assistantMessage,
- toolUseId,
- forceDecision,
- ) =>
- forceDecision ??
- (await hasPermissionsToUseTool(
- tool,
- input,
- toolUseContext,
- assistantMessage,
- toolUseId,
- ))
- }
- // Lazy lookup: MCP connects are per-server incremental in print mode, so
- // the tool may not be in appState yet at init time. Resolve on first call
- // (first permission prompt), by which point connects have had time to finish.
- let resolved: CanUseToolFn | null = null
- return async (
- tool,
- input,
- toolUseContext,
- assistantMessage,
- toolUseId,
- forceDecision,
- ) => {
- if (!resolved) {
- const mcpTools = getMcpTools()
- const permissionPromptTool = mcpTools.find(t =>
- toolMatchesName(t, permissionPromptToolName),
- ) as PermissionPromptTool | undefined
- if (!permissionPromptTool) {
- const error = `Error: MCP tool ${permissionPromptToolName} (passed via --permission-prompt-tool) not found. Available MCP tools: ${mcpTools.map(t => t.name).join(', ') || 'none'}`
- process.stderr.write(`${error}\n`)
- gracefulShutdownSync(1)
- throw new Error(error)
- }
- if (!permissionPromptTool.inputJSONSchema) {
- const error = `Error: tool ${permissionPromptToolName} (passed via --permission-prompt-tool) must be an MCP tool`
- process.stderr.write(`${error}\n`)
- gracefulShutdownSync(1)
- throw new Error(error)
- }
- resolved = createCanUseToolWithPermissionPrompt(permissionPromptTool)
- }
- return resolved(
- tool,
- input,
- toolUseContext,
- assistantMessage,
- toolUseId,
- forceDecision,
- )
- }
- }
- async function handleInitializeRequest(
- request: SDKControlInitializeRequest,
- requestId: string,
- initialized: boolean,
- output: Stream<StdoutMessage>,
- commands: Command[],
- modelInfos: ModelInfo[],
- structuredIO: StructuredIO,
- enableAuthStatus: boolean,
- options: {
- systemPrompt: string | undefined
- appendSystemPrompt: string | undefined
- agent?: string | undefined
- userSpecifiedModel?: string | undefined
- [key: string]: unknown
- },
- agents: AgentDefinition[],
- getAppState: () => AppState,
- ): Promise<void> {
- if (initialized) {
- output.enqueue({
- type: 'control_response',
- response: {
- subtype: 'error',
- error: 'Already initialized',
- request_id: requestId,
- pending_permission_requests:
- structuredIO.getPendingPermissionRequests(),
- },
- })
- return
- }
- // Apply systemPrompt/appendSystemPrompt from stdin to avoid ARG_MAX limits
- if (request.systemPrompt !== undefined) {
- options.systemPrompt = request.systemPrompt
- }
- if (request.appendSystemPrompt !== undefined) {
- options.appendSystemPrompt = request.appendSystemPrompt
- }
- if (request.promptSuggestions !== undefined) {
- options.promptSuggestions = request.promptSuggestions
- }
- // Merge agents from stdin to avoid ARG_MAX limits
- if (request.agents) {
- const stdinAgents = parseAgentsFromJson(request.agents, 'flagSettings')
- agents.push(...stdinAgents)
- }
- // Re-evaluate main thread agent after SDK agents are merged
- // This allows --agent to reference agents defined via SDK
- if (options.agent) {
- // If main.tsx already found this agent (filesystem-defined), it already
- // applied systemPrompt/model/initialPrompt. Skip to avoid double-apply.
- const alreadyResolved = getMainThreadAgentType() === options.agent
- const mainThreadAgent = agents.find(a => a.agentType === options.agent)
- if (mainThreadAgent && !alreadyResolved) {
- // Update the main thread agent type in bootstrap state
- setMainThreadAgentType(mainThreadAgent.agentType)
- // Apply the agent's system prompt if user hasn't specified a custom one
- // SDK agents are always custom agents (not built-in), so getSystemPrompt() takes no args
- if (!options.systemPrompt && !isBuiltInAgent(mainThreadAgent)) {
- const agentSystemPrompt = mainThreadAgent.getSystemPrompt()
- if (agentSystemPrompt) {
- options.systemPrompt = agentSystemPrompt
- }
- }
- // Apply the agent's model if user didn't specify one and agent has a model
- if (
- !options.userSpecifiedModel &&
- mainThreadAgent.model &&
- mainThreadAgent.model !== 'inherit'
- ) {
- const agentModel = parseUserSpecifiedModel(mainThreadAgent.model)
- setMainLoopModelOverride(agentModel)
- }
- // SDK-defined agents arrive via init, so main.tsx's lookup missed them.
- if (mainThreadAgent.initialPrompt) {
- structuredIO.prependUserMessage(mainThreadAgent.initialPrompt)
- }
- } else if (mainThreadAgent?.initialPrompt) {
- // Filesystem-defined agent (alreadyResolved by main.tsx). main.tsx
- // handles initialPrompt for the string inputPrompt case, but when
- // inputPrompt is an AsyncIterable (SDK stream-json), it can't
- // concatenate — fall back to prependUserMessage here.
- structuredIO.prependUserMessage(mainThreadAgent.initialPrompt)
- }
- }
- const settings = getSettings_DEPRECATED()
- const outputStyle = settings?.outputStyle || DEFAULT_OUTPUT_STYLE_NAME
- const availableOutputStyles = await getAllOutputStyles(getCwd())
- // Get account information
- const accountInfo = getAccountInformation()
- if (request.hooks) {
- const hooks: Partial<Record<HookEvent, HookCallbackMatcher[]>> = {}
- for (const [event, matchers] of Object.entries(request.hooks) as [string, Array<{ hookCallbackIds: string[]; timeout?: number; matcher?: string }>][]) {
- hooks[event as HookEvent] = matchers.map(matcher => {
- const callbacks = matcher.hookCallbackIds.map(callbackId => {
- return structuredIO.createHookCallback(callbackId, matcher.timeout)
- })
- return {
- matcher: matcher.matcher,
- hooks: callbacks,
- }
- })
- }
- registerHookCallbacks(hooks)
- }
- if (request.jsonSchema) {
- setInitJsonSchema(request.jsonSchema)
- }
- const initResponse: SDKControlInitializeResponse = {
- commands: commands
- .filter(cmd => cmd.userInvocable !== false)
- .map(cmd => ({
- name: getCommandName(cmd),
- description: formatDescriptionWithSource(cmd),
- argumentHint: cmd.argumentHint || '',
- })),
- agents: agents.map(agent => ({
- name: agent.agentType,
- description: agent.whenToUse,
- // 'inherit' is an internal sentinel; normalize to undefined for the public API
- model: agent.model === 'inherit' ? undefined : agent.model,
- })),
- output_style: outputStyle,
- available_output_styles: Object.keys(availableOutputStyles),
- models: modelInfos,
- account: {
- email: accountInfo?.email,
- organization: accountInfo?.organization,
- subscriptionType: accountInfo?.subscription,
- tokenSource: accountInfo?.tokenSource,
- apiKeySource: accountInfo?.apiKeySource,
- // getAccountInformation() returns undefined under 3P providers, so the
- // other fields are all absent. apiProvider disambiguates "not logged
- // in" (firstParty + tokenSource:none) from "3P, login not applicable".
- apiProvider: getAPIProvider(),
- },
- pid: process.pid,
- }
- if (isFastModeEnabled() && isFastModeAvailable()) {
- const appState = getAppState()
- initResponse.fast_mode_state = getFastModeState(
- options.userSpecifiedModel ?? null,
- appState.fastMode,
- )
- }
- output.enqueue({
- type: 'control_response',
- response: {
- subtype: 'success',
- request_id: requestId,
- response: initResponse,
- },
- })
- // After the initialize message, check the auth status-
- // This will get notified of changes, but we also want to send the
- // initial state.
- if (enableAuthStatus) {
- const authStatusManager = AwsAuthStatusManager.getInstance()
- const status = authStatusManager.getStatus()
- if (status) {
- output.enqueue({
- type: 'auth_status',
- isAuthenticating: status.isAuthenticating,
- output: status.output,
- error: status.error,
- uuid: randomUUID(),
- session_id: getSessionId(),
- })
- }
- }
- }
- async function handleRewindFiles(
- userMessageId: UUID,
- appState: AppState,
- setAppState: (updater: (prev: AppState) => AppState) => void,
- dryRun: boolean,
- ): Promise<RewindFilesResult> {
- if (!fileHistoryEnabled()) {
- return { canRewind: false, error: 'File rewinding is not enabled.', filesChanged: [] }
- }
- if (!fileHistoryCanRestore(appState.fileHistory, userMessageId)) {
- return {
- canRewind: false,
- error: 'No file checkpoint found for this message.',
- filesChanged: [],
- }
- }
- if (dryRun) {
- const diffStats = await fileHistoryGetDiffStats(
- appState.fileHistory,
- userMessageId,
- )
- return {
- canRewind: true,
- filesChanged: diffStats?.filesChanged,
- insertions: diffStats?.insertions,
- deletions: diffStats?.deletions,
- }
- }
- try {
- await fileHistoryRewind(
- updater =>
- setAppState(prev => ({
- ...prev,
- fileHistory: updater(prev.fileHistory),
- })),
- userMessageId,
- )
- } catch (error) {
- return {
- canRewind: false,
- error: `Failed to rewind: ${errorMessage(error)}`,
- filesChanged: [],
- }
- }
- return { canRewind: true, filesChanged: [] }
- }
- function handleSetPermissionMode(
- request: { mode: InternalPermissionMode },
- requestId: string,
- toolPermissionContext: ToolPermissionContext,
- output: Stream<StdoutMessage>,
- ): ToolPermissionContext {
- // Check if trying to switch to bypassPermissions mode
- if (request.mode === 'bypassPermissions') {
- if (isBypassPermissionsModeDisabled()) {
- output.enqueue({
- type: 'control_response',
- response: {
- subtype: 'error',
- request_id: requestId,
- error:
- 'Cannot set permission mode to bypassPermissions because it is disabled by settings or configuration',
- },
- })
- return toolPermissionContext
- }
- if (!toolPermissionContext.isBypassPermissionsModeAvailable) {
- output.enqueue({
- type: 'control_response',
- response: {
- subtype: 'error',
- request_id: requestId,
- error:
- 'Cannot set permission mode to bypassPermissions because the session was not launched with --dangerously-skip-permissions',
- },
- })
- return toolPermissionContext
- }
- }
- // Check if trying to switch to auto mode without the classifier gate
- if (
- feature('TRANSCRIPT_CLASSIFIER') &&
- request.mode === 'auto' &&
- !isAutoModeGateEnabled()
- ) {
- const reason = getAutoModeUnavailableReason()
- output.enqueue({
- type: 'control_response',
- response: {
- subtype: 'error',
- request_id: requestId,
- error: reason
- ? `Cannot set permission mode to auto: ${getAutoModeUnavailableNotification(reason)}`
- : 'Cannot set permission mode to auto',
- },
- })
- return toolPermissionContext
- }
- // Allow the mode switch
- output.enqueue({
- type: 'control_response',
- response: {
- subtype: 'success',
- request_id: requestId,
- response: {
- mode: request.mode,
- },
- },
- })
- return {
- ...transitionPermissionMode(
- toolPermissionContext.mode,
- request.mode,
- toolPermissionContext,
- ),
- mode: request.mode,
- }
- }
- /**
- * IDE-triggered channel enable. Derives the ChannelEntry from the connection's
- * pluginSource (IDE can't spoof kind/marketplace — we only take the server
- * name), appends it to session allowedChannels, and runs the full gate. On
- * gate failure, rolls back the append. On success, registers a notification
- * handler that enqueues channel messages at priority:'next' — drainCommandQueue
- * picks them up between turns.
- *
- * Intentionally does NOT register the claude/channel/permission handler that
- * useManageMCPConnections sets up for interactive mode. That handler resolves
- * a pending dialog inside handleInteractivePermission — but print.ts never
- * calls handleInteractivePermission. When SDK permission lands on 'ask', it
- * goes to the consumer's canUseTool callback over stdio; there is no CLI-side
- * dialog for a remote "yes tbxkq" to resolve. If an IDE wants channel-relayed
- * tool approval, that's IDE-side plumbing against its own pending-map. (Also
- * gated separately by tengu_harbor_permissions — not yet shipping on
- * interactive either.)
- */
- function handleChannelEnable(
- requestId: string,
- serverName: string,
- connectionPool: readonly MCPServerConnection[],
- output: Stream<StdoutMessage>,
- ): void {
- const respondError = (error: string) =>
- output.enqueue({
- type: 'control_response',
- response: { subtype: 'error', request_id: requestId, error },
- })
- if (!(feature('KAIROS') || feature('KAIROS_CHANNELS'))) {
- return respondError('channels feature not available in this build')
- }
- // Only a 'connected' client has .capabilities and .client to register the
- // handler on. The pool spread at the call site matches mcp_status.
- const connection = connectionPool.find(
- c => c.name === serverName && c.type === 'connected',
- )
- if (!connection || connection.type !== 'connected') {
- return respondError(`server ${serverName} is not connected`)
- }
- const pluginSource = connection.config.pluginSource
- const parsed = pluginSource ? parsePluginIdentifier(pluginSource) : undefined
- if (!parsed?.marketplace) {
- // No pluginSource or @-less source — can never pass the {plugin,
- // marketplace}-keyed allowlist. Short-circuit with the same reason the
- // gate would produce.
- return respondError(
- `server ${serverName} is not plugin-sourced; channel_enable requires a marketplace plugin`,
- )
- }
- const entry: ChannelEntry = {
- kind: 'plugin',
- name: parsed.name,
- marketplace: parsed.marketplace,
- }
- // Idempotency: don't double-append on repeat enable.
- const prior = getAllowedChannels()
- const already = prior.some(
- e =>
- e.kind === 'plugin' &&
- e.name === entry.name &&
- e.marketplace === entry.marketplace,
- )
- if (!already) setAllowedChannels([...prior, entry])
- const gate = gateChannelServer(
- serverName,
- connection.capabilities,
- pluginSource,
- )
- if (gate.action === 'skip') {
- // Rollback — only remove the entry we appended.
- if (!already) setAllowedChannels(prior)
- return respondError(gate.reason)
- }
- const pluginId =
- `${entry.name}@${entry.marketplace}` as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS
- logMCPDebug(serverName, 'Channel notifications registered')
- logEvent('tengu_mcp_channel_enable', { plugin: pluginId })
- // Identical enqueue shape to the interactive register block in
- // useManageMCPConnections. drainCommandQueue processes it between turns —
- // channel messages queue at priority 'next' and are seen by the model on
- // the turn after they arrive.
- connection.client.setNotificationHandler(
- ChannelMessageNotificationSchema(),
- async notification => {
- const { content, meta } = notification.params
- logMCPDebug(
- serverName,
- `notifications/claude/channel: ${content.slice(0, 80)}`,
- )
- logEvent('tengu_mcp_channel_message', {
- content_length: content.length,
- meta_key_count: Object.keys(meta ?? {}).length,
- entry_kind:
- 'plugin' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- is_dev: false,
- plugin: pluginId,
- })
- enqueue({
- mode: 'prompt',
- value: wrapChannelMessage(serverName, content, meta),
- priority: 'next',
- isMeta: true,
- origin: { kind: 'channel', server: serverName } as unknown as string,
- skipSlashCommands: true,
- })
- },
- )
- output.enqueue({
- type: 'control_response',
- response: {
- subtype: 'success',
- request_id: requestId,
- response: undefined,
- },
- })
- }
- /**
- * Re-register the channel notification handler after mcp_reconnect /
- * mcp_toggle creates a new client. handleChannelEnable bound the handler to
- * the OLD client object; allowedChannels survives the reconnect but the
- * handler binding does not. Without this, channel messages silently drop
- * after a reconnect while the IDE still believes the channel is live.
- *
- * Mirrors the interactive CLI's onConnectionAttempt in
- * useManageMCPConnections, which re-gates on every new connection. Paired
- * with registerElicitationHandlers at the same call sites.
- *
- * No-op if the server was never channel-enabled: gateChannelServer calls
- * findChannelEntry internally and returns skip/session for an unlisted
- * server, so reconnecting a non-channel MCP server costs one feature-flag
- * check.
- */
- function reregisterChannelHandlerAfterReconnect(
- connection: MCPServerConnection,
- ): void {
- if (!(feature('KAIROS') || feature('KAIROS_CHANNELS'))) return
- if (connection.type !== 'connected') return
- const gate = gateChannelServer(
- connection.name,
- connection.capabilities,
- connection.config.pluginSource,
- )
- if (gate.action !== 'register') return
- const entry = findChannelEntry(connection.name, getAllowedChannels())
- const pluginId =
- entry?.kind === 'plugin'
- ? (`${entry.name}@${entry.marketplace}` as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
- : undefined
- logMCPDebug(
- connection.name,
- 'Channel notifications re-registered after reconnect',
- )
- connection.client.setNotificationHandler(
- ChannelMessageNotificationSchema(),
- async notification => {
- const { content, meta } = notification.params
- logMCPDebug(
- connection.name,
- `notifications/claude/channel: ${content.slice(0, 80)}`,
- )
- logEvent('tengu_mcp_channel_message', {
- content_length: content.length,
- meta_key_count: Object.keys(meta ?? {}).length,
- entry_kind:
- entry?.kind as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
- is_dev: entry?.dev ?? false,
- plugin: pluginId,
- })
- enqueue({
- mode: 'prompt',
- value: wrapChannelMessage(connection.name, content, meta),
- priority: 'next',
- isMeta: true,
- origin: { kind: 'channel', server: connection.name } as unknown as string,
- skipSlashCommands: true,
- })
- },
- )
- }
- /**
- * Emits an error message in the correct format based on outputFormat.
- * When using stream-json, writes JSON to stdout; otherwise writes plain text to stderr.
- */
- function emitLoadError(
- message: string,
- outputFormat: string | undefined,
- ): void {
- if (outputFormat === 'stream-json') {
- const errorResult = {
- type: 'result',
- subtype: 'error_during_execution',
- duration_ms: 0,
- duration_api_ms: 0,
- is_error: true,
- num_turns: 0,
- stop_reason: null,
- session_id: getSessionId(),
- total_cost_usd: 0,
- usage: EMPTY_USAGE,
- modelUsage: {},
- permission_denials: [],
- uuid: randomUUID(),
- errors: [message],
- }
- process.stdout.write(jsonStringify(errorResult) + '\n')
- } else {
- process.stderr.write(message + '\n')
- }
- }
- /**
- * Removes an interrupted user message and its synthetic assistant sentinel
- * from the message array. Used during gateway-triggered restarts to clean up
- * the message history before re-enqueuing the interrupted prompt.
- *
- * @internal Exported for testing
- */
- export function removeInterruptedMessage(
- messages: Message[],
- interruptedUserMessage: NormalizedUserMessage,
- ): void {
- const idx = messages.findIndex(m => m.uuid === interruptedUserMessage.uuid)
- if (idx !== -1) {
- // Remove the user message and the sentinel that immediately follows it.
- // splice safely handles the case where idx is the last element.
- messages.splice(idx, 2)
- }
- }
- type LoadInitialMessagesResult = {
- messages: Message[]
- turnInterruptionState?: TurnInterruptionState
- agentSetting?: string
- }
- async function loadInitialMessages(
- setAppState: (f: (prev: AppState) => AppState) => void,
- options: {
- continue: boolean | undefined
- teleport: string | true | null | undefined
- resume: string | boolean | undefined
- resumeSessionAt: string | undefined
- forkSession: boolean | undefined
- outputFormat: string | undefined
- sessionStartHooksPromise?: ReturnType<typeof processSessionStartHooks>
- restoredWorkerState: Promise<SessionExternalMetadata | null>
- },
- ): Promise<LoadInitialMessagesResult> {
- const persistSession = !isSessionPersistenceDisabled()
- // Handle continue in print mode
- if (options.continue) {
- try {
- logEvent('tengu_continue_print', {})
- const result = await loadConversationForResume(
- undefined /* sessionId */,
- undefined /* file path */,
- )
- if (result) {
- // Match coordinator mode to the resumed session's mode
- if (feature('COORDINATOR_MODE') && coordinatorModeModule) {
- const warning = coordinatorModeModule.matchSessionMode(result.mode)
- if (warning) {
- process.stderr.write(warning + '\n')
- // Refresh agent definitions to reflect the mode switch
- const {
- getAgentDefinitionsWithOverrides,
- getActiveAgentsFromList,
- } =
- // eslint-disable-next-line @typescript-eslint/no-require-imports
- require('../tools/AgentTool/loadAgentsDir.js') as typeof import('../tools/AgentTool/loadAgentsDir.js')
- getAgentDefinitionsWithOverrides.cache.clear?.()
- const freshAgentDefs = await getAgentDefinitionsWithOverrides(
- getCwd(),
- )
- setAppState(prev => ({
- ...prev,
- agentDefinitions: {
- ...freshAgentDefs,
- allAgents: freshAgentDefs.allAgents,
- activeAgents: getActiveAgentsFromList(freshAgentDefs.allAgents),
- },
- }))
- }
- }
- // Reuse the resumed session's ID
- if (!options.forkSession) {
- if (result.sessionId) {
- switchSession(
- asSessionId(result.sessionId),
- result.fullPath ? dirname(result.fullPath) : null,
- )
- if (persistSession) {
- await resetSessionFilePointer()
- }
- }
- }
- restoreSessionStateFromLog(result, setAppState)
- // Restore session metadata so it's re-appended on exit via reAppendSessionMetadata
- restoreSessionMetadata(
- options.forkSession
- ? { ...result, worktreeSession: undefined }
- : result,
- )
- // Write mode entry for the resumed session
- if (feature('COORDINATOR_MODE') && coordinatorModeModule) {
- saveMode(
- coordinatorModeModule.isCoordinatorMode()
- ? 'coordinator'
- : 'normal',
- )
- }
- return {
- messages: result.messages,
- turnInterruptionState: result.turnInterruptionState,
- agentSetting: result.agentSetting,
- }
- }
- } catch (error) {
- logError(error)
- gracefulShutdownSync(1)
- return { messages: [] }
- }
- }
- // Handle teleport in print mode
- if (options.teleport) {
- try {
- if (!isPolicyAllowed('allow_remote_sessions')) {
- throw new Error(
- "Remote sessions are disabled by your organization's policy.",
- )
- }
- logEvent('tengu_teleport_print', {})
- if (typeof options.teleport !== 'string') {
- throw new Error('No session ID provided for teleport')
- }
- const {
- checkOutTeleportedSessionBranch,
- processMessagesForTeleportResume,
- teleportResumeCodeSession,
- validateGitState,
- } = await import('src/utils/teleport.js')
- await validateGitState()
- const teleportResult = await teleportResumeCodeSession(options.teleport)
- const { branchError } = await checkOutTeleportedSessionBranch(
- teleportResult.branch,
- )
- return {
- messages: processMessagesForTeleportResume(
- teleportResult.log,
- branchError,
- ),
- }
- } catch (error) {
- logError(error)
- gracefulShutdownSync(1)
- return { messages: [] }
- }
- }
- // Handle resume in print mode (accepts session ID or URL)
- // URLs are [ANT-ONLY]
- if (options.resume) {
- try {
- logEvent('tengu_resume_print', {})
- // In print mode - we require a valid session ID, JSONL file or URL
- const parsedSessionId = parseSessionIdentifier(
- typeof options.resume === 'string' ? options.resume : '',
- )
- if (!parsedSessionId) {
- let errorMessage =
- 'Error: --resume requires a valid session ID when used with --print. Usage: claude -p --resume <session-id>'
- if (typeof options.resume === 'string') {
- errorMessage += `. Session IDs must be in UUID format (e.g., 550e8400-e29b-41d4-a716-446655440000). Provided value "${options.resume}" is not a valid UUID`
- }
- emitLoadError(errorMessage, options.outputFormat)
- gracefulShutdownSync(1)
- return { messages: [] }
- }
- // Hydrate local transcript from remote before loading
- if (isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)) {
- // Await restore alongside hydration so SSE catchup lands on
- // restored state, not a fresh default.
- const [, metadata] = await Promise.all([
- hydrateFromCCRv2InternalEvents(parsedSessionId.sessionId),
- options.restoredWorkerState,
- ])
- if (metadata) {
- setAppState(externalMetadataToAppState(metadata))
- if (typeof metadata.model === 'string') {
- setMainLoopModelOverride(metadata.model)
- }
- }
- } else if (
- parsedSessionId.isUrl &&
- parsedSessionId.ingressUrl &&
- isEnvTruthy(process.env.ENABLE_SESSION_PERSISTENCE)
- ) {
- // v1: fetch session logs from Session Ingress
- await hydrateRemoteSession(
- parsedSessionId.sessionId,
- parsedSessionId.ingressUrl,
- )
- }
- // Load the conversation with the specified session ID
- const result = await loadConversationForResume(
- parsedSessionId.sessionId,
- parsedSessionId.jsonlFile || undefined,
- )
- // hydrateFromCCRv2InternalEvents writes an empty transcript file for
- // fresh sessions (writeFile(sessionFile, '') with zero events), so
- // loadConversationForResume returns {messages: []} not null. Treat
- // empty the same as null so SessionStart still fires.
- if (!result || result.messages.length === 0) {
- // For URL-based or CCR v2 resume, start with empty session (it was hydrated but empty)
- if (
- parsedSessionId.isUrl ||
- isEnvTruthy(process.env.CLAUDE_CODE_USE_CCR_V2)
- ) {
- // Execute SessionStart hooks for startup since we're starting a new session
- return {
- messages: await (options.sessionStartHooksPromise ??
- processSessionStartHooks('startup')),
- }
- } else {
- emitLoadError(
- `No conversation found with session ID: ${parsedSessionId.sessionId}`,
- options.outputFormat,
- )
- gracefulShutdownSync(1)
- return { messages: [] }
- }
- }
- // Handle resumeSessionAt feature
- if (options.resumeSessionAt) {
- const index = result.messages.findIndex(
- m => m.uuid === options.resumeSessionAt,
- )
- if (index < 0) {
- emitLoadError(
- `No message found with message.uuid of: ${options.resumeSessionAt}`,
- options.outputFormat,
- )
- gracefulShutdownSync(1)
- return { messages: [] }
- }
- result.messages = index >= 0 ? result.messages.slice(0, index + 1) : []
- }
- // Match coordinator mode to the resumed session's mode
- if (feature('COORDINATOR_MODE') && coordinatorModeModule) {
- const warning = coordinatorModeModule.matchSessionMode(result.mode)
- if (warning) {
- process.stderr.write(warning + '\n')
- // Refresh agent definitions to reflect the mode switch
- const { getAgentDefinitionsWithOverrides, getActiveAgentsFromList } =
- // eslint-disable-next-line @typescript-eslint/no-require-imports
- require('../tools/AgentTool/loadAgentsDir.js') as typeof import('../tools/AgentTool/loadAgentsDir.js')
- getAgentDefinitionsWithOverrides.cache.clear?.()
- const freshAgentDefs = await getAgentDefinitionsWithOverrides(
- getCwd(),
- )
- setAppState(prev => ({
- ...prev,
- agentDefinitions: {
- ...freshAgentDefs,
- allAgents: freshAgentDefs.allAgents,
- activeAgents: getActiveAgentsFromList(freshAgentDefs.allAgents),
- },
- }))
- }
- }
- // Reuse the resumed session's ID
- if (!options.forkSession && result.sessionId) {
- switchSession(
- asSessionId(result.sessionId),
- result.fullPath ? dirname(result.fullPath) : null,
- )
- if (persistSession) {
- await resetSessionFilePointer()
- }
- }
- restoreSessionStateFromLog(result, setAppState)
- // Restore session metadata so it's re-appended on exit via reAppendSessionMetadata
- restoreSessionMetadata(
- options.forkSession
- ? { ...result, worktreeSession: undefined }
- : result,
- )
- // Write mode entry for the resumed session
- if (feature('COORDINATOR_MODE') && coordinatorModeModule) {
- saveMode(
- coordinatorModeModule.isCoordinatorMode() ? 'coordinator' : 'normal',
- )
- }
- return {
- messages: result.messages,
- turnInterruptionState: result.turnInterruptionState,
- agentSetting: result.agentSetting,
- }
- } catch (error) {
- logError(error)
- const errorMessage =
- error instanceof Error
- ? `Failed to resume session: ${error.message}`
- : 'Failed to resume session with --print mode'
- emitLoadError(errorMessage, options.outputFormat)
- gracefulShutdownSync(1)
- return { messages: [] }
- }
- }
- // Join the SessionStart hooks promise kicked in main.tsx (or run fresh if
- // it wasn't kicked — e.g. --continue with no prior session falls through
- // here with sessionStartHooksPromise undefined because main.tsx guards on continue)
- return {
- messages: await (options.sessionStartHooksPromise ??
- processSessionStartHooks('startup')),
- }
- }
- function getStructuredIO(
- inputPrompt: string | AsyncIterable<string>,
- options: {
- sdkUrl: string | undefined
- replayUserMessages?: boolean
- },
- ): StructuredIO {
- let inputStream: AsyncIterable<string>
- if (typeof inputPrompt === 'string') {
- if (inputPrompt.trim() !== '') {
- // Normalize to a streaming input.
- inputStream = fromArray([
- jsonStringify({
- type: 'user',
- content: inputPrompt,
- uuid: '',
- session_id: '',
- message: {
- role: 'user',
- content: inputPrompt,
- },
- parent_tool_use_id: null,
- } satisfies SDKUserMessage),
- ])
- } else {
- // Empty string - create empty stream
- inputStream = fromArray([])
- }
- } else {
- inputStream = inputPrompt
- }
- // Use RemoteIO if sdkUrl is provided, otherwise use regular StructuredIO
- return options.sdkUrl
- ? new RemoteIO(options.sdkUrl, inputStream, options.replayUserMessages)
- : new StructuredIO(inputStream, options.replayUserMessages)
- }
- /**
- * Handles unexpected permission responses by looking up the unresolved tool
- * call in the transcript and enqueuing it for execution.
- *
- * Returns true if a permission was enqueued, false otherwise.
- */
- export async function handleOrphanedPermissionResponse({
- message,
- setAppState,
- onEnqueued,
- handledToolUseIds,
- }: {
- message: SDKControlResponse
- setAppState: (f: (prev: AppState) => AppState) => void
- onEnqueued?: () => void
- handledToolUseIds: Set<string>
- }): Promise<boolean> {
- const responseInner = message.response as { subtype?: string; response?: Record<string, unknown>; request_id?: string } | undefined
- if (
- responseInner?.subtype === 'success' &&
- responseInner.response?.toolUseID &&
- typeof responseInner.response.toolUseID === 'string'
- ) {
- const permissionResult = responseInner.response as PermissionResult & { toolUseID?: string }
- const toolUseID = permissionResult.toolUseID
- if (!toolUseID) {
- return false
- }
- logForDebugging(
- `handleOrphanedPermissionResponse: received orphaned control_response for toolUseID=${toolUseID} request_id=${responseInner.request_id}`,
- )
- // Prevent re-processing the same orphaned tool_use. Without this guard,
- // duplicate control_response deliveries (e.g. from WebSocket reconnect)
- // cause the same tool to be executed multiple times, producing duplicate
- // tool_use IDs in the messages array and a 400 error from the API.
- // Once corrupted, every retry accumulates more duplicates.
- if (handledToolUseIds.has(toolUseID)) {
- logForDebugging(
- `handleOrphanedPermissionResponse: skipping duplicate orphaned permission for toolUseID=${toolUseID} (already handled)`,
- )
- return false
- }
- const assistantMessage = await findUnresolvedToolUse(toolUseID)
- if (!assistantMessage) {
- logForDebugging(
- `handleOrphanedPermissionResponse: no unresolved tool_use found for toolUseID=${toolUseID} (already resolved in transcript)`,
- )
- return false
- }
- handledToolUseIds.add(toolUseID)
- logForDebugging(
- `handleOrphanedPermissionResponse: enqueuing orphaned permission for toolUseID=${toolUseID} messageID=${assistantMessage.message.id}`,
- )
- enqueue({
- mode: 'orphaned-permission' as const,
- value: [],
- orphanedPermission: {
- permissionResult,
- assistantMessage,
- },
- })
- onEnqueued?.()
- return true
- }
- return false
- }
- export type DynamicMcpState = {
- clients: MCPServerConnection[]
- tools: Tools
- configs: Record<string, ScopedMcpServerConfig>
- }
- /**
- * Converts a process transport config to a scoped config.
- * The types are structurally compatible, so we just add the scope.
- */
- function toScopedConfig(
- config: McpServerConfigForProcessTransport,
- ): ScopedMcpServerConfig {
- // McpServerConfigForProcessTransport is a subset of McpServerConfig
- // (it excludes IDE-specific types like sse-ide and ws-ide)
- // Adding scope makes it a valid ScopedMcpServerConfig
- return { ...config, scope: 'dynamic' } as ScopedMcpServerConfig
- }
- /**
- * State for SDK MCP servers that run in the SDK process.
- */
- export type SdkMcpState = {
- configs: Record<string, McpSdkServerConfig>
- clients: MCPServerConnection[]
- tools: Tools
- }
- /**
- * Result of handleMcpSetServers - contains new state and response data.
- */
- export type McpSetServersResult = {
- response: SDKControlMcpSetServersResponse
- newSdkState: SdkMcpState
- newDynamicState: DynamicMcpState
- sdkServersChanged: boolean
- }
- /**
- * Handles mcp_set_servers requests by processing both SDK and process-based servers.
- * SDK servers run in the SDK process; process-based servers are spawned by the CLI.
- *
- * Applies enterprise allowedMcpServers/deniedMcpServers policy — same filter as
- * --mcp-config (see filterMcpServersByPolicy call in main.tsx). Without this,
- * SDK V2 Query.setMcpServers() was a second policy bypass vector. Blocked servers
- * are reported in response.errors so the SDK consumer knows why they weren't added.
- */
- export async function handleMcpSetServers(
- servers: Record<string, McpServerConfigForProcessTransport>,
- sdkState: SdkMcpState,
- dynamicState: DynamicMcpState,
- setAppState: (f: (prev: AppState) => AppState) => void,
- ): Promise<McpSetServersResult> {
- // Enforce enterprise MCP policy on process-based servers (stdio/http/sse).
- // Mirrors the --mcp-config filter in main.tsx — both user-controlled injection
- // paths must have the same gate. type:'sdk' servers are exempt (SDK-managed,
- // CLI never spawns/connects for them — see filterMcpServersByPolicy jsdoc).
- // Blocked servers go into response.errors so the SDK caller sees why.
- const { allowed: allowedServers, blocked } = filterMcpServersByPolicy(servers)
- const policyErrors: Record<string, string> = {}
- for (const name of blocked) {
- policyErrors[name] =
- 'Blocked by enterprise policy (allowedMcpServers/deniedMcpServers)'
- }
- // Separate SDK servers from process-based servers
- const sdkServers: Record<string, McpSdkServerConfig> = {}
- const processServers: Record<string, McpServerConfigForProcessTransport> = {}
- for (const [name, config] of Object.entries(allowedServers)) {
- if ((config.type as string) === 'sdk') {
- sdkServers[name] = config as unknown as McpSdkServerConfig
- } else {
- processServers[name] = config
- }
- }
- // Handle SDK servers
- const currentSdkNames = new Set(Object.keys(sdkState.configs))
- const newSdkNames = new Set(Object.keys(sdkServers))
- const sdkAdded: string[] = []
- const sdkRemoved: string[] = []
- const newSdkConfigs = { ...sdkState.configs }
- let newSdkClients = [...sdkState.clients]
- let newSdkTools = [...sdkState.tools]
- // Remove SDK servers no longer in desired state
- for (const name of currentSdkNames) {
- if (!newSdkNames.has(name)) {
- const client = newSdkClients.find(c => c.name === name)
- if (client && client.type === 'connected') {
- await client.cleanup()
- }
- newSdkClients = newSdkClients.filter(c => c.name !== name)
- const prefix = `mcp__${name}__`
- newSdkTools = newSdkTools.filter(t => !t.name.startsWith(prefix))
- delete newSdkConfigs[name]
- sdkRemoved.push(name)
- }
- }
- // Add new SDK servers as pending - they'll be upgraded to connected
- // when updateSdkMcp() runs on the next query
- for (const [name, config] of Object.entries(sdkServers)) {
- if (!currentSdkNames.has(name)) {
- newSdkConfigs[name] = config
- const pendingClient: MCPServerConnection = {
- type: 'pending',
- name,
- config: { ...config, scope: 'dynamic' as const },
- }
- newSdkClients = [...newSdkClients, pendingClient]
- sdkAdded.push(name)
- }
- }
- // Handle process-based servers
- const processResult = await reconcileMcpServers(
- processServers,
- dynamicState,
- setAppState,
- )
- return {
- response: {
- added: [...sdkAdded, ...processResult.response.added],
- removed: [...sdkRemoved, ...processResult.response.removed],
- errors: { ...policyErrors, ...processResult.response.errors },
- },
- newSdkState: {
- configs: newSdkConfigs,
- clients: newSdkClients,
- tools: newSdkTools,
- },
- newDynamicState: processResult.newState,
- sdkServersChanged: sdkAdded.length > 0 || sdkRemoved.length > 0,
- }
- }
- /**
- * Reconciles the current set of dynamic MCP servers with a new desired state.
- * Handles additions, removals, and config changes.
- */
- export async function reconcileMcpServers(
- desiredConfigs: Record<string, McpServerConfigForProcessTransport>,
- currentState: DynamicMcpState,
- setAppState: (f: (prev: AppState) => AppState) => void,
- ): Promise<{
- response: SDKControlMcpSetServersResponse
- newState: DynamicMcpState
- }> {
- const currentNames = new Set(Object.keys(currentState.configs))
- const desiredNames = new Set(Object.keys(desiredConfigs))
- const toRemove = [...currentNames].filter(n => !desiredNames.has(n))
- const toAdd = [...desiredNames].filter(n => !currentNames.has(n))
- // Check for config changes (same name, different config)
- const toCheck = [...currentNames].filter(n => desiredNames.has(n))
- const toReplace = toCheck.filter(name => {
- const currentConfig = currentState.configs[name]
- const desiredConfigRaw = desiredConfigs[name]
- if (!currentConfig || !desiredConfigRaw) return true
- const desiredConfig = toScopedConfig(desiredConfigRaw)
- return !areMcpConfigsEqual(currentConfig, desiredConfig)
- })
- const removed: string[] = []
- const added: string[] = []
- const errors: Record<string, string> = {}
- let newClients = [...currentState.clients]
- let newTools = [...currentState.tools]
- // Remove old servers (including ones being replaced)
- for (const name of [...toRemove, ...toReplace]) {
- const client = newClients.find(c => c.name === name)
- const config = currentState.configs[name]
- if (client && config) {
- if (client.type === 'connected') {
- try {
- await client.cleanup()
- } catch (e) {
- logError(e)
- }
- }
- // Clear the memoization cache
- await clearServerCache(name, config)
- }
- // Remove tools from this server
- const prefix = `mcp__${name}__`
- newTools = newTools.filter(t => !t.name.startsWith(prefix))
- // Remove from clients list
- newClients = newClients.filter(c => c.name !== name)
- // Track removal (only for actually removed, not replaced)
- if (toRemove.includes(name)) {
- removed.push(name)
- }
- }
- // Add new servers (including replacements)
- for (const name of [...toAdd, ...toReplace]) {
- const config = desiredConfigs[name]
- if (!config) continue
- const scopedConfig = toScopedConfig(config)
- // SDK servers are managed by the SDK process, not the CLI.
- // Just track them without trying to connect.
- if ((config.type as string) === 'sdk') {
- added.push(name)
- continue
- }
- try {
- const client = await connectToServer(name, scopedConfig)
- newClients.push(client)
- if (client.type === 'connected') {
- const serverTools = await fetchToolsForClient(client)
- newTools.push(...serverTools)
- } else if (client.type === 'failed') {
- errors[name] = client.error || 'Connection failed'
- }
- added.push(name)
- } catch (e) {
- const err = toError(e)
- errors[name] = err.message
- logError(err)
- }
- }
- // Build new configs
- const newConfigs: Record<string, ScopedMcpServerConfig> = {}
- for (const name of desiredNames) {
- const config = desiredConfigs[name]
- if (config) {
- newConfigs[name] = toScopedConfig(config)
- }
- }
- const newState: DynamicMcpState = {
- clients: newClients,
- tools: newTools,
- configs: newConfigs,
- }
- // Update AppState with the new tools
- setAppState(prev => {
- // Get all dynamic server names (current + new)
- const allDynamicServerNames = new Set([
- ...Object.keys(currentState.configs),
- ...Object.keys(newConfigs),
- ])
- // Remove old dynamic tools
- const nonDynamicTools = prev.mcp.tools.filter(t => {
- for (const serverName of allDynamicServerNames) {
- if (t.name.startsWith(`mcp__${serverName}__`)) {
- return false
- }
- }
- return true
- })
- // Remove old dynamic clients
- const nonDynamicClients = prev.mcp.clients.filter(c => {
- return !allDynamicServerNames.has(c.name)
- })
- return {
- ...prev,
- mcp: {
- ...prev.mcp,
- tools: [...nonDynamicTools, ...newTools],
- clients: [...nonDynamicClients, ...newClients],
- },
- }
- })
- return {
- response: { added, removed, errors },
- newState,
- }
- }
|