// SerialBatchEventUploader.ts
  1. import { jsonStringify } from '../../utils/slowOperations.js'
  2. /**
  3. * Serial ordered event uploader with batching, retry, and backpressure.
  4. *
  5. * - enqueue() adds events to a pending buffer
  6. * - At most 1 POST in-flight at a time
  7. * - Drains up to maxBatchSize items per POST
  8. * - New events accumulate while in-flight
  9. * - On failure: exponential backoff (clamped), retries indefinitely
  10. * until success or close() — unless maxConsecutiveFailures is set,
  11. * in which case the failing batch is dropped and drain advances
  12. * - flush() blocks until pending is empty and kicks drain if needed
  13. * - Backpressure: enqueue() blocks when maxQueueSize is reached
  14. */
  15. /**
  16. * Throw from config.send() to make the uploader wait a server-supplied
  17. * duration before retrying (e.g. 429 with Retry-After). When retryAfterMs
  18. * is set, it overrides exponential backoff for that attempt — clamped to
  19. * [baseDelayMs, maxDelayMs] and jittered so a misbehaving server can
  20. * neither hot-loop nor stall the client, and many sessions sharing a rate
  21. * limit don't all pounce at the same instant. Without retryAfterMs, behaves
  22. * like any other thrown error (exponential backoff).
  23. */
  24. export class RetryableError extends Error {
  25. constructor(
  26. message: string,
  27. readonly retryAfterMs?: number,
  28. ) {
  29. super(message)
  30. }
  31. }
