iii / worker
$worker

iii-stream

v0.11.6-next.3

Build durable streams for real-time data subscriptions.

engine module
baked into the iii engine; no separate install required.
agent-ready brief for v0.11.6-next.3
install + config + dependencies + readme + api reference, all in one place. fetch as agent-context.md for an llm to consume.
the same content rendered as discrete blocks below is exposed as a single markdown document at /workers/iii-stream.md?version=0.11.6-next.3. paste it into an llm prompt or pipe it through curl from a worker.

install

install
$iii worker add iii-stream@0.11.6-next.3

configuration

iii-config.yaml
- adapter:
    config:
      file_path: ./data/stream_store
      store_method: file_based
    name: kv
  host: 127.0.0.1
  port: 3112

dependencies

no dependencies for v0.11.6-next.3

readme

README.md

iii-stream

Durable streams for real-time data subscriptions. Streams organize data hierarchically: stream_name > group_id > item_id. Clients subscribe via WebSocket and receive real-time updates when items change.

When a worker triggers stream::set, the engine:

  1. Persists the data via the configured adapter (Redis or KvStore)
  2. Publishes a notification to all WebSocket clients subscribed to that stream and group
  3. Evaluates registered stream triggers and fires matching handlers

Sample Configuration

- name: iii-stream
  config:
    port: ${STREAM_PORT:3112}
    host: 0.0.0.0
    adapter:
      name: redis
      config:
        redis_url: ${REDIS_URL:redis://localhost:6379}

Configuration

Field Type Description
port number The port to listen on. Defaults to 3112.
host string The host to listen on. Defaults to 0.0.0.0.
auth_function string Function ID to authenticate WebSocket connections.
adapter Adapter Adapter for stream storage and real-time delivery.

Adapters

redis

Uses Redis as the backend. Stores stream data in Redis and uses Redis Pub/Sub for real-time delivery.

name: redis
config:
  redis_url: ${REDIS_URL:redis://localhost:6379}

kv

Built-in key-value store. Supports in-memory or file-based persistence. No external dependencies required.

name: kv
config:
  store_method: file_based
  file_path: ./data/streams_store.db
Field Type Description
store_method string in_memory (lost on restart) or file_based (persisted to disk).
file_path string Directory path for file-based storage.

Functions

stream::set

Sets a value in the stream. Notifies all WebSocket subscribers and fires stream triggers.

Parameters: stream_name (string), group_id (string), item_id (string), data (any)

Returns: old_value (any), new_value (any)

stream::get

Gets a value from the stream.

Parameters: stream_name (string), group_id (string), item_id (string)

Returns: value (any)

stream::delete

Deletes a value from the stream.

Parameters: stream_name (string), group_id (string), item_id (string)

Returns: old_value (any)

stream::list

Retrieves all items in a group.

Parameters: stream_name (string), group_id (string)

Returns: group (any[])

stream::list_groups

Lists all groups in a stream.

Parameters: stream_name (string)

Returns: groups (string[])

stream::list_all

Lists all streams with their group metadata.

Returns: stream (object[]), count (number)

stream::send

Sends a custom event to all subscribers of a stream group (without persisting).

Parameters: stream_name (string), group_id (string), type (string), data (any), id (string, optional)

stream::update

Atomically updates an item using a list of operations (set, merge, increment, decrement, append, remove).

Parameters: stream_name (string), group_id (string), item_id (string), ops (UpdateOp[])

Returns: old_value (any), new_value (any)

Authentication

Define a function that receives request data (headers, path, query_params, addr) and returns { context: ... }. Set it in config:

- name: iii-stream
  config:
    auth_function: onAuth

TypeScript:

iii.registerFunction('onAuth', (input) => ({
  context: { name: 'John Doe' },
}))

Python:

def on_auth(input):
    return {'context': {'name': 'John Doe'}}

iii.register_function("onAuth", on_auth)

Trigger Types

stream

Fires when an item changes (via stream::set, stream::update, or stream::delete).

Config Field Type Description
stream_name string Required. Only changes on this stream fire the handler.
group_id string If set, only changes within this group fire the handler.
item_id string If set, only changes to this specific item fire the handler.
condition_function_id string Function ID for conditional execution.

Payload fields: type (create/update/delete), timestamp, streamName, groupId, id, event (object with type and data).

stream:join and stream:leave

Fire when a client connects or disconnects via WebSocket.

Payload fields: subscription_id, stream_name, group_id, id (optional), context (from auth).

const fn = iii.registerFunction('onJoin', (input) => {
  console.log(`Joined ${input.stream_name}/${input.group_id}`, input.context)
  return {}
})

iii.registerTrigger({
  type: 'stream:join',
  function_id: fn.id,
  config: {},
})

Usage Example: Real-Time Presence

import { registerWorker, TriggerAction } from 'iii-sdk'

const iii = registerWorker('ws://localhost:49134')

// Set presence
iii.trigger({
  function_id: 'stream::set',
  payload: {
    stream_name: 'presence',
    group_id: 'room-1',
    item_id: 'user-123',
    data: { name: 'Alice', online: true, lastSeen: new Date().toISOString() },
  },
  action: TriggerAction.Void(),
})

// Get a user
const user = await iii.trigger({
  function_id: 'stream::get',
  payload: { stream_name: 'presence', group_id: 'room-1', item_id: 'user-123' },
})

// List all members in a room
const roomMembers = await iii.trigger({
  function_id: 'stream::list',
  payload: { stream_name: 'presence', group_id: 'room-1' },
})

Clients connect via WebSocket to ws://host:3112/stream/presence/room-1/ and receive real-time updates when items change.

api reference (json)

agent-api-reference.json
{
  "functions": [
    {
      "description": "Delete a value from a stream",
      "metadata": {},
      "name": "stream::delete",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "group_id": {
            "type": "string"
          },
          "item_id": {
            "type": "string"
          },
          "stream_name": {
            "type": "string"
          }
        },
        "required": [
          "group_id",
          "item_id",
          "stream_name"
        ],
        "title": "StreamDeleteInput",
        "type": "object"
      },
      "response_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "old_value": {
            "description": "The value before the update (None if key didn't exist)"
          }
        },
        "title": "DeleteResult",
        "type": "object"
      }
    },
    {
      "description": "Get a value from a stream",
      "metadata": {},
      "name": "stream::get",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "group_id": {
            "type": "string"
          },
          "item_id": {
            "type": "string"
          },
          "stream_name": {
            "type": "string"
          }
        },
        "required": [
          "group_id",
          "item_id",
          "stream_name"
        ],
        "title": "StreamGetInput",
        "type": "object"
      },
      "response_schema": {}
    },
    {
      "description": "List all items in a stream group",
      "metadata": {},
      "name": "stream::list",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "group_id": {
            "type": "string"
          },
          "stream_name": {
            "type": "string"
          }
        },
        "required": [
          "group_id",
          "stream_name"
        ],
        "title": "StreamListInput",
        "type": "object"
      },
      "response_schema": {}
    },
    {
      "description": "List all available stream with metadata",
      "metadata": {},
      "name": "stream::list_all",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "description": "Input for stream.listAll (empty struct)",
        "title": "StreamListAllInput",
        "type": "object"
      },
      "response_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "definitions": {
          "StreamMetadata": {
            "description": "Metadata for a stream (used by stream.listAll)",
            "properties": {
              "groups": {
                "items": {
                  "type": "string"
                },
                "type": "array"
              },
              "id": {
                "type": "string"
              }
            },
            "required": [
              "groups",
              "id"
            ],
            "type": "object"
          }
        },
        "properties": {
          "count": {
            "format": "uint",
            "minimum": 0,
            "type": "integer"
          },
          "stream": {
            "items": {
              "$ref": "#/definitions/StreamMetadata"
            },
            "type": "array"
          }
        },
        "required": [
          "count",
          "stream"
        ],
        "title": "StreamListAllResult",
        "type": "object"
      }
    },
    {
      "description": "List all groups in a stream",
      "metadata": {},
      "name": "stream::list_groups",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "stream_name": {
            "type": "string"
          }
        },
        "required": [
          "stream_name"
        ],
        "title": "StreamListGroupsInput",
        "type": "object"
      },
      "response_schema": {}
    },
    {
      "description": "Send a custom event to stream subscribers",
      "metadata": {},
      "name": "stream::send",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "data": true,
          "group_id": {
            "type": "string"
          },
          "id": {
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "type": "string"
          },
          "type": {
            "type": "string"
          }
        },
        "required": [
          "data",
          "group_id",
          "stream_name",
          "type"
        ],
        "title": "StreamSendInput",
        "type": "object"
      },
      "response_schema": {}
    },
    {
      "description": "Set a value in a stream",
      "metadata": {},
      "name": "stream::set",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "data": true,
          "group_id": {
            "type": "string"
          },
          "item_id": {
            "type": "string"
          },
          "stream_name": {
            "type": "string"
          }
        },
        "required": [
          "data",
          "group_id",
          "item_id",
          "stream_name"
        ],
        "title": "StreamSetInput",
        "type": "object"
      },
      "response_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "new_value": {
            "description": "The value after the update"
          },
          "old_value": {
            "description": "The value before the update (None if key didn't exist)"
          }
        },
        "required": [
          "new_value"
        ],
        "title": "SetResult",
        "type": "object"
      }
    },
    {
      "description": "Atomically update a stream value with multiple operations",
      "metadata": {},
      "name": "stream::update",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "definitions": {
          "FieldPath": {
            "description": "Represents a path to a field in a JSON object",
            "type": "string"
          },
          "MergePath": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "items": {
                  "type": "string"
                },
                "type": "array"
              }
            ],
            "description": "Path target for a [`UpdateOp::Merge`] operation. Accepts either a single string (legacy / first-level field) or an array of literal segments (nested path).\n\nPath normalization rules applied by the engine: - absent / `Single(\"\")` / `Segments(vec![])` → root merge - `Single(\"foo\")` is equivalent to `Segments(vec![\"foo\".into()])` - `Segments([\"a\", \"b\", \"c\"])` walks three literal keys, never interpreting dots specially. `Segments(vec![\"a.b\".into()])` is a single literal key named `\"a.b\"`.\n\n**Variant ordering is load-bearing.** `#[serde(untagged)]` tries variants in declaration order — `Single` MUST come before `Segments` so a JSON string deserializes into `Single` rather than failing the array match first."
          },
          "UpdateOp": {
            "description": "Operations that can be performed atomically on a stream value",
            "oneOf": [
              {
                "description": "Set a value at path (overwrite)",
                "properties": {
                  "path": {
                    "$ref": "#/definitions/FieldPath"
                  },
                  "type": {
                    "enum": [
                      "set"
                    ],
                    "type": "string"
                  },
                  "value": true
                },
                "required": [
                  "path",
                  "type"
                ],
                "type": "object"
              },
              {
                "description": "Merge object into existing value (object-only). Path may be omitted (root merge), a single first-level key, or an array of literal segments for nested merge. See [`MergePath`].",
                "properties": {
                  "path": {
                    "anyOf": [
                      {
                        "$ref": "#/definitions/MergePath"
                      },
                      {
                        "type": "null"
                      }
                    ]
                  },
                  "type": {
                    "enum": [
                      "merge"
                    ],
                    "type": "string"
                  },
                  "value": true
                },
                "required": [
                  "type",
                  "value"
                ],
                "type": "object"
              },
              {
                "description": "Increment numeric value",
                "properties": {
                  "by": {
                    "format": "int64",
                    "type": "integer"
                  },
                  "path": {
                    "$ref": "#/definitions/FieldPath"
                  },
                  "type": {
                    "enum": [
                      "increment"
                    ],
                    "type": "string"
                  }
                },
                "required": [
                  "by",
                  "path",
                  "type"
                ],
                "type": "object"
              },
              {
                "description": "Decrement numeric value",
                "properties": {
                  "by": {
                    "format": "int64",
                    "type": "integer"
                  },
                  "path": {
                    "$ref": "#/definitions/FieldPath"
                  },
                  "type": {
                    "enum": [
                      "decrement"
                    ],
                    "type": "string"
                  }
                },
                "required": [
                  "by",
                  "path",
                  "type"
                ],
                "type": "object"
              },
              {
                "description": "Append an element to an array or concatenate a string",
                "properties": {
                  "path": {
                    "$ref": "#/definitions/FieldPath"
                  },
                  "type": {
                    "enum": [
                      "append"
                    ],
                    "type": "string"
                  },
                  "value": true
                },
                "required": [
                  "path",
                  "type",
                  "value"
                ],
                "type": "object"
              },
              {
                "description": "Remove a field",
                "properties": {
                  "path": {
                    "$ref": "#/definitions/FieldPath"
                  },
                  "type": {
                    "enum": [
                      "remove"
                    ],
                    "type": "string"
                  }
                },
                "required": [
                  "path",
                  "type"
                ],
                "type": "object"
              }
            ]
          }
        },
        "description": "Input for atomic stream update operations",
        "properties": {
          "group_id": {
            "type": "string"
          },
          "item_id": {
            "type": "string"
          },
          "ops": {
            "items": {
              "$ref": "#/definitions/UpdateOp"
            },
            "type": "array"
          },
          "stream_name": {
            "type": "string"
          }
        },
        "required": [
          "group_id",
          "item_id",
          "ops",
          "stream_name"
        ],
        "title": "StreamUpdateInput",
        "type": "object"
      },
      "response_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "definitions": {
          "UpdateOpError": {
            "description": "Per-op error reported by an atomic update operation.",
            "properties": {
              "code": {
                "description": "Stable error code, e.g. `\"merge.path.too_deep\"`.",
                "type": "string"
              },
              "doc_url": {
                "description": "Optional documentation URL for this error class.",
                "type": [
                  "string",
                  "null"
                ]
              },
              "message": {
                "description": "Human-readable description with concrete numbers when applicable.",
                "type": "string"
              },
              "op_index": {
                "description": "Index of the offending op within the original `ops` array.",
                "format": "uint",
                "minimum": 0,
                "type": "integer"
              }
            },
            "required": [
              "code",
              "message",
              "op_index"
            ],
            "type": "object"
          }
        },
        "description": "Result of an atomic update operation",
        "properties": {
          "errors": {
            "description": "Errors encountered while applying ops. Successfully applied ops are still reflected in `new_value`. Field is omitted from JSON when empty for backward compatibility.",
            "items": {
              "$ref": "#/definitions/UpdateOpError"
            },
            "type": "array"
          },
          "new_value": {
            "description": "The value after the update"
          },
          "old_value": {
            "description": "The value before the update (None if key didn't exist)"
          }
        },
        "required": [
          "new_value"
        ],
        "title": "UpdateResult",
        "type": "object"
      }
    }
  ],
  "triggers": [
    {
      "description": "Stream leave trigger",
      "invocation_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "condition_function_id": {
            "description": "Optional function ID to evaluate before invoking handler",
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "description": "Stream name to watch",
            "type": [
              "string",
              "null"
            ]
          }
        },
        "title": "StreamJoinLeaveTriggerConfig",
        "type": "object"
      },
      "metadata": {},
      "name": "stream:leave",
      "return_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "event_type": {
            "description": "Event type (stream:join or stream:leave)",
            "type": "string"
          },
          "group_id": {
            "description": "Group ID",
            "type": "string"
          },
          "id": {
            "description": "Peer ID",
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "description": "Stream name",
            "type": "string"
          },
          "timestamp": {
            "description": "Event timestamp (ms)",
            "format": "int64",
            "type": "integer"
          }
        },
        "required": [
          "event_type",
          "group_id",
          "stream_name",
          "timestamp"
        ],
        "title": "StreamJoinLeaveCallRequest",
        "type": "object"
      }
    },
    {
      "description": "Log event trigger",
      "invocation_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "definitions": {
          "LogLevel": {
            "enum": [
              "all",
              "debug",
              "info",
              "warn",
              "error"
            ],
            "type": "string"
          }
        },
        "properties": {
          "level": {
            "anyOf": [
              {
                "$ref": "#/definitions/LogLevel"
              },
              {
                "type": "null"
              }
            ],
            "default": "all",
            "description": "Minimum log level to trigger on"
          }
        },
        "title": "LogTriggerConfig",
        "type": "object"
      },
      "metadata": {},
      "name": "log",
      "return_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "attributes": {
            "description": "Log attributes"
          },
          "body": {
            "description": "Log message body",
            "type": "string"
          },
          "instrumentation_scope_name": {
            "description": "Instrumentation scope name",
            "type": "string"
          },
          "instrumentation_scope_version": {
            "description": "Instrumentation scope version",
            "type": "string"
          },
          "observed_timestamp_unix_nano": {
            "description": "Observed timestamp in nanoseconds",
            "format": "uint64",
            "minimum": 0,
            "type": "integer"
          },
          "resource": {
            "description": "OpenTelemetry resource"
          },
          "service_name": {
            "description": "Service name",
            "type": "string"
          },
          "severity_number": {
            "description": "Severity number (OpenTelemetry)",
            "format": "uint32",
            "minimum": 0,
            "type": "integer"
          },
          "severity_text": {
            "description": "Severity text (e.g. INFO, ERROR)",
            "type": "string"
          },
          "span_id": {
            "description": "Span ID",
            "type": "string"
          },
          "timestamp_unix_nano": {
            "description": "Log timestamp in nanoseconds",
            "format": "uint64",
            "minimum": 0,
            "type": "integer"
          },
          "trace_id": {
            "description": "Trace ID",
            "type": "string"
          }
        },
        "required": [
          "attributes",
          "body",
          "instrumentation_scope_name",
          "instrumentation_scope_version",
          "observed_timestamp_unix_nano",
          "resource",
          "service_name",
          "severity_number",
          "severity_text",
          "span_id",
          "timestamp_unix_nano",
          "trace_id"
        ],
        "title": "LogCallRequest",
        "type": "object"
      }
    },
    {
      "description": "Stream trigger",
      "invocation_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "condition_function_id": {
            "description": "Optional function ID to evaluate before invoking handler",
            "type": [
              "string",
              "null"
            ]
          },
          "group_id": {
            "description": "Group ID filter",
            "type": [
              "string",
              "null"
            ]
          },
          "item_id": {
            "description": "Item ID filter",
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "description": "Stream name to watch",
            "type": [
              "string",
              "null"
            ]
          }
        },
        "title": "StreamTriggerConfig",
        "type": "object"
      },
      "metadata": {},
      "name": "stream",
      "return_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "event": {
            "description": "Event-specific data (create/update/delete/sync payload)"
          },
          "event_type": {
            "description": "Stream event type (create, update, delete, sync)",
            "type": "string"
          },
          "group_id": {
            "description": "Group ID",
            "type": "string"
          },
          "id": {
            "description": "Item ID",
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "description": "Stream name",
            "type": "string"
          },
          "timestamp": {
            "description": "Event timestamp (ms)",
            "format": "int64",
            "type": "integer"
          }
        },
        "required": [
          "event",
          "event_type",
          "group_id",
          "stream_name",
          "timestamp"
        ],
        "title": "StreamCallRequest",
        "type": "object"
      }
    },
    {
      "description": "Stream join trigger",
      "invocation_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "condition_function_id": {
            "description": "Optional function ID to evaluate before invoking handler",
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "description": "Stream name to watch",
            "type": [
              "string",
              "null"
            ]
          }
        },
        "title": "StreamJoinLeaveTriggerConfig",
        "type": "object"
      },
      "metadata": {},
      "name": "stream:join",
      "return_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "event_type": {
            "description": "Event type (stream:join or stream:leave)",
            "type": "string"
          },
          "group_id": {
            "description": "Group ID",
            "type": "string"
          },
          "id": {
            "description": "Peer ID",
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "description": "Stream name",
            "type": "string"
          },
          "timestamp": {
            "description": "Event timestamp (ms)",
            "format": "int64",
            "type": "integer"
          }
        },
        "required": [
          "event_type",
          "group_id",
          "stream_name",
          "timestamp"
        ],
        "title": "StreamJoinLeaveCallRequest",
        "type": "object"
      }
    }
  ]
}