# session-manager

> Durable, reactive, branching store of typed conversation entries with six emitted trigger types.

| field | value |
|-------|-------|
| version | 0.1.2 |
| type | binary |
| repo | https://github.com/iii-hq/workers |
| supported_targets | x86_64-apple-darwin, aarch64-apple-darwin, i686-pc-windows-msvc, x86_64-pc-windows-msvc, aarch64-pc-windows-msvc, x86_64-unknown-linux-gnu, aarch64-unknown-linux-gnu, x86_64-unknown-linux-musl, armv7-unknown-linux-gnueabihf |
| author | iii |

## installation

```sh
iii worker add session-manager@0.1.2
```

## configuration

```yaml
- backend: fs
  backend_config:
    data_dir: ~/.iii/data/session-manager
  default_list_limit: 50
  max_list_limit: 500
```

## readme

# session-manager

Durable, reactive store for conversations. A session is an append-only
log of typed message entries (user, assistant, function_result, custom),
optionally branched by forks, plus a small metadata record (title,
description, coarse status, app-defined metadata). Any worker or client
appends and reads over the bus. Every mutation fires a trigger type that
consumers can bind to, so a chat UI, bot, or dashboard renders live with
no polling and no separate publish call. It runs no agent logic: it is
the conversation database the [harness](../harness) family drives, and
it is independently useful as a real-time chat store for any app.

## Install

```bash
iii worker add session-manager
```

`iii worker add` fetches the binary and writes a config block into
`~/.iii/config.yaml`; the engine then starts the worker on the next
`iii start`.

## Quickstart

Create a session, append a message, read the transcript back:

```rust
use iii_sdk::{register_worker, InitOptions, TriggerRequest};
use serde_json::json;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let iii = register_worker("ws://localhost:49134", InitOptions::default());

    let created = iii.trigger(TriggerRequest {
        function_id: "session::create".into(),
        payload: json!({ "title": "Weather question", "metadata": { "owner": "u_1" } }),
        action: None,
        timeout_ms: Some(5_000),
    }).await?;
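    // Fail with a readable error instead of panicking if the response shape is unexpected.
    let session_id = created["session_id"]
        .as_str()
        .ok_or_else(|| anyhow::anyhow!("session::create returned no string `session_id`"))?
        .to_string();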

    iii.trigger(TriggerRequest {
        function_id: "session::append".into(),
        payload: json!({
            "session_id": session_id,
            "message": {
                "role": "user",
                "content": [{ "type": "text", "text": "What's the weather?" }],
                "timestamp": 1717800000000_i64
            }
        }),
        action: None,
        timeout_ms: Some(5_000),
    }).await?;

    let transcript = iii.trigger(TriggerRequest {
        function_id: "session::messages".into(),
        payload: json!({ "session_id": session_id }),
        action: None,
        timeout_ms: Some(5_000),
    }).await?;

    println!("{transcript:#?}");
    Ok(())
}
```

To render live instead of polling, register a handler function and bind
it to a trigger type (the two-step reactive pattern):

```rust
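// Fragment: continues inside `main` from the quickstart above,
// so `iii` and `session_id` are already in scope.
use iii_sdk::{IIIError, RegisterFunction, RegisterTriggerInput};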
use serde_json::{json, Value};

iii.register_function(
    "my-ui::on_message_updated",
    RegisterFunction::new_async(|event: Value| async move {
        // full updated message + monotonic revision; keep the highest
        println!("{} rev {}", event["entry_id"], event["revision"]);
        Ok::<_, IIIError>(json!({ "ok": true }))
    }),
);

iii.register_trigger(RegisterTriggerInput {
    trigger_type: "session::message-updated".into(),
    function_id: "my-ui::on_message_updated".into(),
    config: json!({ "session_id": session_id, "roles": ["assistant"] }),
    metadata: None,
})?;
```

Streaming an assistant reply uses the same primitives: append an
(initially empty) assistant message, then call `session::update_message`
as tokens arrive — each update fires `session::message-updated` with an
incremented `revision`.
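
The revision bookkeeping behind that streaming loop can be sketched without the SDK. The block below is a toy in-memory model, not the worker's implementation: the `Message` type, the `RevisionConflict` error, and the choice of revision 0 for a freshly appended message are all assumptions made for illustration. It only mirrors what this README states: each update replaces the whole content, bumps `revision`, and may be guarded by an `expected_revision`.

```rust
/// Toy stand-in for one message entry (hypothetical, not the worker's type).
#[derive(Debug, Clone, PartialEq)]
struct Message {
    content: String,
    revision: u64,
}

#[derive(Debug, PartialEq)]
enum UpdateError {
    /// The caller's expected revision did not match the stored one.
    RevisionConflict { expected: u64, actual: u64 },
}

impl Message {
    /// Assumption: a freshly appended, empty message starts at revision 0.
    fn appended_empty() -> Self {
        Message { content: String::new(), revision: 0 }
    }

    /// Replace the whole content and bump the revision by one.
    /// A stale `expected_revision` rejects the write (optimistic concurrency).
    fn update(&mut self, content: &str, expected_revision: Option<u64>) -> Result<u64, UpdateError> {
        if let Some(expected) = expected_revision {
            if expected != self.revision {
                return Err(UpdateError::RevisionConflict { expected, actual: self.revision });
            }
        }
        self.content = content.to_string();
        self.revision += 1;
        Ok(self.revision)
    }
}

/// Stream tokens into a message: every step sends the full accumulated text.
fn stream_tokens(msg: &mut Message, tokens: &[&str]) -> Vec<u64> {
    let mut revisions = Vec::new();
    let mut text = String::new();
    for token in tokens {
        text.push_str(token);
        let expected = msg.revision;
        // Single writer in this toy, so the expected revision is always current.
        let rev = msg.update(&text, Some(expected)).expect("single writer");
        revisions.push(rev);
    }
    revisions
}

fn main() {
    let mut msg = Message::appended_empty();
    let revisions = stream_tokens(&mut msg, &["It ", "is ", "sunny."]);
    println!("revisions {:?}, final message {:?}", revisions, msg);
}
```

Sending the full accumulated text on every update, rather than a delta, is what lets a consumer drop everything except the highest revision it has seen.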

The full function surface (14 `session::*` functions: lifecycle,
messages, branching/fork) is documented in the spec and on each
function's registered schema — see `iii worker info session-manager`.

## Custom trigger types

Six trigger types cover every mutation. Each binding's `config` filters
what reaches the handler; all configs additionally accept
`metadata` — a subset-equality match against `SessionMeta.metadata`
(the tenancy hook, e.g. `{ "owner": "u_1" }`). Malformed configs are
rejected at registration.

| Trigger type | Fires when | Config filters |
|---|---|---|
| `session::created` | A session is created (create / ensure / fork) | `metadata` |
| `session::message-added` | An entry is appended | `session_id`, `roles`, `metadata` |
| `session::message-updated` | A message's content changes (streaming deltas) | `session_id`, `roles`, `metadata` |
| `session::status-changed` | Status moves between idle/working/done/error | `session_id`, `metadata` |
| `session::meta-updated` | Title/description/metadata change | `session_id`, `metadata` |
| `session::deleted` | A session and its entries are removed | `session_id`, `metadata` |
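
To make the filter semantics concrete, here is a minimal sketch of how a binding's `config` could be matched against an event. It is an illustration under stated assumptions, not the worker's code: `BindingFilter`, `Event`, and `filter_matches` are hypothetical names, metadata is simplified to flat string maps (the real metadata is JSON), an omitted filter is assumed to match everything, and an event without a role is assumed to fail a `roles` filter.

```rust
use std::collections::BTreeMap;

/// Hypothetical, flattened view of a binding's `config`.
#[derive(Default)]
struct BindingFilter {
    session_id: Option<String>,
    roles: Option<Vec<String>>,
    metadata: BTreeMap<String, String>,
}

/// Hypothetical, flattened view of an emitted event.
struct Event {
    session_id: String,
    role: Option<String>,
    session_metadata: BTreeMap<String, String>,
}

/// Subset-equality: every wanted key must be present with an equal value.
fn is_subset(wanted: &BTreeMap<String, String>, actual: &BTreeMap<String, String>) -> bool {
    wanted.iter().all(|(key, value)| actual.get(key) == Some(value))
}

/// All configured filters must pass; an unset filter passes trivially.
fn filter_matches(filter: &BindingFilter, event: &Event) -> bool {
    let session_ok = filter
        .session_id
        .as_ref()
        .map_or(true, |id| *id == event.session_id);
    let role_ok = match (&filter.roles, &event.role) {
        (None, _) => true,
        (Some(roles), Some(role)) => roles.contains(role),
        (Some(_), None) => false,
    };
    session_ok && role_ok && is_subset(&filter.metadata, &event.session_metadata)
}

/// Small helper to build a string map from literal pairs.
fn map(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
    pairs.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect()
}

fn main() {
    let event = Event {
        session_id: "s_1".to_string(),
        role: Some("assistant".to_string()),
        session_metadata: map(&[("owner", "u_1"), ("team", "ops")]),
    };
    let tenant_filter = BindingFilter {
        metadata: map(&[("owner", "u_1")]),
        ..Default::default()
    };
    println!("tenant filter matches: {}", filter_matches(&tenant_filter, &event));
}
```

Because the match is subset-equality, a binding with `{ "owner": "u_1" }` still matches sessions that carry extra metadata keys, which is what makes it usable as a tenancy hook.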

Delivery is fire-and-forget, at-least-once, and unordered — reconcile
message updates by `revision` (keep the highest) and transcript order
by the parent chain, never by arrival order.
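
A consumer-side sketch of both reconciliation rules follows. The types are hypothetical and the field name `parent_id` is an assumption (this README mentions a parent chain but does not name the field). `apply_update` keeps only the highest revision per entry, so duplicates and late arrivals are ignored, and `active_path` rebuilds transcript order by walking parents from a leaf.

```rust
use std::collections::HashMap;

/// Hypothetical shape of a `session::message-updated` payload.
#[derive(Debug, Clone, PartialEq)]
struct Update {
    entry_id: String,
    revision: u64,
    text: String,
}

/// Apply an update only if it is newer than what we hold.
/// Returns true when the local state changed.
fn apply_update(state: &mut HashMap<String, Update>, incoming: Update) -> bool {
    let is_newer = state
        .get(&incoming.entry_id)
        .map_or(true, |current| incoming.revision > current.revision);
    if is_newer {
        state.insert(incoming.entry_id.clone(), incoming);
    }
    is_newer
}

/// Hypothetical entry shape: each entry points at its parent, the root at none.
struct Entry {
    id: String,
    parent_id: Option<String>,
}

/// Walk from the leaf back to the root, then reverse: oldest first.
fn active_path(entries: &HashMap<String, Entry>, leaf_id: &str) -> Vec<String> {
    let mut path = Vec::new();
    let mut cursor = entries.get(leaf_id);
    while let Some(entry) = cursor {
        // Guard against a malformed (cyclic) chain.
        if path.len() >= entries.len() {
            break;
        }
        path.push(entry.id.clone());
        cursor = entry.parent_id.as_deref().and_then(|parent| entries.get(parent));
    }
    path.reverse();
    path
}

fn main() {
    // Events arrive out of order and duplicated.
    let mut state = HashMap::new();
    for (revision, text) in [(2, "Hello wor"), (1, "Hel"), (2, "Hello wor")] {
        let update = Update { entry_id: "e1".to_string(), revision, text: text.to_string() };
        apply_update(&mut state, update);
    }
    println!("kept: {:?}", state["e1"]);

    // Entries arrive in arbitrary order; the parent chain fixes the transcript order.
    let mut entries = HashMap::new();
    for (id, parent) in [("c", Some("b")), ("a", None), ("b", Some("a"))] {
        let entry = Entry { id: id.to_string(), parent_id: parent.map(str::to_string) };
        entries.insert(id.to_string(), entry);
    }
    println!("path: {:?}", active_path(&entries, "c"));
}
```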

## Storage backends

Two backends, selected by `backend` + a nested `backend_config`:

- **`fs`** (default) — one append-only JSONL file per session under
  `data_dir` (`<encoded_session_id>.jsonl`): typed `meta` / `entry` /
  `leaf` records, replayed last-wins on startup, file removed on
  session delete. This is the durable, single-instance setup.
- **`bridge`** — this instance keeps all domain logic (idempotency,
  revisions, branching, locks) but stores through a **main** instance
  running its own session-manager (`backend: fs`) on another iii
  engine, via the internal `session::store::*` protocol.
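
The last-wins replay described for the `fs` backend can be pictured with a small fold over already-parsed records. This is a sketch under assumptions: the `Record` variants and their fields are hypothetical (the on-disk schema is not shown here), and JSON parsing is left out so the example stays dependency-free.

```rust
use std::collections::HashMap;

/// Hypothetical parsed form of one JSONL line.
#[derive(Debug, Clone, PartialEq)]
enum Record {
    Meta { title: String },
    Entry { id: String, text: String },
    Leaf { entry_id: Option<String> },
}

/// In-memory state rebuilt from one session file.
#[derive(Debug, Default, PartialEq)]
struct SessionState {
    meta_title: Option<String>,
    entries: HashMap<String, String>,
    active_leaf: Option<String>,
}

/// Replay records in file order. A later record overwrites an earlier one
/// of the same kind (and, for entries, the same id): last wins.
fn replay(records: &[Record]) -> SessionState {
    let mut state = SessionState::default();
    for record in records {
        match record {
            Record::Meta { title } => state.meta_title = Some(title.clone()),
            Record::Entry { id, text } => {
                state.entries.insert(id.clone(), text.clone());
            }
            Record::Leaf { entry_id } => state.active_leaf = entry_id.clone(),
        }
    }
    state
}

fn main() {
    let log = vec![
        Record::Meta { title: "Draft".to_string() },
        Record::Entry { id: "e1".to_string(), text: String::new() },
        Record::Entry { id: "e1".to_string(), text: "Hello".to_string() },
        Record::Leaf { entry_id: Some("e1".to_string()) },
        Record::Meta { title: "Weather question".to_string() },
    ];
    println!("{:#?}", replay(&log));
}
```

An append-only file with last-wins replay means an update never rewrites earlier lines; it appends a newer record that shadows the old one on the next startup.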

Event propagation in a bridge topology: the main is the single fan-out
point. A bridged instance publishes each mutation's events to the main
(`session::store::publish_events`); the main delivers to its own
subscribers and forwards an envelope to **every** attached bridged
instance over its internal `session::store::events` feed; each bridge
re-emits through its local trigger types with its own subscribers'
filters. With several bridged instances attached to one main, a
mutation made anywhere reaches every instance's subscribers through
exactly one path (via the main), so the topology itself adds no
duplicate or echoed deliveries. (Consequence: an originator's own
subscribers hear events only after the round trip through the main,
under the same at-least-once, unordered contract as always.)
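
The propagation path can be traced with a toy model that counts hops per instance. Everything in it is illustrative: `Delivery` and `propagate` are hypothetical names, a "hop" is just a unit for one call between engines, and the handling of a mutation that originates on the main itself (no publish step, then the same forward to every bridge) is an assumption.

```rust
/// One delivery to an instance's local subscribers.
#[derive(Debug, PartialEq)]
struct Delivery {
    instance: String,
    hops: u32,
}

/// Trace where one mutation's event is delivered and after how many hops.
fn propagate(main: &str, bridges: &[&str], origin: &str) -> Vec<Delivery> {
    // A bridged originator first publishes to the main (one hop);
    // a mutation made on the main needs no publish step.
    let to_main = if origin == main { 0 } else { 1 };

    // The main delivers to its own subscribers.
    let mut deliveries = vec![Delivery { instance: main.to_string(), hops: to_main }];

    // The main forwards to every attached bridge, originator included;
    // each bridge then re-emits through its local trigger types.
    for bridge in bridges {
        deliveries.push(Delivery { instance: bridge.to_string(), hops: to_main + 1 });
    }
    deliveries
}

fn main() {
    for delivery in propagate("main", &["bridge-a", "bridge-b"], "bridge-a") {
        println!("{} hears the event after {} hop(s)", delivery.instance, delivery.hops);
    }
}
```

In this model the originating bridge appears once, at two hops, which is the round trip mentioned above: it does not emit locally before the main has fanned the event out.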

The `session::store::*` functions are deployment plumbing, served only
by fs-mode instances. Deny agents access to them, as you would any
other mutating surface.

## Configuration

```yaml
backend: fs                # fs | bridge
backend_config:
  data_dir: ~/.iii/data/session-manager   # fs: one <encoded_session_id>.jsonl per session

# backend: bridge
# backend_config:
#   url: ws://127.0.0.1:49134   # main engine WebSocket URL
#   timeout_ms: 5000            # per store/publish call timeout

default_list_limit: 50   # page size when list/messages omit `limit`
max_list_limit: 500      # hard cap on any requested `limit`
```

An invalid `backend_config` is fatal at boot (a misconfigured bridge
never silently falls back to writing a local fs store). Other defaults
live in [`src/config.rs`](src/config.rs).
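
How the two limit settings in the configuration above interact can be sketched in a few lines. This assumes an over-cap request is clamped to `max_list_limit` rather than rejected, which this README does not specify; `ListLimits` and `effective_limit` are hypothetical names.

```rust
/// Mirrors the two limit keys from the configuration block.
struct ListLimits {
    default_list_limit: usize,
    max_list_limit: usize,
}

/// Resolve the page size for a list/messages call.
/// Assumption: requests above the cap are clamped, not rejected.
fn effective_limit(requested: Option<usize>, limits: &ListLimits) -> usize {
    requested
        .unwrap_or(limits.default_list_limit)
        .min(limits.max_list_limit)
}

fn main() {
    let limits = ListLimits { default_list_limit: 50, max_list_limit: 500 };
    for requested in [None, Some(10), Some(10_000)] {
        println!("{:?} -> {}", requested, effective_limit(requested, &limits));
    }
}
```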

## Local development & testing

```bash
cargo run --release -- --url ws://127.0.0.1:49134 --config ./config.yaml
cargo test                       # unit + manifest + BDD (engine scenarios self-skip)
cargo test --test bdd -- --tags @pure    # no engine required
cargo test --test bdd -- --tags @engine  # requires a running `iii`
```

The BDD suite under [`tests/features/`](tests/features) is the
behavioural contract: @pure scenarios drive the production handlers
against a real fs store over a tempdir with deterministic ids/clock
(including restart-replay scenarios); @engine scenarios drive the same
code over a live engine, including JSONL persistence readbacks, real
trigger fan-out, and full bridge propagation (two bridged instances +
the main, asserting who received what, exactly once).

## Architecture documentation

Deep documentation lives in [`architecture/`](architecture):
[`architecture/integration.md`](architecture/integration.md) is the
self-contained handoff contract for workers that build on this one
(functions, events, filters, error codes, patterns, topologies);
[`architecture/internals.md`](architecture/internals.md) is the
maintainer deep-dive (module map, invariants, storage formats, event
pipeline, testing architecture).

## api reference

```json
{
  "functions": [
    {
      "description": "Append one entry (idempotent on entry_id); fires session::message-added.",
      "metadata": {},
      "name": "session::append",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Append several message entries in order; fires session::message-added per entry.",
      "metadata": {},
      "name": "session::append_many",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Create a session at status idle; fires session::created.",
      "metadata": {},
      "name": "session::create",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Delete a session and its entries; fires session::deleted.",
      "metadata": {},
      "name": "session::delete",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Idempotently ensure a session with a given id exists; fires session::created only when it creates.",
      "metadata": {},
      "name": "session::ensure",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Copy history up to an entry into a new session (copy-on-fork); fires session::created.",
      "metadata": {},
      "name": "session::fork",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Read one session's metadata (null when unknown).",
      "metadata": {},
      "name": "session::get",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Read a single entry by id (null when unknown).",
      "metadata": {},
      "name": "session::get_message",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "List sessions with pagination, ordering, and status/metadata filters.",
      "metadata": {},
      "name": "session::list",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Load the active path as messages with entry ids, oldest first; pagination and role filtering.",
      "metadata": {},
      "name": "session::messages",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Move the active path to end at a given entry (branch switch).",
      "metadata": {},
      "name": "session::set_active_leaf",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Update a session's title/description/metadata; fires session::meta-updated.",
      "metadata": {},
      "name": "session::set_meta",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Set status idle/working/done/error; fires session::status-changed (no-op when unchanged).",
      "metadata": {},
      "name": "session::set_status",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: clear a session's active leaf pointer.",
      "metadata": {},
      "name": "session::store::delete_active_leaf",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: delete every entry of a session.",
      "metadata": {},
      "name": "session::store::delete_entries",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: delete one SessionMeta.",
      "metadata": {},
      "name": "session::store::delete_meta",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: read a session's active leaf pointer.",
      "metadata": {},
      "name": "session::store::get_active_leaf",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: read one SessionEntry (null when unknown).",
      "metadata": {},
      "name": "session::store::get_entry",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: read one SessionMeta (null when unknown).",
      "metadata": {},
      "name": "session::store::get_meta",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: list every entry of a session.",
      "metadata": {},
      "name": "session::store::list_entries",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: list every SessionMeta.",
      "metadata": {},
      "name": "session::store::list_metas",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: ingest a bridged instance's event envelopes and fan them out to local subscribers and every attached bridge.",
      "metadata": {},
      "name": "session::store::publish_events",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: write one SessionEntry.",
      "metadata": {},
      "name": "session::store::put_entry",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: write one SessionMeta.",
      "metadata": {},
      "name": "session::store::put_meta",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: move a session's active leaf pointer.",
      "metadata": {},
      "name": "session::store::set_active_leaf",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Replace a message entry's content (optimistic concurrency via expected_revision); fires session::message-updated.",
      "metadata": {},
      "name": "session::update_message",
      "request_schema": {},
      "response_schema": {}
    }
  ],
  "triggers": [
    {
      "description": "A new session exists (via session::create, session::ensure when it creates, or session::fork).",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::created",
      "return_schema": {}
    },
    {
      "description": "A session and its entries were removed.",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::deleted",
      "return_schema": {}
    },
    {
      "description": "A message entry was appended to a session.",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::message-added",
      "return_schema": {}
    },
    {
      "description": "A message entry's content changed (e.g. streaming deltas).",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::message-updated",
      "return_schema": {}
    },
    {
      "description": "A session's title/description/metadata changed.",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::meta-updated",
      "return_schema": {}
    },
    {
      "description": "A session's status changed (idle/working/done/error).",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::status-changed",
      "return_schema": {}
    },
    {
      "description": "Internal: event-envelope feed for bridged session-manager instances. Not for direct consumption — bind the six public session::* types instead.",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::store::events",
      "return_schema": {}
    }
  ]
}
```
