Activity Stream
The activity stream is Polyant’s in-process pub/sub bus for operational events. Every interesting moment in the engine — an inbound message, a tool call, a memory write, a scheduled task firing, a webhook match — is emitted as a structured event onto a single shared bus. Subscribers (today: the SSE controller that feeds the Activity admin panel) attach to that bus and receive events synchronously.
It is deliberately small and unfancy: one process, one EventEmitter, a bounded ring buffer for late subscribers, no persistence. If you need durable history, look at Audit Logs or Conversations.
Design goals
- Zero coupling to consumers. Producers don’t know who is listening; they call an emitter function and move on.
- Synchronous emit. No queueing and no async hops between producer and bus. A producer never awaits a subscriber: publishing runs the handlers inline and returns.
- Late-subscriber friendly. Reconnecting clients see a small replay of recent events so the UI doesn’t appear empty.
- Bounded memory. The ring buffer caps at 100 events; a crashed subscriber can’t leak memory.
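The synchronous-emit goal rests on how Node.js's EventEmitter behaves: listeners run inline, in registration order, before emit() returns. A minimal sketch of that behavior follows; the event name "activity" and the string payload are assumptions made for illustration, not names taken from the Polyant source.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical event name and payload; only EventEmitter's behavior is factual.
const emitter = new EventEmitter();
const order: string[] = [];

emitter.on("activity", (kind: string) => order.push(`subscriber saw ${kind}`));

order.push("before emit");
// emit() returns true when at least one listener was registered.
const hadListeners = emitter.emit("activity", "inbound");
order.push("after emit");

// The listener ran inline, between the two pushes: there is no queue and
// no microtask hop between the producer and the subscriber.
console.log(order, hadListeners);
```

The flip side is that a handler runs on the producer's call stack, so subscribers are best kept cheap.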
Topology
                          PRODUCERS
┌────────────────────────────────────────────────────────────┐
│                                                            │
│  pipeline.ts ──► emitInbound                               │
│  channel-manager.ts ──► emitOutbound                       │
│  extractor.ts ──► emitMemory                               │
│  scheduler.service.ts ──► emitCron                         │
│  server/webhooks/webhook.controller.ts ──► emitWebhook     │
│  agent-invoke.helpers.ts ──► emitAgentHandoff(start/end)   │
│  pipeline.ts ──► emitConversation                          │
│                                                            │
│  ai-gateway (chat/chatStream) ──► bus-emitter (LLM tap)    │
│      live tool-call + reasoning events                     │
│                                                            │
└────────────────────────────────────────────────────────────┘
                               │
                               ▼
                 ┌──────────────────────────┐
                 │  activity-bus.ts         │
                 │  ──────────────────────  │
                 │  • EventEmitter          │
                 │  • Ring buffer (100)     │
                 │  • subscribe(handler)    │
                 └──────────────────────────┘
                               │
             ┌─────────────────┴─────────────────┐
             ▼                                   ▼
  ┌─────────────────────┐            ┌──────────────────────┐
  │ activity-stream     │            │ (future subscribers) │
  │ .controller.ts      │            │ metrics, audit,      │
  │ GET /api/activity-  │            │ external sinks       │
  │ stream/live (SSE)   │            └──────────────────────┘
  └─────────────────────┘
             │
             ▼
   Admin panel /activity
Two emit paths
1. Per-category emitters
Each producer calls a small typed helper from packages/engine/src/activity-stream/emitters/. There is one file per category and the name maps 1:1 to the event kind seen in the UI.
| Emitter | Called from | Kind |
|---|---|---|
| emit-inbound.ts | packages/engine/src/pipeline.ts | inbound |
| emit-outbound.ts | packages/engine/src/channels/channel-manager.ts | outbound |
| emit-memory.ts | packages/engine/src/memory/extractor.ts | memory |
| emit-cron.ts | packages/engine/src/scheduled-tasks/scheduler.service.ts | cron |
| emit-webhook.ts | packages/engine/src/server/webhooks/webhook.controller.ts | webhook |
| emit-agent-handoff.ts | packages/engine/src/agents/.../agent-invoke.helpers.ts | agent-handoff (start + end) |
| emit-conversation.ts | packages/engine/src/pipeline.ts | conversation |
All seven helpers funnel into the single bus via activityBus.publish(event).
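To make the helper pattern concrete, here is a minimal sketch of what one per-category emitter could look like. The ActivityEvent fields, the emitInbound parameters, and the stand-in bus are all assumptions for illustration; the page does not show the real signatures.

```typescript
// Hypothetical event shape; the kinds are the seven listed in the table above.
interface ActivityEvent {
  kind: "inbound" | "outbound" | "memory" | "cron" | "webhook" | "agent-handoff" | "conversation";
  at: number; // epoch milliseconds
  summary: string;
  data?: Record<string, unknown>;
}

// Stand-in for the shared bus so the sketch runs on its own.
const published: ActivityEvent[] = [];
const activityBus = {
  publish(event: ActivityEvent): void {
    published.push(event);
  },
};

// One helper per category: the producer passes domain values,
// the helper builds the event and hands it to the bus.
function emitInbound(channel: string, text: string): void {
  activityBus.publish({
    kind: "inbound",
    at: Date.now(),
    summary: `${channel}: ${text.slice(0, 80)}`,
    data: { channel, length: text.length },
  });
}

// Producer side (for example the pipeline): call and move on.
emitInbound("slack", "hello from a user");
```

Keeping event construction inside the helper means a producer never imports the bus or the event type directly, which is one way to get the zero-coupling goal.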
2. The LLM tap
The supervisor’s reasoning is the most useful surface to watch, but the supervisor doesn’t know about the activity bus — it just calls aiGateway.chat() or aiGateway.chatStream(). Rather than thread emit-calls through agent code, the gateway is wrapped by packages/engine/src/activity-stream/bus-emitter.ts:
- Streaming path. chatStream() returns an AsyncIterable whose fullStream is intercepted. Each tool-call delta and each reasoning chunk is published as a tool or thinking event in flight.
- Non-streaming path. chat() returns the final response.steps[] array; the emitter replays the steps as a batch of tool events after the call completes.
This means every supervisor turn — and every nested sub-agent turn invoked through the same gateway — appears in the Activity feed without any per-tool wiring.
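The two tap paths can be sketched as a pass-through async generator for the streaming case and a plain loop for the non-streaming case. The StreamPart and step shapes, and the names tapStream and replaySteps, are hypothetical; they only illustrate the interception technique and are not the gateway's real types.

```typescript
// Hypothetical shapes for stream parts and published events.
type StreamPart =
  | { type: "tool-call"; toolName: string }
  | { type: "reasoning"; text: string }
  | { type: "text"; text: string };

interface TapEvent {
  kind: "tool" | "thinking";
  detail: string;
}

// Streaming path: every part is passed through unchanged, and tool-call and
// reasoning parts are also published while the stream is still in flight.
async function* tapStream(
  source: AsyncIterable<StreamPart>,
  publish: (e: TapEvent) => void,
): AsyncGenerator<StreamPart> {
  for await (const part of source) {
    if (part.type === "tool-call") publish({ kind: "tool", detail: part.toolName });
    if (part.type === "reasoning") publish({ kind: "thinking", detail: part.text });
    yield part;
  }
}

// Non-streaming path: replay the finished steps as a batch of tool events.
function replaySteps(
  steps: { toolCalls: { toolName: string }[] }[],
  publish: (e: TapEvent) => void,
): void {
  for (const step of steps) {
    for (const call of step.toolCalls) publish({ kind: "tool", detail: call.toolName });
  }
}

// Demo with a fake stream standing in for the gateway's fullStream.
async function* fakeFullStream(): AsyncGenerator<StreamPart> {
  yield { type: "reasoning", text: "need the weather" };
  yield { type: "tool-call", toolName: "getWeather" };
  yield { type: "text", text: "It is sunny." };
}

const seen: TapEvent[] = [];
const passedThrough: StreamPart[] = [];

async function demo(): Promise<void> {
  for await (const part of tapStream(fakeFullStream(), (e) => seen.push(e))) {
    passedThrough.push(part);
  }
  replaySteps([{ toolCalls: [{ toolName: "search" }] }], (e) => seen.push(e));
}

const done = demo();
```

Because the generator yields every part it receives, the caller sees the same stream it would have seen without the tap.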
The bus itself
packages/engine/src/activity-stream/activity-bus.ts is ~80 lines. The contract is:
interface ActivityBus {
publish(event: ActivityEvent): void; // synchronous fan-out
subscribe(handler: (e: ActivityEvent) => void): () => void; // returns unsubscribe
replay(): ActivityEvent[]; // last ≤100 events, in order
}
The implementation is a singleton (export const activityBus = new ActivityBus()) built on a Node.js EventEmitter plus an in-memory array trimmed to BUFFER_SIZE = 100. Publishing is fire-and-forget: it never throws, never awaits, and never knows whether anyone is listening.
The buffer is global, not per-subscriber. A new subscriber receives the replay synchronously inside subscribe(), then a steady stream of live events from that point on.
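The contract above can be approximated in a few dozen lines. The following is a sketch under stated assumptions, not the contents of activity-bus.ts: the class name SketchActivityBus, the two-field event shape, the "activity" event name, and the choice to swallow handler errors are all illustrative.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical minimal event shape; the real ActivityEvent is likely richer.
interface ActivityEvent {
  kind: string;
  at: number;
}

type Handler = (e: ActivityEvent) => void;

const BUFFER_SIZE = 100;

class SketchActivityBus {
  private readonly emitter = new EventEmitter();
  private buffer: ActivityEvent[] = [];

  publish(event: ActivityEvent): void {
    this.buffer.push(event);
    // Trim from the front so only the newest BUFFER_SIZE events remain.
    if (this.buffer.length > BUFFER_SIZE) this.buffer.shift();
    this.emitter.emit("activity", event);
  }

  subscribe(handler: Handler): () => void {
    // Isolate handler failures so publish() never throws into the producer.
    const safe: Handler = (e) => {
      try {
        handler(e);
      } catch {
        // Swallowed on purpose in this sketch; a real bus might log it.
      }
    };
    // Replay the shared buffer first, then attach for live events.
    for (const e of this.replay()) safe(e);
    this.emitter.on("activity", safe);
    return () => {
      this.emitter.off("activity", safe);
    };
  }

  replay(): ActivityEvent[] {
    return [...this.buffer]; // copy, so callers cannot mutate the buffer
  }
}

const bus = new SketchActivityBus();
for (let i = 0; i < 150; i++) bus.publish({ kind: "cron", at: i });

const received: number[] = [];
const unsubscribe = bus.subscribe((e) => received.push(e.at));
bus.publish({ kind: "cron", at: 150 }); // delivered live
unsubscribe();
bus.publish({ kind: "cron", at: 151 }); // not delivered
```

In this run the late subscriber first receives the 100 buffered events (at = 50 through 149), then the single live event, and nothing after it unsubscribes.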
Why not a queue?
The bus is intentionally not durable, not transactional, and not back-pressuring. It is an observability surface, not a delivery channel. Each kind of durable work (memory extraction, scheduled task runs, webhook backlog) has its own persistent queue table elsewhere. If the activity stream drops an event because a subscriber is slow, nothing in the system breaks; the operator just doesn’t see one row in the live view.
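One way a subscriber could realize "drop instead of back-pressure" is a small lossy inbox between the bus handler and a slow consumer. This is a hypothetical adapter for illustration only; the page does not say how, or whether, the SSE controller buffers events, and the name LossyInbox is invented here.

```typescript
// Hypothetical adapter: buffers events for a slow consumer and drops the
// oldest ones once `limit` is reached, instead of pushing back on producers.
class LossyInbox<T> {
  private pending: T[] = [];
  private readonly limit: number;
  dropped = 0;

  constructor(limit: number) {
    this.limit = limit;
  }

  // Called synchronously from the bus handler; does no I/O and never waits.
  offer(item: T): void {
    if (this.pending.length >= this.limit) {
      this.pending.shift();
      this.dropped++;
    }
    this.pending.push(item);
  }

  // Called by the consumer whenever it is ready for more.
  drain(): T[] {
    const batch = this.pending;
    this.pending = [];
    return batch;
  }
}

const inbox = new LossyInbox<string>(3);
for (const kind of ["inbound", "tool", "tool", "memory", "outbound"]) inbox.offer(kind);
const batch = inbox.drain();
console.log(batch, inbox.dropped);
```

With a limit of 3 and five offers, the two oldest events are discarded and the consumer sees only the newest three, which matches the "one missing row in the live view" failure mode described above.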
See also
- Activity (admin panel): the UI fed by this bus.
- REST API reference (SSE endpoint): the GET /api/activity-stream/live contract.
Code references
- packages/engine/src/activity-stream/activity-bus.ts: the bus.
- packages/engine/src/activity-stream/bus-emitter.ts: the LLM tap.
- packages/engine/src/activity-stream/emitters/*.ts: seven per-category emitters.
- packages/engine/src/activity-stream/activity-stream.controller.ts: SSE endpoint (GET /api/activity-stream/live).
- packages/engine/src/activity-stream/event-formatters.ts: domain-event → SSE payload mapping.