cronTasksLock.ts 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Scheduler lease lock for .claude/scheduled_tasks.json.
  2. //
  3. // When multiple Claude sessions run in the same project directory, only one
  4. // should drive the cron scheduler. The first session to acquire this lock
  5. // becomes the scheduler; others stay passive and periodically probe the lock.
  6. // If the owner dies (PID no longer running), a passive session takes over.
  7. //
  8. // Pattern mirrors computerUseLock.ts: O_EXCL atomic create, PID liveness
  9. // probe, stale-lock recovery, cleanup-on-exit.
  10. import { mkdir, readFile, unlink, writeFile } from 'fs/promises'
  11. import { dirname, join } from 'path'
  12. import { z } from 'zod/v4'
  13. import { getProjectRoot, getSessionId } from '../bootstrap/state.js'
  14. import { registerCleanup } from './cleanupRegistry.js'
  15. import { logForDebugging } from './debug.js'
  16. import { getErrnoCode } from './errors.js'
  17. import { isProcessRunning } from './genericProcessUtils.js'
  18. import { safeParseJSON } from './json.js'
  19. import { lazySchema } from './lazySchema.js'
  20. import { jsonStringify } from './slowOperations.js'
  /** Lock file location, relative to the directory the lock is scoped to. */
  const LOCK_FILE_REL = join('.claude', 'scheduled_tasks.lock')

  /**
   * Schema for the JSON document stored in the lock file. Anything that does
   * not match it is rejected by readLock and ends up handled as a stale lock.
   */
  const schedulerLockSchema = lazySchema(() =>
    z.object({
      // Owner key: getSessionId() for the REPL, or a caller-supplied identity.
      sessionId: z.string(),
      // process.pid of the writer; used as the liveness signal.
      pid: z.number(),
      // Date.now() captured when the lock record was built.
      acquiredAt: z.number(),
    }),
  )

  /** Parsed lock-file contents, inferred from schedulerLockSchema. */
  type SchedulerLock = z.infer<ReturnType<typeof schedulerLockSchema>>
  /**
   * Overrides for callers that run outside the REPL (e.g. the Agent SDK
   * daemon) and therefore have no bootstrap state to read from.
   */
  export type SchedulerLockOptions = {
    /**
     * Directory the lock file path is resolved against. Defaults to
     * getProjectRoot() when omitted.
     */
    dir?: string
    /**
     * Owner key written to the lock file's `sessionId` field. Defaults to
     * getSessionId() when omitted. Should stay stable for the lifetime of one
     * daemon process (e.g. a randomUUID() captured at startup).
     */
    lockIdentity?: string
  }
  /**
   * Disposer returned by registerCleanup for the currently registered
   * release-on-exit handler; undefined while no handler is registered.
   */
  let unregisterCleanup: (() => void) | undefined = undefined

  /**
   * Owner key of the session that most recently blocked an acquire attempt.
   * Used to avoid repeating the "held by X" log line while polling a live
   * owner.
   */
  let lastBlockedBy: string | undefined = undefined
  /**
   * Build the lock file path.
   *
   * @param dir - Base directory; getProjectRoot() is used when omitted.
   * @returns The base directory joined with LOCK_FILE_REL.
   */
  function getLockPath(dir?: string): string {
    const baseDir = dir ?? getProjectRoot()
    return join(baseDir, LOCK_FILE_REL)
  }
  /**
   * Read and validate the lock file.
   *
   * @param dir - Base directory forwarded to getLockPath.
   * @returns The parsed lock, or undefined when the file cannot be read (for
   *   any reason) or its contents do not satisfy schedulerLockSchema.
   */
  async function readLock(dir?: string): Promise<SchedulerLock | undefined> {
    // Path resolution happens inside the try so that any failure on the way
    // to the file contents is reported as "no lock".
    const loadText = async (): Promise<string | undefined> => {
      try {
        return await readFile(getLockPath(dir), 'utf8')
      } catch {
        return undefined
      }
    }

    const text = await loadText()
    if (text === undefined) {
      return undefined
    }

    const parsed = schedulerLockSchema().safeParse(safeParseJSON(text, false))
    if (!parsed.success) {
      return undefined
    }
    return parsed.data
  }
  /**
   * Create the lock file with the exclusive-create flag ('wx').
   *
   * @param lock - Record to serialize into the file.
   * @param dir - Base directory forwarded to getLockPath.
   * @returns true when this call created the file, false when it already
   *   existed.
   * @throws Any filesystem error other than EEXIST (and other than a first
   *   ENOENT, which triggers one directory-create-and-retry).
   */
  async function tryCreateExclusive(
    lock: SchedulerLock,
    dir?: string,
  ): Promise<boolean> {
    const lockPath = getLockPath(dir)
    const payload = jsonStringify(lock)

    // One exclusive write attempt: EEXIST means somebody else holds the file.
    const writeExclusive = async (): Promise<boolean> => {
      try {
        await writeFile(lockPath, payload, { flag: 'wx' })
        return true
      } catch (err: unknown) {
        if (getErrnoCode(err) === 'EEXIST') return false
        throw err
      }
    }

    try {
      return await writeExclusive()
    } catch (err: unknown) {
      if (getErrnoCode(err) !== 'ENOENT') throw err
    }

    // .claude/ doesn't exist yet — create it and retry once. In steady state
    // the directory is already there (scheduled_tasks.json lives in it), so
    // this path is hit at most once.
    await mkdir(dirname(lockPath), { recursive: true })
    return writeExclusive()
  }
  /**
   * Register a cleanup handler that releases the scheduler lock, replacing
   * any handler registered by an earlier call.
   *
   * @param opts - Forwarded unchanged to releaseSchedulerLock when the handler
   *   runs.
   */
  function registerLockCleanup(opts?: SchedulerLockOptions): void {
    // Drop the previous handler first so at most one is registered at a time.
    if (unregisterCleanup !== undefined) {
      unregisterCleanup()
    }
    const releaseOnCleanup = async (): Promise<void> => {
      await releaseSchedulerLock(opts)
    }
    unregisterCleanup = registerCleanup(releaseOnCleanup)
  }
  /**
   * Try to acquire the scheduler lock for the current session.
   *
   * Uses O_EXCL ('wx') for an atomic test-and-set. If the file already exists:
   * - Recorded under our owner key → true (idempotent re-acquire). When the
   *   recorded PID differs from ours (e.g. after --resume) the file is
   *   rewritten with this process's PID.
   * - Recorded under another owner whose PID is running → false.
   * - Stale (owner PID not running) or unreadable/corrupt → unlink and retry
   *   the exclusive create once.
   *
   * When several sessions recover a stale lock at once, the exclusive create
   * arbitrates between concurrent creators. The preceding unlink is not
   * conditioned on the file's contents, however, so recovery is best-effort
   * rather than a strict mutual-exclusion guarantee.
   *
   * @param opts - Optional base directory / owner key overrides; defaults are
   *   getProjectRoot() and getSessionId().
   * @returns true when the lock was created by this call or is already
   *   recorded under this owner key; false when another running owner holds
   *   it or a concurrent recovery created the file first.
   * @throws Filesystem errors other than EEXIST raised while creating the
   *   lock file, and any error from rewriting it on takeover.
   */
  export async function tryAcquireSchedulerLock(
    opts?: SchedulerLockOptions,
  ): Promise<boolean> {
    const dir = opts?.dir
    // "sessionId" in the lock file is really just a stable owner key. REPL
    // uses getSessionId(); daemon callers supply their own UUID. PID remains
    // the liveness signal regardless.
    const sessionId = opts?.lockIdentity ?? getSessionId()
    const lock: SchedulerLock = {
      sessionId,
      pid: process.pid,
      acquiredAt: Date.now(),
    }

    if (await tryCreateExclusive(lock, dir)) {
      lastBlockedBy = undefined
      registerLockCleanup(opts)
      logForDebugging(
        `[ScheduledTasks] acquired scheduler lock (PID ${process.pid})`,
      )
      return true
    }

    // undefined here means the file vanished, could not be read, or failed
    // validation; all of those fall through to stale recovery below.
    const existing = await readLock(dir)

    // Already ours (idempotent). After --resume the session ID is restored
    // but the process has a new PID — update the lock file so other sessions
    // see a live PID and don't steal it.
    if (existing?.sessionId === sessionId) {
      const pidChanged = existing.pid !== process.pid
      if (pidChanged) {
        await writeFile(getLockPath(dir), jsonStringify(lock))
      }
      // Make sure a release-on-exit handler exists whenever we report
      // ownership. Besides the takeover case, the handler can be missing when
      // the file already names this PID: a previous release unregistered the
      // handler but failed to unlink, or the file was left behind by an
      // earlier run that recorded the same owner key and PID.
      if (pidChanged || unregisterCleanup === undefined) {
        registerLockCleanup(opts)
      }
      lastBlockedBy = undefined
      return true
    }

    // Another live session — blocked.
    if (existing && isProcessRunning(existing.pid)) {
      if (lastBlockedBy !== existing.sessionId) {
        lastBlockedBy = existing.sessionId
        logForDebugging(
          `[ScheduledTasks] scheduler lock held by session ${existing.sessionId} (PID ${existing.pid})`,
        )
      }
      return false
    }

    // Stale (owner PID not running) or corrupt/unparseable (existing is
    // undefined) — unlink and retry the exclusive create once.
    if (existing) {
      logForDebugging(
        `[ScheduledTasks] recovering stale scheduler lock from PID ${existing.pid}`,
      )
    }
    // Unlink failures are ignored on purpose: if the file is still there the
    // exclusive create below reports EEXIST and we return false.
    await unlink(getLockPath(dir)).catch(() => {})
    if (await tryCreateExclusive(lock, dir)) {
      lastBlockedBy = undefined
      registerLockCleanup(opts)
      return true
    }

    // Another session won the recovery race.
    return false
  }
  /**
   * Release the scheduler lock if the current session owns it.
   *
   * Always unregisters the release-on-exit handler and clears the
   * blocked-by log suppression state. The lock file is removed only when it
   * is recorded under this owner key and is not currently attributed to a
   * different running process.
   *
   * @param opts - Optional base directory / owner key overrides; defaults are
   *   getProjectRoot() and getSessionId().
   * @returns Resolves once the release attempt has finished; unlink failures
   *   are logged for debugging rather than thrown.
   */
  export async function releaseSchedulerLock(
    opts?: SchedulerLockOptions,
  ): Promise<void> {
    unregisterCleanup?.()
    unregisterCleanup = undefined
    lastBlockedBy = undefined

    const dir = opts?.dir
    const sessionId = opts?.lockIdentity ?? getSessionId()
    const existing = await readLock(dir)
    if (!existing || existing.sessionId !== sessionId) return

    // tryAcquireSchedulerLock lets a second process with the same owner key
    // take the lock over by rewriting the PID. If that process is still
    // running the lock is its to release; deleting it here would let a third
    // session acquire while the new owner is still scheduling. A record
    // naming a dead PID under our key is still removed.
    if (existing.pid !== process.pid && isProcessRunning(existing.pid)) return

    try {
      await unlink(getLockPath(dir))
      logForDebugging('[ScheduledTasks] released scheduler lock')
    } catch (e: unknown) {
      // ENOENT means the file is already gone, which is the desired end
      // state. Anything else leaves the lock on disk, so surface it.
      const code = getErrnoCode(e)
      if (code !== 'ENOENT') {
        logForDebugging(
          `[ScheduledTasks] failed to release scheduler lock (${code ?? 'unknown error'})`,
        )
      }
    }
  }