type SerialBatchEventUploaderConfig<T> = {
  /** Max items per POST (1 = no batching). */
  maxBatchSize: number
  /**
   * Max serialized bytes per POST. The first item always goes in regardless
   * of size; subsequent items only if cumulative JSON bytes stay under this.
   * Undefined = no byte limit (count-only batching).
   */
  maxBatchBytes?: number
  /** Max pending items before enqueue() blocks (backpressure). */
  maxQueueSize: number
  /** The actual HTTP call — caller controls payload format. */
  send: (batch: T[]) => Promise<void>
  /** Base delay for exponential backoff (ms); also the floor for server Retry-After hints. */
  baseDelayMs: number
  /** Max delay cap (ms), applied to both backoff and server Retry-After hints. */
  maxDelayMs: number
  /** Random jitter range added to every retry delay (ms). */
  jitterMs: number
  /**
   * After this many consecutive send() failures, drop the failing batch
   * and move on to the next pending item with a fresh failure budget.
   * Undefined = retry indefinitely (default).
   */
  maxConsecutiveFailures?: number
  /** Called when a batch is dropped for hitting maxConsecutiveFailures. */
  onBatchDropped?: (batchSize: number, failures: number) => void
}
  60. export class SerialBatchEventUploader<T> {
  61. private pending: T[] = []
  62. private pendingAtClose = 0
  63. private draining = false
  64. private closed = false
  65. private backpressureResolvers: Array<() => void> = []
  66. private sleepResolve: (() => void) | null = null
  67. private flushResolvers: Array<() => void> = []
  68. private droppedBatches = 0
  69. private readonly config: SerialBatchEventUploaderConfig<T>
  70. constructor(config: SerialBatchEventUploaderConfig<T>) {
  71. this.config = config
  72. }
  73. /**
  74. * Monotonic count of batches dropped via maxConsecutiveFailures. Callers
  75. * can snapshot before flush() and compare after to detect silent drops
  76. * (flush() resolves normally even when batches were dropped).
  77. */
  78. get droppedBatchCount(): number {
  79. return this.droppedBatches
  80. }
  81. /**
  82. * Pending queue depth. After close(), returns the count at close time —
  83. * close() clears the queue but shutdown diagnostics may read this after.
  84. */
  85. get pendingCount(): number {
  86. return this.closed ? this.pendingAtClose : this.pending.length
  87. }
  88. /**
  89. * Add events to the pending buffer. Returns immediately if space is
  90. * available. Blocks (awaits) if the buffer is full — caller pauses
  91. * until drain frees space.
  92. */
  93. async enqueue(events: T | T[]): Promise<void> {
  94. if (this.closed) return
  95. const items = Array.isArray(events) ? events : [events]
  96. if (items.length === 0) return
  97. // Backpressure: wait until there's space
  98. while (
  99. this.pending.length + items.length > this.config.maxQueueSize &&
  100. !this.closed
  101. ) {
  102. await new Promise<void>(resolve => {
  103. this.backpressureResolvers.push(resolve)
  104. })
  105. }
  106. if (this.closed) return
  107. this.pending.push(...items)
  108. void this.drain()
  109. }
  110. /**
  111. * Block until all pending events have been sent.
  112. * Used at turn boundaries and graceful shutdown.
  113. */
  114. flush(): Promise<void> {
  115. if (this.pending.length === 0 && !this.draining) {
  116. return Promise.resolve()
  117. }
  118. void this.drain()
  119. return new Promise<void>(resolve => {
  120. this.flushResolvers.push(resolve)
  121. })
  122. }
  123. /**
  124. * Drop pending events and stop processing.
  125. * Resolves any blocked enqueue() and flush() callers.
  126. */
  127. close(): void {
  128. if (this.closed) return
  129. this.closed = true
  130. this.pendingAtClose = this.pending.length
  131. this.pending = []
  132. this.sleepResolve?.()
  133. this.sleepResolve = null
  134. for (const resolve of this.backpressureResolvers) resolve()
  135. this.backpressureResolvers = []
  136. for (const resolve of this.flushResolvers) resolve()
  137. this.flushResolvers = []
  138. }
  139. /**
  140. * Drain loop. At most one instance runs at a time (guarded by this.draining).
  141. * Sends batches serially. On failure, backs off and retries indefinitely.
  142. */
  143. private async drain(): Promise<void> {
  144. if (this.draining || this.closed) return
  145. this.draining = true
  146. let failures = 0
  147. try {
  148. while (this.pending.length > 0 && !this.closed) {
  149. const batch = this.takeBatch()
  150. if (batch.length === 0) continue
  151. try {
  152. await this.config.send(batch)
  153. failures = 0
  154. } catch (err) {
  155. failures++
  156. if (
  157. this.config.maxConsecutiveFailures !== undefined &&
  158. failures >= this.config.maxConsecutiveFailures
  159. ) {
  160. this.droppedBatches++
  161. this.config.onBatchDropped?.(batch.length, failures)
  162. failures = 0
  163. this.releaseBackpressure()
  164. continue
  165. }
  166. // Re-queue the failed batch at the front. Use concat (single
  167. // allocation) instead of unshift(...batch) which shifts every
  168. // pending item batch.length times. Only hit on failure path.
  169. this.pending = batch.concat(this.pending)
  170. const retryAfterMs =
  171. err instanceof RetryableError ? err.retryAfterMs : undefined
  172. await this.sleep(this.retryDelay(failures, retryAfterMs))
  173. continue
  174. }
  175. // Release backpressure waiters if space opened up
  176. this.releaseBackpressure()
  177. }
  178. } finally {
  179. this.draining = false
  180. // Notify flush waiters if queue is empty
  181. if (this.pending.length === 0) {
  182. for (const resolve of this.flushResolvers) resolve()
  183. this.flushResolvers = []
  184. }
  185. }
  186. }
  187. /**
  188. * Pull the next batch from pending. Respects both maxBatchSize and
  189. * maxBatchBytes. The first item is always taken; subsequent items only
  190. * if adding them keeps the cumulative JSON size under maxBatchBytes.
  191. *
  192. * Un-serializable items (BigInt, circular refs, throwing toJSON) are
  193. * dropped in place — they can never be sent and leaving them at
  194. * pending[0] would poison the queue and hang flush() forever.
  195. */
  196. private takeBatch(): T[] {
  197. const { maxBatchSize, maxBatchBytes } = this.config
  198. if (maxBatchBytes === undefined) {
  199. return this.pending.splice(0, maxBatchSize)
  200. }
  201. let bytes = 0
  202. let count = 0
  203. while (count < this.pending.length && count < maxBatchSize) {
  204. let itemBytes: number
  205. try {
  206. itemBytes = Buffer.byteLength(jsonStringify(this.pending[count]))
  207. } catch {
  208. this.pending.splice(count, 1)
  209. continue
  210. }
  211. if (count > 0 && bytes + itemBytes > maxBatchBytes) break
  212. bytes += itemBytes
  213. count++
  214. }
  215. return this.pending.splice(0, count)
  216. }
  217. private retryDelay(failures: number, retryAfterMs?: number): number {
  218. const jitter = Math.random() * this.config.jitterMs
  219. if (retryAfterMs !== undefined) {
  220. // Jitter on top of the server's hint prevents thundering herd when
  221. // many sessions share a rate limit and all receive the same
  222. // Retry-After. Clamp first, then spread — same shape as the
  223. // exponential path (effective ceiling is maxDelayMs + jitterMs).
  224. const clamped = Math.max(
  225. this.config.baseDelayMs,
  226. Math.min(retryAfterMs, this.config.maxDelayMs),
  227. )
  228. return clamped + jitter
  229. }
  230. const exponential = Math.min(
  231. this.config.baseDelayMs * 2 ** (failures - 1),
  232. this.config.maxDelayMs,
  233. )
  234. return exponential + jitter
  235. }
  236. private releaseBackpressure(): void {
  237. const resolvers = this.backpressureResolvers
  238. this.backpressureResolvers = []
  239. for (const resolve of resolvers) resolve()
  240. }
  241. private sleep(ms: number): Promise<void> {
  242. return new Promise(resolve => {
  243. this.sleepResolve = resolve
  244. setTimeout(
  245. (self, resolve) => {
  246. self.sleepResolve = null
  247. resolve()
  248. },
  249. ms,
  250. this,
  251. resolve,
  252. )
  253. })
  254. }
  255. }