| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
- import { jsonStringify } from '../../utils/slowOperations.js'
- /**
- * Serial ordered event uploader with batching, retry, and backpressure.
- *
- * - enqueue() adds events to a pending buffer
- * - At most 1 POST in-flight at a time
- * - Drains up to maxBatchSize items per POST
- * - New events accumulate while in-flight
- * - On failure: exponential backoff (clamped), retries indefinitely
- * until success or close() — unless maxConsecutiveFailures is set,
- * in which case the failing batch is dropped and drain advances
- * - flush() blocks until pending is empty and kicks drain if needed
- * - Backpressure: enqueue() blocks when maxQueueSize is reached
- */
- /**
- * Throw from config.send() to make the uploader wait a server-supplied
- * duration before retrying (e.g. 429 with Retry-After). When retryAfterMs
- * is set, it overrides exponential backoff for that attempt — clamped to
- * [baseDelayMs, maxDelayMs] and jittered so a misbehaving server can
- * neither hot-loop nor stall the client, and many sessions sharing a rate
- * limit don't all pounce at the same instant. Without retryAfterMs, behaves
- * like any other thrown error (exponential backoff).
- */
- export class RetryableError extends Error {
- constructor(
- message: string,
- readonly retryAfterMs?: number,
- ) {
- super(message)
- }
- }
- type SerialBatchEventUploaderConfig<T> = {
- /** Max items per POST (1 = no batching) */
- maxBatchSize: number
- /**
- * Max serialized bytes per POST. First item always goes in regardless of
- * size; subsequent items only if cumulative JSON bytes stay under this.
- * Undefined = no byte limit (count-only batching).
- */
- maxBatchBytes?: number
- /** Max pending items before enqueue() blocks */
- maxQueueSize: number
- /** The actual HTTP call — caller controls payload format */
- send: (batch: T[]) => Promise<void>
- /** Base delay for exponential backoff (ms) */
- baseDelayMs: number
- /** Max delay cap (ms) */
- maxDelayMs: number
- /** Random jitter range added to retry delay (ms) */
- jitterMs: number
- /**
- * After this many consecutive send() failures, drop the failing batch
- * and move on to the next pending item with a fresh failure budget.
- * Undefined = retry indefinitely (default).
- */
- maxConsecutiveFailures?: number
- /** Called when a batch is dropped for hitting maxConsecutiveFailures. */
- onBatchDropped?: (batchSize: number, failures: number) => void
- }
- export class SerialBatchEventUploader<T> {
- private pending: T[] = []
- private pendingAtClose = 0
- private draining = false
- private closed = false
- private backpressureResolvers: Array<() => void> = []
- private sleepResolve: (() => void) | null = null
- private flushResolvers: Array<() => void> = []
- private droppedBatches = 0
- private readonly config: SerialBatchEventUploaderConfig<T>
- constructor(config: SerialBatchEventUploaderConfig<T>) {
- this.config = config
- }
- /**
- * Monotonic count of batches dropped via maxConsecutiveFailures. Callers
- * can snapshot before flush() and compare after to detect silent drops
- * (flush() resolves normally even when batches were dropped).
- */
- get droppedBatchCount(): number {
- return this.droppedBatches
- }
- /**
- * Pending queue depth. After close(), returns the count at close time —
- * close() clears the queue but shutdown diagnostics may read this after.
- */
- get pendingCount(): number {
- return this.closed ? this.pendingAtClose : this.pending.length
- }
- /**
- * Add events to the pending buffer. Returns immediately if space is
- * available. Blocks (awaits) if the buffer is full — caller pauses
- * until drain frees space.
- */
- async enqueue(events: T | T[]): Promise<void> {
- if (this.closed) return
- const items = Array.isArray(events) ? events : [events]
- if (items.length === 0) return
- // Backpressure: wait until there's space
- while (
- this.pending.length + items.length > this.config.maxQueueSize &&
- !this.closed
- ) {
- await new Promise<void>(resolve => {
- this.backpressureResolvers.push(resolve)
- })
- }
- if (this.closed) return
- this.pending.push(...items)
- void this.drain()
- }
- /**
- * Block until all pending events have been sent.
- * Used at turn boundaries and graceful shutdown.
- */
- flush(): Promise<void> {
- if (this.pending.length === 0 && !this.draining) {
- return Promise.resolve()
- }
- void this.drain()
- return new Promise<void>(resolve => {
- this.flushResolvers.push(resolve)
- })
- }
- /**
- * Drop pending events and stop processing.
- * Resolves any blocked enqueue() and flush() callers.
- */
- close(): void {
- if (this.closed) return
- this.closed = true
- this.pendingAtClose = this.pending.length
- this.pending = []
- this.sleepResolve?.()
- this.sleepResolve = null
- for (const resolve of this.backpressureResolvers) resolve()
- this.backpressureResolvers = []
- for (const resolve of this.flushResolvers) resolve()
- this.flushResolvers = []
- }
- /**
- * Drain loop. At most one instance runs at a time (guarded by this.draining).
- * Sends batches serially. On failure, backs off and retries indefinitely.
- */
- private async drain(): Promise<void> {
- if (this.draining || this.closed) return
- this.draining = true
- let failures = 0
- try {
- while (this.pending.length > 0 && !this.closed) {
- const batch = this.takeBatch()
- if (batch.length === 0) continue
- try {
- await this.config.send(batch)
- failures = 0
- } catch (err) {
- failures++
- if (
- this.config.maxConsecutiveFailures !== undefined &&
- failures >= this.config.maxConsecutiveFailures
- ) {
- this.droppedBatches++
- this.config.onBatchDropped?.(batch.length, failures)
- failures = 0
- this.releaseBackpressure()
- continue
- }
- // Re-queue the failed batch at the front. Use concat (single
- // allocation) instead of unshift(...batch) which shifts every
- // pending item batch.length times. Only hit on failure path.
- this.pending = batch.concat(this.pending)
- const retryAfterMs =
- err instanceof RetryableError ? err.retryAfterMs : undefined
- await this.sleep(this.retryDelay(failures, retryAfterMs))
- continue
- }
- // Release backpressure waiters if space opened up
- this.releaseBackpressure()
- }
- } finally {
- this.draining = false
- // Notify flush waiters if queue is empty
- if (this.pending.length === 0) {
- for (const resolve of this.flushResolvers) resolve()
- this.flushResolvers = []
- }
- }
- }
- /**
- * Pull the next batch from pending. Respects both maxBatchSize and
- * maxBatchBytes. The first item is always taken; subsequent items only
- * if adding them keeps the cumulative JSON size under maxBatchBytes.
- *
- * Un-serializable items (BigInt, circular refs, throwing toJSON) are
- * dropped in place — they can never be sent and leaving them at
- * pending[0] would poison the queue and hang flush() forever.
- */
- private takeBatch(): T[] {
- const { maxBatchSize, maxBatchBytes } = this.config
- if (maxBatchBytes === undefined) {
- return this.pending.splice(0, maxBatchSize)
- }
- let bytes = 0
- let count = 0
- while (count < this.pending.length && count < maxBatchSize) {
- let itemBytes: number
- try {
- itemBytes = Buffer.byteLength(jsonStringify(this.pending[count]))
- } catch {
- this.pending.splice(count, 1)
- continue
- }
- if (count > 0 && bytes + itemBytes > maxBatchBytes) break
- bytes += itemBytes
- count++
- }
- return this.pending.splice(0, count)
- }
- private retryDelay(failures: number, retryAfterMs?: number): number {
- const jitter = Math.random() * this.config.jitterMs
- if (retryAfterMs !== undefined) {
- // Jitter on top of the server's hint prevents thundering herd when
- // many sessions share a rate limit and all receive the same
- // Retry-After. Clamp first, then spread — same shape as the
- // exponential path (effective ceiling is maxDelayMs + jitterMs).
- const clamped = Math.max(
- this.config.baseDelayMs,
- Math.min(retryAfterMs, this.config.maxDelayMs),
- )
- return clamped + jitter
- }
- const exponential = Math.min(
- this.config.baseDelayMs * 2 ** (failures - 1),
- this.config.maxDelayMs,
- )
- return exponential + jitter
- }
- private releaseBackpressure(): void {
- const resolvers = this.backpressureResolvers
- this.backpressureResolvers = []
- for (const resolve of resolvers) resolve()
- }
- private sleep(ms: number): Promise<void> {
- return new Promise(resolve => {
- this.sleepResolve = resolve
- setTimeout(
- (self, resolve) => {
- self.sleepResolve = null
- resolve()
- },
- ms,
- this,
- resolve,
- )
- })
- }
- }
|