iii-stream
v0.14.0-next.1Build durable streams for real-time data subscriptions.
skill doc
React to stream item changes and subscriber lifecycle
how-toWhen to use
The iii-stream worker exposes three reactive trigger types — registered through iii.registerTrigger({ type, function_id, config }) rather than through stream::*-style engine functions. They cover two distinct concerns: data changes to stream items, and subscriber lifecycle events on WebSocket clients.
| Trigger type | Fires on | Typical use |
|---|---|---|
stream |
Successful stream::set, stream::update, stream::delete, or stream::send matching the configured filter. |
Server-side change watcher: derived projections, audit logs, downstream notifications. |
stream:join |
A WebSocket client subscribes to a (stream_name, group_id, [id]). |
Authorize the subscription (return { unauthorized: true } to reject) and/or record per-connection setup. |
stream:leave |
A WebSocket client unsubscribes (explicit leave message or socket close). |
Cleanup paired with stream:join: decrement counters, release per-subscription resources, broadcast "user-left" signals. |
Reach for these when:
- A stream change should kick off side effects in another function (audit logs, projections, notifications) without polling
stream::list. - You need to gate WebSocket subscriptions server-side — verifying that the connection's
auth_functioncontext has access to the requested(stream_name, group_id)— rather than trusting client-side filtering. - You want server-side reactions to subscriber connect/disconnect for presence counters, per-subscription rate buckets, or audit trails.
Use stream::get instead when you only need to pull the current value on demand. Use stream::send inside a trigger handler when the side effect is "broadcast a transient notification" — set already broadcasts; send lets you emit an additional, distinct event.
Prerequisite: the iii-stream worker must be enabled in config.yaml. Handlers and triggers are registered from a connected worker via iii.registerFunction and iii.registerTrigger — not through the stream::* engine functions.
Inputs
Registration is the same two-step pattern for all three trigger types: define the handler function, then bind it to the trigger.
Handler function
Register any function id. The handler receives the event payload documented in Outputs (same shape the engine passes to condition_function_id when configured).
// iii.registerFunction — handler id only; no engine payload.
{ "id": "presence::on-change" }stream trigger registration (data-change)
{
"type": "stream", // required. Must be exactly "stream".
"function_id": "presence::on-change", // required. Handler invoked when the trigger matches.
"config": {
"stream_name": "presence", // required. Non-empty. Registration fails synchronously if missing.
"group_id": "room-1", // optional. Empty string or omitted = match every group within the stream.
"item_id": "user-123", // optional. Empty string or omitted = match every item within the (filtered) group.
"condition_function_id": "auth::should-fire" // optional. Engine invokes this with the event; handler runs only on truthy.
}
}type, function_id, and config.stream_name are required. stream_name cannot be empty — the worker indexes triggers by stream name and unwraps the field, so registration without it fails synchronously. group_id and item_id use empty-string-as-wildcard semantics: an omitted or empty value means "match anything." The match is exact equality otherwise — no glob, no prefix matching.
| Config field | When omitted or empty | When set |
|---|---|---|
group_id |
Matches every group in the stream | Matches only events whose group_id equals this string |
item_id |
Matches every item in the matched group(s) | Matches only events whose id (the item id) equals this string |
condition_function_id |
Handler runs whenever stream/group/item match | Engine invokes the named function with the event; handler is skipped on false or condition error |
stream:join trigger registration (subscription start, with optional auth gate)
{
"type": "stream:join", // required.
"function_id": "presence::on-join", // required.
"config": {} // takes no fields. Branch inside the handler on the event's stream_name/group_id/id.
}stream:join takes no config fields. It fires for every subscription on every stream. To narrow, branch inside the handler.
The handler's return value is special — { "unauthorized": true } rejects the subscription before any data flows; anything else (or no return) lets the subscription proceed. See Outputs below.
stream:leave trigger registration (subscription teardown)
{
"type": "stream:leave",
"function_id": "presence::on-leave",
"config": {} // takes no fields. Same shape as stream:join's config.
}stream:leave also takes no config and is not an authorization gate — by the time the handler fires, the subscription is already gone. The return value is ignored.
Outputs
The handler payloads differ between the data-change trigger and the lifecycle pair.
stream event payload (StreamWrapperMessage)
{
"type": "stream", // always the literal "stream"
"timestamp": 1716220800000, // milliseconds since epoch (UTC)
"streamName": "presence", // camelCase — the source uses serde rename
"groupId": "room-1",
"id": "user-123", // null on stream::send calls that omit the `id` field (group-wide broadcast)
"event": {
"type": "create", // discriminant: "create" | "update" | "delete" | "event"
"data": { "online": true, "name": "Alice" }
}
}- The wrapper field names use camelCase (
streamName,groupId) because they're declared with#[serde(rename = ...)]in the source. event.typeis the discriminant. The four shapes:"create"— fired bystream::set/stream::updatewhen the item didn't exist before.event.datais the new value."update"— fired bystream::set/stream::updatewhen the item existed.event.datais the new value."delete"— fired bystream::delete.event.datais the value that was removed."event"— fired bystream::send.event.datais wrapped one extra level:{ type:., data: }
idisnullonly onstream::sendcalls that omit theidfield. Onset/update/deleteit always carries theitem_id.
The handler's return value is ignored. Errors are logged but do not affect the original stream::* call site — trigger fan-out runs on a spawned task after the originating call returns.
stream:join and stream:leave event payload (StreamJoinLeaveEvent)
{
"subscription_id": "f8e2c0a4-...", // worker-issued, unique per subscribe call
"stream_name": "presence", // snake_case — different from the data-trigger payload
"group_id": "room-1",
"id": "user-123", // null when subscribing to the whole (stream, group) rather than a single item
"context": { "user_id": "alice@example.com" } // whatever the worker config's auth_function returned for the connection; null if no auth_function is set
}- All field names are snake_case here — do not confuse with the data-trigger's camelCase wrapper.
subscription_idmatches between astream:joinevent and its pairedstream:leaveevent for the same subscription. Use it as a join key for per-subscription state.contextis captured at WebSocket handshake time and the same value flows into every join/leave event for the lifetime of the connection.
For stream:join only, the handler's return value drives authorization:
{ "unauthorized": true } // reject the subscription
{ "unauthorized": false } // allow (same as omitting)
{} // allow (default)
null // allow (default)Only the unauthorized field is read; any other shape — including a return value that fails to parse — is treated as "allow." Errors in the join handler are logged but do not block the subscription.
For stream:leave, the return value is ignored.
Worked example
Watch every change in presence / room-1 so a handler fires on every create/update/delete/event:
{
"type": "stream",
"function_id": "presence::on-change",
"config": { "stream_name": "presence", "group_id": "room-1" }
}Three patterns build on this base:
- Watch a
(stream_name, group_id)for changes. The registration above; branch inside the handler onevent.event.typeto distinguishcreate/update/delete/event. - Authorize subscriptions before any data flows. Register a
stream:joinhandler that inspects the event'scontext/group_idand returns{ "unauthorized": true }to reject; pair with astream:leavehandler matched onsubscription_idfor symmetric cleanup. - Drive a derived projection. From inside a
streamhandler, callstream::set/stream::updateagainst a different(stream_name, group_id)to maintain counters, search indexes, or audit logs; the originating write has already returned, so a slow projection cannot delay or block writers.
For runnable scaffolds covering these patterns end-to-end (TypeScript, Python, and Rust), see the stream worker source and the SDK usage examples in the iii main repo.
Related
stream::set,stream::update,stream::delete,stream::send— the four functions whose calls thestreamtrigger reacts to.stream::get— read on demand instead of registering a trigger.auth_function(worker config) — a function id set on theiii-streamworker; the engine invokes it once per WebSocket handshake with{ headers, path, query_params, addr }, expects{ context:back, and stamps that} contextinto every subsequentstream:join/stream:leaveevent for the connection.