iii-stream
v0.11.6-next.3
Build durable streams for real-time data subscriptions.
configuration

    - adapter:
        config:
          file_path: ./data/stream_store
          store_method: file_based
        name: kv
      host: 127.0.0.1
      port: 3112

readme
Durable streams for real-time data subscriptions. Streams organize data hierarchically: stream_name > group_id > item_id. Clients subscribe via WebSocket and receive real-time updates when items change.
When a worker triggers stream::set, the engine:
- Persists the data via the configured adapter (Redis or KvStore)
- Publishes a notification to all WebSocket clients subscribed to that stream and group
- Evaluates registered stream triggers and fires matching handlers
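
The three steps above can be pictured with a small in-memory model. This is only a sketch under stated assumptions: `StreamModel`, `subscribe`, and `onStream` are hypothetical names invented for illustration, and the real engine persists through its adapter and delivers over WebSocket rather than calling callbacks directly.

```typescript
// Hypothetical in-memory model of the stream::set flow; not the engine's implementation.
type Json = unknown;
type ChangeEvent = {
  type: "create" | "update";
  stream_name: string;
  group_id: string;
  item_id: string;
  data: Json;
};
type Listener = (event: ChangeEvent) => void;

class StreamModel {
  // stream_name > group_id > item_id, mirroring the hierarchy described above.
  private store = new Map<string, Map<string, Map<string, Json>>>();
  // Subscribers are keyed by "stream/group" (assumes names contain no slash).
  private subscribers = new Map<string, Listener[]>();
  private triggers: { stream_name: string; handler: Listener }[] = [];

  subscribe(stream: string, group: string, listener: Listener): void {
    const key = `${stream}/${group}`;
    this.subscribers.set(key, [...(this.subscribers.get(key) ?? []), listener]);
  }

  onStream(stream_name: string, handler: Listener): void {
    this.triggers.push({ stream_name, handler });
  }

  set(stream: string, group: string, item: string, data: Json) {
    // 1. Persist (a real adapter would write to Redis or the kv store).
    const groups = this.store.get(stream) ?? new Map<string, Map<string, Json>>();
    const items = groups.get(group) ?? new Map<string, Json>();
    const existed = items.has(item);
    const old_value = existed ? items.get(item) : null;
    items.set(item, data);
    groups.set(group, items);
    this.store.set(stream, groups);

    const event: ChangeEvent = {
      type: existed ? "update" : "create",
      stream_name: stream,
      group_id: group,
      item_id: item,
      data,
    };
    // 2. Notify subscribers of this stream and group.
    for (const listener of this.subscribers.get(`${stream}/${group}`) ?? []) listener(event);
    // 3. Fire triggers registered for this stream.
    for (const t of this.triggers) if (t.stream_name === stream) t.handler(event);
    return { old_value, new_value: data };
  }
}

const model = new StreamModel();
const seen: string[] = [];
model.subscribe("presence", "room-1", (e) => { seen.push(`ws:${e.type}`); });
model.onStream("presence", (e) => { seen.push(`trigger:${e.type}`); });
const first = model.set("presence", "room-1", "user-123", { online: true });
const second = model.set("presence", "room-1", "user-123", { online: false });
```

After the two calls, `seen` holds one subscriber entry and one trigger entry per write, and `second.old_value` is the value stored by the first call.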
Sample Configuration

    - name: iii-stream
      config:
        port: ${STREAM_PORT:3112}
        host: 0.0.0.0
        adapter:
          name: redis
          config:
            redis_url: ${REDIS_URL:redis://localhost:6379}

Configuration
| Field | Type | Description |
|---|---|---|
| port | number | The port to listen on. Defaults to 3112. |
| host | string | The host to listen on. Defaults to 0.0.0.0. |
| auth_function | string | Function ID to authenticate WebSocket connections. |
| adapter | Adapter | Adapter for stream storage and real-time delivery. |
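
The sample configuration uses placeholders such as `${STREAM_PORT:3112}`. The sketch below shows one plausible reading of that syntax, in which the text after the first colon is a fallback used when the environment variable is unset. That reading is an assumption, and `resolvePlaceholders` is a hypothetical helper, not part of the engine.

```typescript
// Resolves ${NAME:default} placeholders against a map of environment values.
// Assumption: the part after the first ":" is a fallback for an unset variable.
function resolvePlaceholders(
  value: string,
  env: Record<string, string | undefined>,
): string {
  return value.replace(
    /\$\{([A-Za-z_][A-Za-z0-9_]*):([^}]*)\}/g,
    (_match: string, name: string, fallback: string) => env[name] ?? fallback,
  );
}

// With no variables set, the defaults from the sample configuration apply.
const defaultPort = resolvePlaceholders("${STREAM_PORT:3112}", {});
const defaultRedis = resolvePlaceholders("${REDIS_URL:redis://localhost:6379}", {});
// With STREAM_PORT set, the environment value wins.
const customPort = resolvePlaceholders("${STREAM_PORT:3112}", { STREAM_PORT: "4000" });
```

Only the first colon separates the name from the fallback here, so a default that itself contains colons, such as a Redis URL, survives intact.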
Adapters
redis
Uses Redis as the backend. Stores stream data in Redis and uses Redis Pub/Sub for real-time delivery.

    name: redis
    config:
      redis_url: ${REDIS_URL:redis://localhost:6379}

kv
Built-in key-value store. Supports in-memory or file-based persistence. No external dependencies required.

    name: kv
    config:
      store_method: file_based
      file_path: ./data/streams_store.db

| Field | Type | Description |
|---|---|---|
| store_method | string | in_memory (lost on restart) or file_based (persisted to disk). |
| file_path | string | Directory path for file-based storage. |
Functions
stream::set
Sets a value in the stream. Notifies all WebSocket subscribers and fires stream triggers.
Parameters: stream_name (string), group_id (string), item_id (string), data (any)
Returns: old_value (any), new_value (any)
stream::get
Gets a value from the stream.
Parameters: stream_name (string), group_id (string), item_id (string)
Returns: value (any)
stream::delete
Deletes a value from the stream.
Parameters: stream_name (string), group_id (string), item_id (string)
Returns: old_value (any)
stream::list
Retrieves all items in a group.
Parameters: stream_name (string), group_id (string)
Returns: group (any[])
stream::list_groups
Lists all groups in a stream.
Parameters: stream_name (string)
Returns: groups (string[])
stream::list_all
Lists all streams with their group metadata.
Returns: stream (object[]), count (number)
stream::send
Sends a custom event to all subscribers of a stream group (without persisting).
Parameters: stream_name (string), group_id (string), type (string), data (any), id (string, optional)
stream::update
Atomically updates an item using a list of operations (set, merge, increment, decrement, append, remove).
Parameters: stream_name (string), group_id (string), item_id (string), ops (UpdateOp[])
Returns: old_value (any), new_value (any)
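
To make the six operation types concrete, here is a local sketch that applies a list of ops to a plain object. It is not the engine's implementation. The following are assumptions made for illustration: every path is a single top-level key, a missing numeric field counts as 0, appending to a missing field creates a one-element array, and `applyOps` and `isObject` are hypothetical helpers.

```typescript
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };
type JsonObject = { [key: string]: Json };

type UpdateOp =
  | { type: "set"; path: string; value: Json }
  | { type: "merge"; path?: string; value: JsonObject }
  | { type: "increment" | "decrement"; path: string; by: number }
  | { type: "append"; path: string; value: Json }
  | { type: "remove"; path: string };

function isObject(v: Json | undefined): v is JsonObject {
  return typeof v === "object" && v !== null && !Array.isArray(v);
}

// Applies ops in order to a copy of `current` and returns both values,
// mirroring the old_value / new_value shape that stream::update returns.
function applyOps(current: JsonObject | null, ops: UpdateOp[]) {
  const next: JsonObject = JSON.parse(JSON.stringify(current ?? {}));
  for (const op of ops) {
    switch (op.type) {
      case "set":
        next[op.path] = op.value;
        break;
      case "merge": {
        if (op.path === undefined || op.path === "") {
          Object.assign(next, op.value); // root merge
        } else {
          const target = next[op.path];
          next[op.path] = { ...(isObject(target) ? target : {}), ...op.value };
        }
        break;
      }
      case "increment":
      case "decrement": {
        const cur = next[op.path];
        const n = typeof cur === "number" ? cur : 0; // assumption: missing counts as 0
        next[op.path] = op.type === "increment" ? n + op.by : n - op.by;
        break;
      }
      case "append": {
        const target = next[op.path];
        if (Array.isArray(target)) next[op.path] = [...target, op.value];
        else if (typeof target === "string" && typeof op.value === "string") {
          next[op.path] = target + op.value; // string concatenation
        } else next[op.path] = [op.value]; // assumption: create a new array
        break;
      }
      case "remove":
        delete next[op.path];
        break;
    }
  }
  return { old_value: current, new_value: next };
}

const result = applyOps({ name: "Alice", visits: 1, tags: ["a"] }, [
  { type: "increment", path: "visits", by: 2 },
  { type: "append", path: "tags", value: "b" },
  { type: "merge", value: { online: true } },
  { type: "remove", path: "name" },
]);
// result.new_value is { visits: 3, tags: ["a", "b"], online: true }
```

The same `ops` array shape is what a worker would pass as the `ops` parameter of stream::update. The engine applies the ops atomically on the stored item, which this local model does not attempt to reproduce.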
Authentication
Define a function that receives request data (headers, path, query_params, addr) and returns { context: ... }. Set it in config:

    - name: iii-stream
      config:
        auth_function: onAuth

TypeScript:

    iii.registerFunction('onAuth', (input) => ({
      context: { name: 'John Doe' },
    }))

Python:

    def on_auth(input):
        return {'context': {'name': 'John Doe'}}

    iii.register_function("onAuth", on_auth)

Trigger Types
stream
Fires when an item changes (via stream::set, stream::update, or stream::delete).
| Config Field | Type | Description |
|---|---|---|
| stream_name | string | Required. Only changes on this stream fire the handler. |
| group_id | string | If set, only changes within this group fire the handler. |
| item_id | string | If set, only changes to this specific item fire the handler. |
| condition_function_id | string | Function ID for conditional execution. |
Payload fields: type (create/update/delete), timestamp, streamName, groupId, id, event (object with type and data).
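
The filter fields in the table above narrow which changes reach a handler. A minimal sketch of that matching logic follows, assuming each optional field is an exact-match filter that is skipped when unset. `matchesTrigger` is a hypothetical helper, and the `condition_function_id` check is left out because it requires calling another function.

```typescript
type StreamTriggerConfig = {
  stream_name: string;
  group_id?: string;
  item_id?: string;
};

type StreamChange = {
  stream_name: string;
  group_id: string;
  item_id: string;
};

// Returns true when a change passes every filter set on the trigger config.
function matchesTrigger(config: StreamTriggerConfig, change: StreamChange): boolean {
  if (config.stream_name !== change.stream_name) return false;
  if (config.group_id !== undefined && config.group_id !== change.group_id) return false;
  if (config.item_id !== undefined && config.item_id !== change.item_id) return false;
  return true;
}

const change: StreamChange = {
  stream_name: "presence",
  group_id: "room-1",
  item_id: "user-123",
};

// A stream-wide trigger matches any group and item on that stream.
const streamWide = matchesTrigger({ stream_name: "presence" }, change);
// A trigger scoped to another group does not match.
const otherGroup = matchesTrigger({ stream_name: "presence", group_id: "room-2" }, change);
```

Here `streamWide` is `true` and `otherGroup` is `false`.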
stream:join and stream:leave
Fire when a client connects or disconnects via WebSocket.
Payload fields: subscription_id, stream_name, group_id, id (optional), context (from auth).

    const fn = iii.registerFunction('onJoin', (input) => {
      console.log(`Joined ${input.stream_name}/${input.group_id}`, input.context)
      return {}
    })

    iii.registerTrigger({
      type: 'stream:join',
      function_id: fn.id,
      config: {},
    })

Usage Example: Real-Time Presence

    import { registerWorker, TriggerAction } from 'iii-sdk'

    const iii = registerWorker('ws://localhost:49134')

    // Set presence
    iii.trigger({
      function_id: 'stream::set',
      payload: {
        stream_name: 'presence',
        group_id: 'room-1',
        item_id: 'user-123',
        data: { name: 'Alice', online: true, lastSeen: new Date().toISOString() },
      },
      action: TriggerAction.Void(),
    })

    // Get a user
    const user = await iii.trigger({
      function_id: 'stream::get',
      payload: { stream_name: 'presence', group_id: 'room-1', item_id: 'user-123' },
    })

    // List all members in a room
    const roomMembers = await iii.trigger({
      function_id: 'stream::list',
      payload: { stream_name: 'presence', group_id: 'room-1' },
    })

Clients connect via WebSocket to ws://host:3112/stream/presence/room-1/ and receive real-time updates when items change.
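
The subscription URL above appears to follow the pattern `/stream/<stream_name>/<group_id>/`. The sketch below builds such a URL from its parts. The pattern is inferred from that single example, and both the `streamUrl` helper and the percent-encoding of path segments are assumptions rather than documented behavior.

```typescript
// Builds a subscription URL of the form ws://host:port/stream/<stream>/<group>/.
// Assumption: segments are percent-encoded; the source only shows plain names.
function streamUrl(
  host: string,
  port: number,
  streamName: string,
  groupId: string,
): string {
  const stream = encodeURIComponent(streamName);
  const group = encodeURIComponent(groupId);
  return `ws://${host}:${port}/stream/${stream}/${group}/`;
}

const presenceUrl = streamUrl("host", 3112, "presence", "room-1");
// presenceUrl is "ws://host:3112/stream/presence/room-1/"
```

A client would pass the resulting string to its WebSocket constructor to open the subscription.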
api reference (json)
{
"functions": [
{
"description": "Delete a value from a stream",
"metadata": {},
"name": "stream::delete",
"request_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"group_id": {
"type": "string"
},
"item_id": {
"type": "string"
},
"stream_name": {
"type": "string"
}
},
"required": [
"group_id",
"item_id",
"stream_name"
],
"title": "StreamDeleteInput",
"type": "object"
},
"response_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"old_value": {
"description": "The value before the update (None if key didn't exist)"
}
},
"title": "DeleteResult",
"type": "object"
}
},
{
"description": "Get a value from a stream",
"metadata": {},
"name": "stream::get",
"request_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"group_id": {
"type": "string"
},
"item_id": {
"type": "string"
},
"stream_name": {
"type": "string"
}
},
"required": [
"group_id",
"item_id",
"stream_name"
],
"title": "StreamGetInput",
"type": "object"
},
"response_schema": {}
},
{
"description": "List all items in a stream group",
"metadata": {},
"name": "stream::list",
"request_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"group_id": {
"type": "string"
},
"stream_name": {
"type": "string"
}
},
"required": [
"group_id",
"stream_name"
],
"title": "StreamListInput",
"type": "object"
},
"response_schema": {}
},
{
"description": "List all available stream with metadata",
"metadata": {},
"name": "stream::list_all",
"request_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "Input for stream.listAll (empty struct)",
"title": "StreamListAllInput",
"type": "object"
},
"response_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"StreamMetadata": {
"description": "Metadata for a stream (used by stream.listAll)",
"properties": {
"groups": {
"items": {
"type": "string"
},
"type": "array"
},
"id": {
"type": "string"
}
},
"required": [
"groups",
"id"
],
"type": "object"
}
},
"properties": {
"count": {
"format": "uint",
"minimum": 0,
"type": "integer"
},
"stream": {
"items": {
"$ref": "#/definitions/StreamMetadata"
},
"type": "array"
}
},
"required": [
"count",
"stream"
],
"title": "StreamListAllResult",
"type": "object"
}
},
{
"description": "List all groups in a stream",
"metadata": {},
"name": "stream::list_groups",
"request_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"stream_name": {
"type": "string"
}
},
"required": [
"stream_name"
],
"title": "StreamListGroupsInput",
"type": "object"
},
"response_schema": {}
},
{
"description": "Send a custom event to stream subscribers",
"metadata": {},
"name": "stream::send",
"request_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"data": true,
"group_id": {
"type": "string"
},
"id": {
"type": [
"string",
"null"
]
},
"stream_name": {
"type": "string"
},
"type": {
"type": "string"
}
},
"required": [
"data",
"group_id",
"stream_name",
"type"
],
"title": "StreamSendInput",
"type": "object"
},
"response_schema": {}
},
{
"description": "Set a value in a stream",
"metadata": {},
"name": "stream::set",
"request_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"data": true,
"group_id": {
"type": "string"
},
"item_id": {
"type": "string"
},
"stream_name": {
"type": "string"
}
},
"required": [
"data",
"group_id",
"item_id",
"stream_name"
],
"title": "StreamSetInput",
"type": "object"
},
"response_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"new_value": {
"description": "The value after the update"
},
"old_value": {
"description": "The value before the update (None if key didn't exist)"
}
},
"required": [
"new_value"
],
"title": "SetResult",
"type": "object"
}
},
{
"description": "Atomically update a stream value with multiple operations",
"metadata": {},
"name": "stream::update",
"request_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"FieldPath": {
"description": "Represents a path to a field in a JSON object",
"type": "string"
},
"MergePath": {
"anyOf": [
{
"type": "string"
},
{
"items": {
"type": "string"
},
"type": "array"
}
],
"description": "Path target for a [`UpdateOp::Merge`] operation. Accepts either a single string (legacy / first-level field) or an array of literal segments (nested path).\n\nPath normalization rules applied by the engine: - absent / `Single(\"\")` / `Segments(vec![])` → root merge - `Single(\"foo\")` is equivalent to `Segments(vec![\"foo\".into()])` - `Segments([\"a\", \"b\", \"c\"])` walks three literal keys, never interpreting dots specially. `Segments(vec![\"a.b\".into()])` is a single literal key named `\"a.b\"`.\n\n**Variant ordering is load-bearing.** `#[serde(untagged)]` tries variants in declaration order — `Single` MUST come before `Segments` so a JSON string deserializes into `Single` rather than failing the array match first."
},
"UpdateOp": {
"description": "Operations that can be performed atomically on a stream value",
"oneOf": [
{
"description": "Set a value at path (overwrite)",
"properties": {
"path": {
"$ref": "#/definitions/FieldPath"
},
"type": {
"enum": [
"set"
],
"type": "string"
},
"value": true
},
"required": [
"path",
"type"
],
"type": "object"
},
{
"description": "Merge object into existing value (object-only). Path may be omitted (root merge), a single first-level key, or an array of literal segments for nested merge. See [`MergePath`].",
"properties": {
"path": {
"anyOf": [
{
"$ref": "#/definitions/MergePath"
},
{
"type": "null"
}
]
},
"type": {
"enum": [
"merge"
],
"type": "string"
},
"value": true
},
"required": [
"type",
"value"
],
"type": "object"
},
{
"description": "Increment numeric value",
"properties": {
"by": {
"format": "int64",
"type": "integer"
},
"path": {
"$ref": "#/definitions/FieldPath"
},
"type": {
"enum": [
"increment"
],
"type": "string"
}
},
"required": [
"by",
"path",
"type"
],
"type": "object"
},
{
"description": "Decrement numeric value",
"properties": {
"by": {
"format": "int64",
"type": "integer"
},
"path": {
"$ref": "#/definitions/FieldPath"
},
"type": {
"enum": [
"decrement"
],
"type": "string"
}
},
"required": [
"by",
"path",
"type"
],
"type": "object"
},
{
"description": "Append an element to an array or concatenate a string",
"properties": {
"path": {
"$ref": "#/definitions/FieldPath"
},
"type": {
"enum": [
"append"
],
"type": "string"
},
"value": true
},
"required": [
"path",
"type",
"value"
],
"type": "object"
},
{
"description": "Remove a field",
"properties": {
"path": {
"$ref": "#/definitions/FieldPath"
},
"type": {
"enum": [
"remove"
],
"type": "string"
}
},
"required": [
"path",
"type"
],
"type": "object"
}
]
}
},
"description": "Input for atomic stream update operations",
"properties": {
"group_id": {
"type": "string"
},
"item_id": {
"type": "string"
},
"ops": {
"items": {
"$ref": "#/definitions/UpdateOp"
},
"type": "array"
},
"stream_name": {
"type": "string"
}
},
"required": [
"group_id",
"item_id",
"ops",
"stream_name"
],
"title": "StreamUpdateInput",
"type": "object"
},
"response_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"UpdateOpError": {
"description": "Per-op error reported by an atomic update operation.",
"properties": {
"code": {
"description": "Stable error code, e.g. `\"merge.path.too_deep\"`.",
"type": "string"
},
"doc_url": {
"description": "Optional documentation URL for this error class.",
"type": [
"string",
"null"
]
},
"message": {
"description": "Human-readable description with concrete numbers when applicable.",
"type": "string"
},
"op_index": {
"description": "Index of the offending op within the original `ops` array.",
"format": "uint",
"minimum": 0,
"type": "integer"
}
},
"required": [
"code",
"message",
"op_index"
],
"type": "object"
}
},
"description": "Result of an atomic update operation",
"properties": {
"errors": {
"description": "Errors encountered while applying ops. Successfully applied ops are still reflected in `new_value`. Field is omitted from JSON when empty for backward compatibility.",
"items": {
"$ref": "#/definitions/UpdateOpError"
},
"type": "array"
},
"new_value": {
"description": "The value after the update"
},
"old_value": {
"description": "The value before the update (None if key didn't exist)"
}
},
"required": [
"new_value"
],
"title": "UpdateResult",
"type": "object"
}
}
],
"triggers": [
{
"description": "Stream leave trigger",
"invocation_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"condition_function_id": {
"description": "Optional function ID to evaluate before invoking handler",
"type": [
"string",
"null"
]
},
"stream_name": {
"description": "Stream name to watch",
"type": [
"string",
"null"
]
}
},
"title": "StreamJoinLeaveTriggerConfig",
"type": "object"
},
"metadata": {},
"name": "stream:leave",
"return_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"event_type": {
"description": "Event type (stream:join or stream:leave)",
"type": "string"
},
"group_id": {
"description": "Group ID",
"type": "string"
},
"id": {
"description": "Peer ID",
"type": [
"string",
"null"
]
},
"stream_name": {
"description": "Stream name",
"type": "string"
},
"timestamp": {
"description": "Event timestamp (ms)",
"format": "int64",
"type": "integer"
}
},
"required": [
"event_type",
"group_id",
"stream_name",
"timestamp"
],
"title": "StreamJoinLeaveCallRequest",
"type": "object"
}
},
{
"description": "Log event trigger",
"invocation_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"LogLevel": {
"enum": [
"all",
"debug",
"info",
"warn",
"error"
],
"type": "string"
}
},
"properties": {
"level": {
"anyOf": [
{
"$ref": "#/definitions/LogLevel"
},
{
"type": "null"
}
],
"default": "all",
"description": "Minimum log level to trigger on"
}
},
"title": "LogTriggerConfig",
"type": "object"
},
"metadata": {},
"name": "log",
"return_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"attributes": {
"description": "Log attributes"
},
"body": {
"description": "Log message body",
"type": "string"
},
"instrumentation_scope_name": {
"description": "Instrumentation scope name",
"type": "string"
},
"instrumentation_scope_version": {
"description": "Instrumentation scope version",
"type": "string"
},
"observed_timestamp_unix_nano": {
"description": "Observed timestamp in nanoseconds",
"format": "uint64",
"minimum": 0,
"type": "integer"
},
"resource": {
"description": "OpenTelemetry resource"
},
"service_name": {
"description": "Service name",
"type": "string"
},
"severity_number": {
"description": "Severity number (OpenTelemetry)",
"format": "uint32",
"minimum": 0,
"type": "integer"
},
"severity_text": {
"description": "Severity text (e.g. INFO, ERROR)",
"type": "string"
},
"span_id": {
"description": "Span ID",
"type": "string"
},
"timestamp_unix_nano": {
"description": "Log timestamp in nanoseconds",
"format": "uint64",
"minimum": 0,
"type": "integer"
},
"trace_id": {
"description": "Trace ID",
"type": "string"
}
},
"required": [
"attributes",
"body",
"instrumentation_scope_name",
"instrumentation_scope_version",
"observed_timestamp_unix_nano",
"resource",
"service_name",
"severity_number",
"severity_text",
"span_id",
"timestamp_unix_nano",
"trace_id"
],
"title": "LogCallRequest",
"type": "object"
}
},
{
"description": "Stream trigger",
"invocation_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"condition_function_id": {
"description": "Optional function ID to evaluate before invoking handler",
"type": [
"string",
"null"
]
},
"group_id": {
"description": "Group ID filter",
"type": [
"string",
"null"
]
},
"item_id": {
"description": "Item ID filter",
"type": [
"string",
"null"
]
},
"stream_name": {
"description": "Stream name to watch",
"type": [
"string",
"null"
]
}
},
"title": "StreamTriggerConfig",
"type": "object"
},
"metadata": {},
"name": "stream",
"return_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"event": {
"description": "Event-specific data (create/update/delete/sync payload)"
},
"event_type": {
"description": "Stream event type (create, update, delete, sync)",
"type": "string"
},
"group_id": {
"description": "Group ID",
"type": "string"
},
"id": {
"description": "Item ID",
"type": [
"string",
"null"
]
},
"stream_name": {
"description": "Stream name",
"type": "string"
},
"timestamp": {
"description": "Event timestamp (ms)",
"format": "int64",
"type": "integer"
}
},
"required": [
"event",
"event_type",
"group_id",
"stream_name",
"timestamp"
],
"title": "StreamCallRequest",
"type": "object"
}
},
{
"description": "Stream join trigger",
"invocation_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"condition_function_id": {
"description": "Optional function ID to evaluate before invoking handler",
"type": [
"string",
"null"
]
},
"stream_name": {
"description": "Stream name to watch",
"type": [
"string",
"null"
]
}
},
"title": "StreamJoinLeaveTriggerConfig",
"type": "object"
},
"metadata": {},
"name": "stream:join",
"return_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"event_type": {
"description": "Event type (stream:join or stream:leave)",
"type": "string"
},
"group_id": {
"description": "Group ID",
"type": "string"
},
"id": {
"description": "Peer ID",
"type": [
"string",
"null"
]
},
"stream_name": {
"description": "Stream name",
"type": "string"
},
"timestamp": {
"description": "Event timestamp (ms)",
"format": "int64",
"type": "integer"
}
},
"required": [
"event_type",
"group_id",
"stream_name",
"timestamp"
],
"title": "StreamJoinLeaveCallRequest",
"type": "object"
}
}
]
}