# iii-stream

> Build durable streams for real-time data subscriptions.

| field | value |
|-------|-------|
| version | 0.11.6-next.5 |
| type | engine |
| repo | https://github.com/iii-hq/iii |
| author | iii |

## installation

```sh
iii worker add iii-stream@0.11.6-next.5
```

## configuration

```yaml
- adapter:
    config:
      file_path: ./data/stream_store
      store_method: file_based
    name: kv
  host: 127.0.0.1
  port: 3112
```

## readme

# iii-stream

Durable streams for real-time data subscriptions. Streams organize data hierarchically: `stream_name` > `group_id` > `item_id`. Clients subscribe via WebSocket and receive real-time updates when items change.

When a worker triggers `stream::set`, the engine:
1. Persists the data via the configured adapter (`redis` or `kv`)
2. Publishes a notification to all WebSocket clients subscribed to that stream and group
3. Evaluates registered `stream` triggers and fires matching handlers
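
The three steps above can be pictured with a small in-memory model. This is only an illustrative sketch, not the engine's implementation: `MiniStream`, `subscribe`, and `onChange` are hypothetical names, a `Map` stands in for the adapter, and plain callbacks stand in for WebSocket clients and trigger handlers.

```typescript
// Illustrative in-memory model of the stream::set flow; not the engine's code.
type Listener = (itemId: string, data: unknown) => void

class MiniStream {
  // stream_name > group_id > item_id, flattened into one key for brevity
  private store = new Map<string, unknown>()
  private subscribers = new Map<string, Listener[]>()
  private triggers: Listener[] = []

  subscribe(streamName: string, groupId: string, fn: Listener): void {
    const key = `${streamName}/${groupId}`
    this.subscribers.set(key, [...(this.subscribers.get(key) ?? []), fn])
  }

  onChange(fn: Listener): void {
    this.triggers.push(fn)
  }

  set(streamName: string, groupId: string, itemId: string, data: unknown) {
    const itemKey = `${streamName}/${groupId}/${itemId}`
    const oldValue = this.store.get(itemKey) ?? null
    // 1. Persist (a Map stands in for the redis or kv adapter)
    this.store.set(itemKey, data)
    // 2. Notify subscribers of this stream and group
    for (const fn of this.subscribers.get(`${streamName}/${groupId}`) ?? []) fn(itemId, data)
    // 3. Fire registered triggers
    for (const fn of this.triggers) fn(itemId, data)
    return { old_value: oldValue, new_value: data }
  }
}

const stream = new MiniStream()
const seen: string[] = []
stream.subscribe('presence', 'room-1', (id) => seen.push(`ws:${id}`))
stream.onChange((id) => seen.push(`trigger:${id}`))
const result = stream.set('presence', 'room-1', 'user-123', { online: true })
```

In this model `seen` records the subscriber notification before the trigger, mirroring the order of the list; whether the real engine guarantees that ordering is not stated here.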

## Sample Configuration

```yaml
- name: iii-stream
  config:
    port: ${STREAM_PORT:3112}
    host: 0.0.0.0
    adapter:
      name: redis
      config:
        redis_url: ${REDIS_URL:redis://localhost:6379}
```
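
The `${NAME:default}` placeholders in the sample read as "use the environment variable `NAME`, or `default` if it is unset". That reading is an assumption based on the sample alone. A minimal sketch of such an expansion, where `expandPlaceholders` is a hypothetical helper and not part of the engine:

```typescript
// Hypothetical helper illustrating ${NAME:default} expansion; the engine's real parser may differ.
function expandPlaceholders(input: string, env: Record<string, string | undefined>): string {
  // NAME runs up to the first ':'; the default may itself contain ':' (as in a URL).
  const pattern = /\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}/g
  return input.replace(pattern, (_match: string, name: string, fallback?: string) => {
    return env[name] ?? fallback ?? ''
  })
}

// No variables set: the defaults apply.
const withDefaults = expandPlaceholders('${REDIS_URL:redis://localhost:6379}', {})
// STREAM_PORT set: the environment value wins over the default.
const overridden = expandPlaceholders('${STREAM_PORT:3112}', { STREAM_PORT: '4000' })
```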

## Configuration

| Field | Type | Description |
|---|---|---|
| `port` | number | The port to listen on. Defaults to `3112`. |
| `host` | string | The host to listen on. Defaults to `0.0.0.0`. |
| `auth_function` | string | Function ID to authenticate WebSocket connections. |
| `adapter` | Adapter | Adapter for stream storage and real-time delivery. |

## Adapters

### redis

Uses Redis as the backend. Stores stream data in Redis and uses Redis Pub/Sub for real-time delivery.

```yaml
name: redis
config:
  redis_url: ${REDIS_URL:redis://localhost:6379}
```

### kv

Built-in key-value store. Supports in-memory or file-based persistence. No external dependencies required.

```yaml
name: kv
config:
  store_method: file_based
  file_path: ./data/streams_store.db
```

| Field | Type | Description |
|---|---|---|
| `store_method` | string | `in_memory` (lost on restart) or `file_based` (persisted to disk). |
| `file_path` | string | Directory path for file-based storage. |

## Functions

### `stream::set`

Sets a value in the stream. Notifies all WebSocket subscribers and fires `stream` triggers.

Parameters: `stream_name` (string), `group_id` (string), `item_id` (string), `data` (any)

Returns: `old_value` (any), `new_value` (any)

### `stream::get`

Gets a value from the stream.

Parameters: `stream_name` (string), `group_id` (string), `item_id` (string)

Returns: `value` (any)

### `stream::delete`

Deletes a value from the stream.

Parameters: `stream_name` (string), `group_id` (string), `item_id` (string)

Returns: `old_value` (any)

### `stream::list`

Retrieves all items in a group.

Parameters: `stream_name` (string), `group_id` (string)

Returns: `group` (any[])

### `stream::list_groups`

Lists all groups in a stream.

Parameters: `stream_name` (string)

Returns: `groups` (string[])

### `stream::list_all`

Lists all streams with their group metadata.

Returns: `stream` (object[]), `count` (number)
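
As a rough illustration of consuming this result, the sketch below indexes group names by stream ID. The interfaces are transcribed from the `stream::list_all` response schema in the API reference; `groupsByStream` and the sample data are made up for the example.

```typescript
// Shapes transcribed from the stream::list_all response schema.
interface StreamMetadata {
  id: string
  groups: string[]
}

interface StreamListAllResult {
  stream: StreamMetadata[]
  count: number
}

// Hypothetical helper: index group names by stream id for quick lookup.
function groupsByStream(result: StreamListAllResult): Map<string, string[]> {
  return new Map(result.stream.map((s): [string, string[]] => [s.id, s.groups]))
}

// Made-up sample response; count is assumed here to be the number of streams.
const sample: StreamListAllResult = {
  stream: [
    { id: 'presence', groups: ['room-1', 'room-2'] },
    { id: 'chat', groups: ['general'] },
  ],
  count: 2,
}

const index = groupsByStream(sample)
const presenceGroups = index.get('presence') ?? []
```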

### `stream::send`

Sends a custom event to all subscribers of a stream group (without persisting).

Parameters: `stream_name` (string), `group_id` (string), `type` (string), `data` (any), `id` (string, optional)

### `stream::update`

Atomically updates an item using a list of operations (`set`, `merge`, `increment`, `decrement`, `append`, `remove`).

Parameters: `stream_name` (string), `group_id` (string), `item_id` (string), `ops` (UpdateOp[])

Returns: `old_value` (any), `new_value` (any), `errors` (UpdateOpError[], omitted when empty)
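
To make the shape of `ops` concrete, here is a sketch of a typed `stream::update` payload. The types are transcribed from the request schema in the API reference; the stream, group, item, and field names are invented for illustration.

```typescript
// Types transcribed from the stream::update request schema.
type FieldPath = string
type MergePath = string | string[]

type UpdateOp =
  | { type: 'set'; path: FieldPath; value?: unknown }
  | { type: 'merge'; path?: MergePath | null; value: unknown }
  | { type: 'increment'; path: FieldPath; by: number }
  | { type: 'decrement'; path: FieldPath; by: number }
  | { type: 'append'; path: FieldPath; value: unknown }
  | { type: 'remove'; path: FieldPath }

interface StreamUpdateInput {
  stream_name: string
  group_id: string
  item_id: string
  ops: UpdateOp[]
}

// Example payload; all names and values below are made up.
const updatePayload: StreamUpdateInput = {
  stream_name: 'presence',
  group_id: 'room-1',
  item_id: 'user-123',
  ops: [
    { type: 'set', path: 'online', value: true },
    { type: 'increment', path: 'visits', by: 1 },
    // An array path walks literal keys: profile, then settings
    { type: 'merge', path: ['profile', 'settings'], value: { theme: 'dark' } },
    { type: 'remove', path: 'typing' },
  ],
}

const opTypes = updatePayload.ops.map((op) => op.type)
```

Such a payload would be passed as `payload` to `iii.trigger` with `function_id: 'stream::update'`. According to the response schema, ops that fail are reported in `errors` with their `op_index`, while successfully applied ops are still reflected in `new_value`.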

## Authentication

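Define a function that receives request data (`headers`, `path`, `query_params`, `addr`) and returns `{ context: ... }`. The returned `context` is later passed to `stream:join` and `stream:leave` handlers. Set the function's ID as `auth_function` in the config: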

```yaml
- name: iii-stream
  config:
    auth_function: onAuth
```

**TypeScript:**
```typescript
iii.registerFunction('onAuth', (input) => ({
  context: { name: 'John Doe' },
}))
```

**Python:**
```python
def on_auth(input):
    return {'context': {'name': 'John Doe'}}

iii.register_function("onAuth", on_auth)
```
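
The handlers above return a fixed context. A handler would more likely derive the context from the request, as in the sketch below. It is written under assumptions: `headers` and `query_params` are treated as plain string maps, and `knownTokens` is a hypothetical lookup table standing in for a real session or token check. How the engine treats a thrown error or a missing `context` is not covered here.

```typescript
// Assumed shape of the auth input; only the field names come from the text above.
interface AuthInput {
  headers: Record<string, string>
  path: string
  query_params: Record<string, string>
  addr: string
}

// Hypothetical token table standing in for a real session or JWT check.
const knownTokens: Record<string, string> = { 'secret-token': 'user-123' }

function onAuth(input: AuthInput): { context: { userId: string | null; addr: string } } {
  // Prefer the Authorization header, fall back to a ?token= query parameter.
  const header: string | undefined = input.headers['authorization']
  const token: string | undefined =
    header !== undefined && header.startsWith('Bearer ') ? header.slice(7) : input.query_params['token']
  const userId = token !== undefined ? (knownTokens[token] ?? null) : null
  return { context: { userId, addr: input.addr } }
}

// Sample inputs for illustration only.
const authed = onAuth({
  headers: { authorization: 'Bearer secret-token' },
  path: '/stream/presence/room-1/',
  query_params: {},
  addr: '127.0.0.1:50000',
})
const anonymous = onAuth({
  headers: {},
  path: '/stream/presence/room-1/',
  query_params: {},
  addr: '127.0.0.1:50001',
})
```

Registered with `iii.registerFunction('onAuth', onAuth)`, a handler like this would make `userId` available as part of `context` in join and leave handlers.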

## Trigger Types

### `stream`

Fires when an item changes (via `stream::set`, `stream::update`, or `stream::delete`).

| Config Field | Type | Description |
|---|---|---|
| `stream_name` | string | Required. Only changes on this stream fire the handler. |
| `group_id` | string | If set, only changes within this group fire the handler. |
| `item_id` | string | If set, only changes to this specific item fire the handler. |
| `condition_function_id` | string | Function ID for conditional execution. |

Payload fields: `type` (`create`/`update`/`delete`), `timestamp`, `streamName`, `groupId`, `id`, `event` (object with `type` and `data`).
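
The filter behaviour described in the table can be sketched as a predicate. This is an illustration of the documented semantics, not the engine's code: `matches` is a hypothetical name, and the event fields use the snake_case names from the `StreamCallRequest` schema in the API reference.

```typescript
// Config fields from the table above; group_id and item_id are optional filters.
interface StreamTriggerConfig {
  stream_name: string
  group_id?: string
  item_id?: string
}

// Subset of the event fields needed for matching.
interface StreamEvent {
  stream_name: string
  group_id: string
  id?: string | null
}

// Returns true when every filter that is set matches the event.
function matches(config: StreamTriggerConfig, event: StreamEvent): boolean {
  if (config.stream_name !== event.stream_name) return false
  if (config.group_id !== undefined && config.group_id !== event.group_id) return false
  if (config.item_id !== undefined && config.item_id !== event.id) return false
  return true
}

// Made-up change event for illustration.
const change: StreamEvent = { stream_name: 'presence', group_id: 'room-1', id: 'user-123' }

const wholeStream = matches({ stream_name: 'presence' }, change)
const otherGroup = matches({ stream_name: 'presence', group_id: 'room-2' }, change)
const oneItem = matches({ stream_name: 'presence', group_id: 'room-1', item_id: 'user-123' }, change)
```

A `condition_function_id`, when set, would add one more check after these filters; it is left out of the sketch.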

### `stream:join` and `stream:leave`

Fire when a client connects or disconnects via WebSocket.

Payload fields: `subscription_id`, `stream_name`, `group_id`, `id` (optional), `context` (from auth).

```typescript
const fn = iii.registerFunction('onJoin', (input) => {
  console.log(`Joined ${input.stream_name}/${input.group_id}`, input.context)
  return {}
})

iii.registerTrigger({
  type: 'stream:join',
  function_id: fn.id,
  config: {},
})
```

## Usage Example: Real-Time Presence

```typescript
import { registerWorker, TriggerAction } from 'iii-sdk'

const iii = registerWorker('ws://localhost:49134')

// Set presence; TriggerAction.Void() sends the call without waiting for a result
iii.trigger({
  function_id: 'stream::set',
  payload: {
    stream_name: 'presence',
    group_id: 'room-1',
    item_id: 'user-123',
    data: { name: 'Alice', online: true, lastSeen: new Date().toISOString() },
  },
  action: TriggerAction.Void(),
})

// Get a user
const user = await iii.trigger({
  function_id: 'stream::get',
  payload: { stream_name: 'presence', group_id: 'room-1', item_id: 'user-123' },
})

// List all members in a room
const roomMembers = await iii.trigger({
  function_id: 'stream::list',
  payload: { stream_name: 'presence', group_id: 'room-1' },
})
```

Clients connect via WebSocket to `ws://host:3112/stream/presence/room-1/` and receive real-time updates when items change.
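
Judging from that single example, the subscription URL appears to follow the pattern `/stream/<stream_name>/<group_id>/`. A small sketch of building it, where `streamUrl` is a hypothetical helper and percent-encoding of the segments is an assumption about what the server accepts:

```typescript
// Hypothetical helper; the path pattern is inferred from the example URL above.
function streamUrl(host: string, port: number, streamName: string, groupId: string): string {
  // Encode each segment so names containing '/' or spaces stay a single path segment.
  return `ws://${host}:${port}/stream/${encodeURIComponent(streamName)}/${encodeURIComponent(groupId)}/`
}

const presenceUrl = streamUrl('localhost', 3112, 'presence', 'room-1')
```

In a browser, the resulting string could be handed to the standard `WebSocket` constructor, for example `new WebSocket(presenceUrl)`.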

## api reference

```json
{
  "functions": [
    {
      "description": "Delete a value from a stream",
      "metadata": {},
      "name": "stream::delete",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "group_id": {
            "type": "string"
          },
          "item_id": {
            "type": "string"
          },
          "stream_name": {
            "type": "string"
          }
        },
        "required": [
          "group_id",
          "item_id",
          "stream_name"
        ],
        "title": "StreamDeleteInput",
        "type": "object"
      },
      "response_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "old_value": {
            "description": "The value before the update (None if key didn't exist)"
          }
        },
        "title": "DeleteResult",
        "type": "object"
      }
    },
    {
      "description": "Get a value from a stream",
      "metadata": {},
      "name": "stream::get",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "group_id": {
            "type": "string"
          },
          "item_id": {
            "type": "string"
          },
          "stream_name": {
            "type": "string"
          }
        },
        "required": [
          "group_id",
          "item_id",
          "stream_name"
        ],
        "title": "StreamGetInput",
        "type": "object"
      },
      "response_schema": {}
    },
    {
      "description": "List all items in a stream group",
      "metadata": {},
      "name": "stream::list",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "group_id": {
            "type": "string"
          },
          "stream_name": {
            "type": "string"
          }
        },
        "required": [
          "group_id",
          "stream_name"
        ],
        "title": "StreamListInput",
        "type": "object"
      },
      "response_schema": {}
    },
    {
      "description": "List all available stream with metadata",
      "metadata": {},
      "name": "stream::list_all",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "description": "Input for stream.listAll (empty struct)",
        "title": "StreamListAllInput",
        "type": "object"
      },
      "response_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "definitions": {
          "StreamMetadata": {
            "description": "Metadata for a stream (used by stream.listAll)",
            "properties": {
              "groups": {
                "items": {
                  "type": "string"
                },
                "type": "array"
              },
              "id": {
                "type": "string"
              }
            },
            "required": [
              "groups",
              "id"
            ],
            "type": "object"
          }
        },
        "properties": {
          "count": {
            "format": "uint",
            "minimum": 0,
            "type": "integer"
          },
          "stream": {
            "items": {
              "$ref": "#/definitions/StreamMetadata"
            },
            "type": "array"
          }
        },
        "required": [
          "count",
          "stream"
        ],
        "title": "StreamListAllResult",
        "type": "object"
      }
    },
    {
      "description": "List all groups in a stream",
      "metadata": {},
      "name": "stream::list_groups",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "stream_name": {
            "type": "string"
          }
        },
        "required": [
          "stream_name"
        ],
        "title": "StreamListGroupsInput",
        "type": "object"
      },
      "response_schema": {}
    },
    {
      "description": "Send a custom event to stream subscribers",
      "metadata": {},
      "name": "stream::send",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "data": true,
          "group_id": {
            "type": "string"
          },
          "id": {
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "type": "string"
          },
          "type": {
            "type": "string"
          }
        },
        "required": [
          "data",
          "group_id",
          "stream_name",
          "type"
        ],
        "title": "StreamSendInput",
        "type": "object"
      },
      "response_schema": {}
    },
    {
      "description": "Set a value in a stream",
      "metadata": {},
      "name": "stream::set",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "data": true,
          "group_id": {
            "type": "string"
          },
          "item_id": {
            "type": "string"
          },
          "stream_name": {
            "type": "string"
          }
        },
        "required": [
          "data",
          "group_id",
          "item_id",
          "stream_name"
        ],
        "title": "StreamSetInput",
        "type": "object"
      },
      "response_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "new_value": {
            "description": "The value after the update"
          },
          "old_value": {
            "description": "The value before the update (None if key didn't exist)"
          }
        },
        "required": [
          "new_value"
        ],
        "title": "SetResult",
        "type": "object"
      }
    },
    {
      "description": "Atomically update a stream value with multiple operations",
      "metadata": {},
      "name": "stream::update",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "definitions": {
          "FieldPath": {
            "description": "Represents a path to a field in a JSON object",
            "type": "string"
          },
          "MergePath": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "items": {
                  "type": "string"
                },
                "type": "array"
              }
            ],
            "description": "Path target for a [`UpdateOp::Merge`] operation. Accepts either a single string (legacy / first-level field) or an array of literal segments (nested path).\n\nPath normalization rules applied by the engine: - absent / `Single(\"\")` / `Segments(vec![])` → root merge - `Single(\"foo\")` is equivalent to `Segments(vec![\"foo\".into()])` - `Segments([\"a\", \"b\", \"c\"])` walks three literal keys, never interpreting dots specially. `Segments(vec![\"a.b\".into()])` is a single literal key named `\"a.b\"`.\n\n**Variant ordering is load-bearing.** `#[serde(untagged)]` tries variants in declaration order — `Single` MUST come before `Segments` so a JSON string deserializes into `Single` rather than failing the array match first."
          },
          "UpdateOp": {
            "description": "Operations that can be performed atomically on a stream value",
            "oneOf": [
              {
                "description": "Set a value at path (overwrite)",
                "properties": {
                  "path": {
                    "$ref": "#/definitions/FieldPath"
                  },
                  "type": {
                    "enum": [
                      "set"
                    ],
                    "type": "string"
                  },
                  "value": true
                },
                "required": [
                  "path",
                  "type"
                ],
                "type": "object"
              },
              {
                "description": "Merge object into existing value (object-only). Path may be omitted (root merge), a single first-level key, or an array of literal segments for nested merge. See [`MergePath`].",
                "properties": {
                  "path": {
                    "anyOf": [
                      {
                        "$ref": "#/definitions/MergePath"
                      },
                      {
                        "type": "null"
                      }
                    ]
                  },
                  "type": {
                    "enum": [
                      "merge"
                    ],
                    "type": "string"
                  },
                  "value": true
                },
                "required": [
                  "type",
                  "value"
                ],
                "type": "object"
              },
              {
                "description": "Increment numeric value",
                "properties": {
                  "by": {
                    "format": "int64",
                    "type": "integer"
                  },
                  "path": {
                    "$ref": "#/definitions/FieldPath"
                  },
                  "type": {
                    "enum": [
                      "increment"
                    ],
                    "type": "string"
                  }
                },
                "required": [
                  "by",
                  "path",
                  "type"
                ],
                "type": "object"
              },
              {
                "description": "Decrement numeric value",
                "properties": {
                  "by": {
                    "format": "int64",
                    "type": "integer"
                  },
                  "path": {
                    "$ref": "#/definitions/FieldPath"
                  },
                  "type": {
                    "enum": [
                      "decrement"
                    ],
                    "type": "string"
                  }
                },
                "required": [
                  "by",
                  "path",
                  "type"
                ],
                "type": "object"
              },
              {
                "description": "Append an element to an array or concatenate a string",
                "properties": {
                  "path": {
                    "$ref": "#/definitions/FieldPath"
                  },
                  "type": {
                    "enum": [
                      "append"
                    ],
                    "type": "string"
                  },
                  "value": true
                },
                "required": [
                  "path",
                  "type",
                  "value"
                ],
                "type": "object"
              },
              {
                "description": "Remove a field",
                "properties": {
                  "path": {
                    "$ref": "#/definitions/FieldPath"
                  },
                  "type": {
                    "enum": [
                      "remove"
                    ],
                    "type": "string"
                  }
                },
                "required": [
                  "path",
                  "type"
                ],
                "type": "object"
              }
            ]
          }
        },
        "description": "Input for atomic stream update operations",
        "properties": {
          "group_id": {
            "type": "string"
          },
          "item_id": {
            "type": "string"
          },
          "ops": {
            "items": {
              "$ref": "#/definitions/UpdateOp"
            },
            "type": "array"
          },
          "stream_name": {
            "type": "string"
          }
        },
        "required": [
          "group_id",
          "item_id",
          "ops",
          "stream_name"
        ],
        "title": "StreamUpdateInput",
        "type": "object"
      },
      "response_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "definitions": {
          "UpdateOpError": {
            "description": "Per-op error reported by an atomic update operation.",
            "properties": {
              "code": {
                "description": "Stable error code, e.g. `\"merge.path.too_deep\"`.",
                "type": "string"
              },
              "doc_url": {
                "description": "Optional documentation URL for this error class.",
                "type": [
                  "string",
                  "null"
                ]
              },
              "message": {
                "description": "Human-readable description with concrete numbers when applicable.",
                "type": "string"
              },
              "op_index": {
                "description": "Index of the offending op within the original `ops` array.",
                "format": "uint",
                "minimum": 0,
                "type": "integer"
              }
            },
            "required": [
              "code",
              "message",
              "op_index"
            ],
            "type": "object"
          }
        },
        "description": "Result of an atomic update operation",
        "properties": {
          "errors": {
            "description": "Errors encountered while applying ops. Successfully applied ops are still reflected in `new_value`. Field is omitted from JSON when empty for backward compatibility.",
            "items": {
              "$ref": "#/definitions/UpdateOpError"
            },
            "type": "array"
          },
          "new_value": {
            "description": "The value after the update"
          },
          "old_value": {
            "description": "The value before the update (None if key didn't exist)"
          }
        },
        "required": [
          "new_value"
        ],
        "title": "UpdateResult",
        "type": "object"
      }
    }
  ],
  "triggers": [
    {
      "description": "Stream join trigger",
      "invocation_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "condition_function_id": {
            "description": "Optional function ID to evaluate before invoking handler",
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "description": "Stream name to watch",
            "type": [
              "string",
              "null"
            ]
          }
        },
        "title": "StreamJoinLeaveTriggerConfig",
        "type": "object"
      },
      "metadata": {},
      "name": "stream:join",
      "return_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "event_type": {
            "description": "Event type (stream:join or stream:leave)",
            "type": "string"
          },
          "group_id": {
            "description": "Group ID",
            "type": "string"
          },
          "id": {
            "description": "Peer ID",
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "description": "Stream name",
            "type": "string"
          },
          "timestamp": {
            "description": "Event timestamp (ms)",
            "format": "int64",
            "type": "integer"
          }
        },
        "required": [
          "event_type",
          "group_id",
          "stream_name",
          "timestamp"
        ],
        "title": "StreamJoinLeaveCallRequest",
        "type": "object"
      }
    },
    {
      "description": "Stream leave trigger",
      "invocation_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "condition_function_id": {
            "description": "Optional function ID to evaluate before invoking handler",
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "description": "Stream name to watch",
            "type": [
              "string",
              "null"
            ]
          }
        },
        "title": "StreamJoinLeaveTriggerConfig",
        "type": "object"
      },
      "metadata": {},
      "name": "stream:leave",
      "return_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "event_type": {
            "description": "Event type (stream:join or stream:leave)",
            "type": "string"
          },
          "group_id": {
            "description": "Group ID",
            "type": "string"
          },
          "id": {
            "description": "Peer ID",
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "description": "Stream name",
            "type": "string"
          },
          "timestamp": {
            "description": "Event timestamp (ms)",
            "format": "int64",
            "type": "integer"
          }
        },
        "required": [
          "event_type",
          "group_id",
          "stream_name",
          "timestamp"
        ],
        "title": "StreamJoinLeaveCallRequest",
        "type": "object"
      }
    },
    {
      "description": "Stream trigger",
      "invocation_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "condition_function_id": {
            "description": "Optional function ID to evaluate before invoking handler",
            "type": [
              "string",
              "null"
            ]
          },
          "group_id": {
            "description": "Group ID filter",
            "type": [
              "string",
              "null"
            ]
          },
          "item_id": {
            "description": "Item ID filter",
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "description": "Stream name to watch",
            "type": [
              "string",
              "null"
            ]
          }
        },
        "title": "StreamTriggerConfig",
        "type": "object"
      },
      "metadata": {},
      "name": "stream",
      "return_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "event": {
            "description": "Event-specific data (create/update/delete/sync payload)"
          },
          "event_type": {
            "description": "Stream event type (create, update, delete, sync)",
            "type": "string"
          },
          "group_id": {
            "description": "Group ID",
            "type": "string"
          },
          "id": {
            "description": "Item ID",
            "type": [
              "string",
              "null"
            ]
          },
          "stream_name": {
            "description": "Stream name",
            "type": "string"
          },
          "timestamp": {
            "description": "Event timestamp (ms)",
            "format": "int64",
            "type": "integer"
          }
        },
        "required": [
          "event",
          "event_type",
          "group_id",
          "stream_name",
          "timestamp"
        ],
        "title": "StreamCallRequest",
        "type": "object"
      }
    }
  ]
}
```
