flushGate.ts 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. /**
  2. * State machine for gating message writes during an initial flush.
  3. *
  4. * When a bridge session starts, historical messages are flushed to the
  5. * server via a single HTTP POST. During that flush, new messages must
  6. * be queued to prevent them from arriving at the server interleaved
  7. * with the historical messages.
  8. *
  9. * Lifecycle:
  10. * start() → enqueue() returns true, items are queued
  11. * end() → returns queued items for draining, enqueue() returns false
  12. * drop() → discards queued items (permanent transport close)
  13. * deactivate() → clears active flag without dropping items
  14. * (transport replacement — new transport will drain)
  15. */
  16. export class FlushGate<T> {
  17. private _active = false
  18. private _pending: T[] = []
  19. get active(): boolean {
  20. return this._active
  21. }
  22. get pendingCount(): number {
  23. return this._pending.length
  24. }
  25. /** Mark flush as in-progress. enqueue() will start queuing items. */
  26. start(): void {
  27. this._active = true
  28. }
  29. /**
  30. * End the flush and return any queued items for draining.
  31. * Caller is responsible for sending the returned items.
  32. */
  33. end(): T[] {
  34. this._active = false
  35. return this._pending.splice(0)
  36. }
  37. /**
  38. * If flush is active, queue the items and return true.
  39. * If flush is not active, return false (caller should send directly).
  40. */
  41. enqueue(...items: T[]): boolean {
  42. if (!this._active) return false
  43. this._pending.push(...items)
  44. return true
  45. }
  46. /**
  47. * Discard all queued items (permanent transport close).
  48. * Returns the number of items dropped.
  49. */
  50. drop(): number {
  51. this._active = false
  52. const count = this._pending.length
  53. this._pending.length = 0
  54. return count
  55. }
  56. /**
  57. * Clear the active flag without dropping queued items.
  58. * Used when the transport is replaced (onWorkReceived) — the new
  59. * transport's flush will drain the pending items.
  60. */
  61. deactivate(): void {
  62. this._active = false
  63. }
  64. }