session-manager

v0.1.2

Durable, reactive, branching store of typed conversation entries with six emitted trigger types.

  • macOS: arm64 · x64
  • Linux: arm64 · armv7 · x64
  • Windows: arm64 · x64 · x86
agent-ready brief for v0.1.2
install + config + dependencies + readme + api reference, all in one place. fetch as agent-context.md for an llm to consume.
the same content rendered as discrete blocks below is exposed as a single markdown document at /workers/session-manager.md?version=0.1.2. paste it into an llm prompt or pipe it through curl from a worker.

install

install
$ iii worker add session-manager@0.1.2

configuration

iii-config.yaml
- backend: fs
  backend_config:
    data_dir: ~/.iii/data/session-manager
  default_list_limit: 50
  max_list_limit: 500

dependencies

no dependencies for v0.1.2

readme

README.md

session-manager

Durable, reactive store for conversations. A session is an append-only log of typed message entries (user, assistant, function_result, custom — with optional fork branches) plus a small metadata record (title, description, coarse status, app-defined metadata). Any worker or client appends and reads over the bus; every mutation fires a trigger type consumers bind to, so a chat UI, bot, or dashboard renders live with no polling and no separate publish call. It runs no agent logic — it is the conversation database the harness family drives, and it is independently useful as a real-time chat store for any app.

Install

iii worker add session-manager

iii worker add fetches the binary and writes a config block into ~/.iii/config.yaml; the engine then starts the worker on the next iii start.

Quickstart

Create a session, append a message, read the transcript back:

use iii_sdk::{register_worker, InitOptions, TriggerRequest};
use serde_json::json;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let iii = register_worker("ws://localhost:49134", InitOptions::default());

    let created = iii.trigger(TriggerRequest {
        function_id: "session::create".into(),
        payload: json!({ "title": "Weather question", "metadata": { "owner": "u_1" } }),
        action: None,
        timeout_ms: Some(5_000),
    }).await?;
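    // Fail with a readable error instead of panicking if the response lacks an id.
    let session_id = created["session_id"]
        .as_str()
        .ok_or_else(|| anyhow::anyhow!("session::create response has no session_id"))?
        .to_string();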

    iii.trigger(TriggerRequest {
        function_id: "session::append".into(),
        payload: json!({
            "session_id": session_id,
            "message": {
                "role": "user",
                "content": [{ "type": "text", "text": "What's the weather?" }],
                "timestamp": 1717800000000_i64
            }
        }),
        action: None,
        timeout_ms: Some(5_000),
    }).await?;

    let transcript = iii.trigger(TriggerRequest {
        function_id: "session::messages".into(),
        payload: json!({ "session_id": session_id }),
        action: None,
        timeout_ms: Some(5_000),
    }).await?;

    println!("{transcript:#?}");
    Ok(())
}

To render live instead of polling, register a handler function and bind it to a trigger type (the two-step reactive pattern):

use iii_sdk::{IIIError, RegisterFunction, RegisterTriggerInput};
use serde_json::{json, Value};

iii.register_function(
    "my-ui::on_message_updated",
    RegisterFunction::new_async(|event: Value| async move {
        // full updated message + monotonic revision; keep the highest
        println!("{} rev {}", event["entry_id"], event["revision"]);
        Ok::<_, IIIError>(json!({ "ok": true }))
    }),
);

iii.register_trigger(RegisterTriggerInput {
    trigger_type: "session::message-updated".into(),
    function_id: "my-ui::on_message_updated".into(),
    config: json!({ "session_id": session_id, "roles": ["assistant"] }),
    metadata: None,
})?;

Streaming an assistant reply uses the same primitives: append an (initially empty) assistant message, then call session::update_message as tokens arrive — each update fires session::message-updated with an incremented revision.
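
The revision behaviour during streaming can be modeled in memory. This is a minimal sketch rather than the worker's implementation: StoredMessage, apply_update, and Conflict are hypothetical names, the starting revision of 0 is an assumption, and rejecting a stale expected_revision is an assumption about how the optimistic-concurrency check on session::update_message behaves.

```rust
/// Hypothetical in-memory stand-in for one stored message entry.
#[derive(Debug, Clone, PartialEq)]
pub struct StoredMessage {
    pub content: String,
    pub revision: u64,
}

/// Hypothetical error for a caller whose view of the entry is stale.
#[derive(Debug, PartialEq)]
pub struct Conflict {
    pub expected: u64,
    pub actual: u64,
}

/// Replace the content and bump the revision by one.
/// When `expected_revision` is given, the update applies only if it
/// matches the stored revision (assumed behaviour, see the note above).
pub fn apply_update(
    msg: &mut StoredMessage,
    new_content: &str,
    expected_revision: Option<u64>,
) -> Result<u64, Conflict> {
    if let Some(expected) = expected_revision {
        if expected != msg.revision {
            return Err(Conflict { expected, actual: msg.revision });
        }
    }
    msg.content = new_content.to_string();
    msg.revision += 1;
    Ok(msg.revision)
}
```

A streaming writer would start from an empty message and call apply_update once per chunk with the full accumulated text, since each update replaces the content rather than appending to it.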

The full function surface (14 session::* functions: lifecycle, messages, branching/fork) is documented in the spec and on each function's registered schema — see iii worker info session-manager.

Custom trigger types

Six trigger types cover every mutation. Each binding's config filters what reaches the handler; all configs additionally accept metadata — a subset-equality match against SessionMeta.metadata (the tenancy hook, e.g. { "owner": "u_1" }). Malformed configs are rejected at registration.

| Trigger type | Fires when | Config filters |
| --- | --- | --- |
| session::created | A session is created (create / ensure / fork) | metadata |
| session::message-added | An entry is appended | session_id, roles, metadata |
| session::message-updated | A message's content changes (streaming deltas) | session_id, roles, metadata |
| session::status-changed | Status moves between idle/working/done/error | session_id, metadata |
| session::meta-updated | Title/description/metadata change | session_id, metadata |
| session::deleted | A session and its entries are removed | session_id, metadata |
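
The filter semantics can be illustrated with a small matcher. This is a sketch under stated assumptions, not the worker's code: BindingConfig, Event, metadata_matches, and accepts are hypothetical names, metadata is simplified to flat string pairs (the real metadata is app-defined), and treating an omitted session_id or roles filter as "match everything" is an assumption.

```rust
use std::collections::BTreeMap;

/// Simplified metadata: flat string keys and values (an assumption).
pub type Metadata = BTreeMap<String, String>;

/// Hypothetical parsed form of a trigger binding's config.
#[derive(Default)]
pub struct BindingConfig {
    pub session_id: Option<String>,
    pub roles: Option<Vec<String>>,
    pub metadata: Metadata,
}

/// Hypothetical view of the fields an event is filtered on.
pub struct Event<'a> {
    pub session_id: &'a str,
    pub role: &'a str,
    pub session_metadata: &'a Metadata,
}

/// Subset-equality: every pair in `filter` must appear in `session_meta`.
/// An empty filter matches every session.
pub fn metadata_matches(filter: &Metadata, session_meta: &Metadata) -> bool {
    filter
        .iter()
        .all(|(key, value)| session_meta.get(key) == Some(value))
}

/// An event reaches the handler only when all configured filters pass.
pub fn accepts(config: &BindingConfig, event: &Event<'_>) -> bool {
    let session_ok = config
        .session_id
        .as_deref()
        .map_or(true, |id| id == event.session_id);
    let role_ok = config
        .roles
        .as_ref()
        .map_or(true, |roles| roles.iter().any(|r| r.as_str() == event.role));
    session_ok && role_ok && metadata_matches(&config.metadata, event.session_metadata)
}
```

With a filter of { "owner": "u_1" }, a session whose metadata also carries other keys still matches, while a session owned by another user does not.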

Delivery is fire-and-forget, at-least-once, and unordered — reconcile message updates by revision (keep the highest) and transcript order by the parent chain, never by arrival order.
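
A consumer-side reconciliation loop following that rule might look like the sketch below. The Entry and Transcript types and the parent_id field name are assumptions made for illustration; only entry_id and revision appear in the event example earlier. Duplicates and stale updates are dropped by comparing revisions, and display order comes from walking parent links rather than from arrival order.

```rust
use std::collections::HashMap;

/// Hypothetical consumer-side copy of one entry.
#[derive(Debug, Clone, PartialEq)]
pub struct Entry {
    pub entry_id: String,
    pub parent_id: Option<String>, // assumed name for the parent link
    pub revision: u64,
    pub text: String,
}

/// Local view of a session, keyed by entry id.
#[derive(Default)]
pub struct Transcript {
    entries: HashMap<String, Entry>,
}

impl Transcript {
    /// Apply one event. Returns false when the event is a duplicate or
    /// older than what is already held (keep the highest revision).
    pub fn apply(&mut self, incoming: Entry) -> bool {
        let is_newer = self
            .entries
            .get(&incoming.entry_id)
            .map_or(true, |existing| incoming.revision > existing.revision);
        if is_newer {
            self.entries.insert(incoming.entry_id.clone(), incoming);
        }
        is_newer
    }

    /// Walk parent links from `leaf_id` back to the root and return the
    /// path oldest first. Stops early if a link is missing locally.
    pub fn path_to(&self, leaf_id: &str) -> Vec<&Entry> {
        let mut path = Vec::new();
        let mut cursor = self.entries.get(leaf_id);
        while let Some(entry) = cursor {
            // Defensive bound so a malformed cycle cannot loop forever.
            if path.len() >= self.entries.len() {
                break;
            }
            path.push(entry);
            cursor = entry
                .parent_id
                .as_deref()
                .and_then(|id| self.entries.get(id));
        }
        path.reverse();
        path
    }
}
```

Because apply ignores anything that is not strictly newer, replaying the same event twice or receiving an old streaming delta after a newer one leaves the view unchanged.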

Storage backends

Two backends, selected by backend + a nested backend_config:

  • fs (default): one append-only JSONL file per session under data_dir (<session_id>.jsonl), holding typed meta / entry / leaf records that are replayed last-wins on startup; the file is removed on session delete. This is the durable, single-instance setup.
  • bridge: this instance keeps all domain logic (idempotency, revisions, branching, locks) but stores through a main instance running its own session-manager (backend: fs) on another iii engine, via the internal session::store::* protocol.
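
The last-wins replay of the fs backend can be sketched as a fold over records in file order. This is illustrative only: the Record enum and SessionState struct are hypothetical shapes standing in for already-parsed JSONL lines, and the actual on-disk fields are not shown in this README.

```rust
use std::collections::HashMap;

/// Hypothetical, already-parsed form of one JSONL line.
#[derive(Debug, Clone, PartialEq)]
pub enum Record {
    Meta { title: String },
    Entry { entry_id: String, text: String },
    Leaf { entry_id: Option<String> },
}

/// State rebuilt for one session after replay.
#[derive(Debug, Default, PartialEq)]
pub struct SessionState {
    pub title: Option<String>,
    pub entries: HashMap<String, String>,
    pub active_leaf: Option<String>,
}

/// Fold records in file order. A later record of the same kind (and, for
/// entries, the same id) overwrites the earlier one: last wins.
pub fn replay(records: impl IntoIterator<Item = Record>) -> SessionState {
    let mut state = SessionState::default();
    for record in records {
        match record {
            Record::Meta { title } => state.title = Some(title),
            Record::Entry { entry_id, text } => {
                state.entries.insert(entry_id, text);
            }
            Record::Leaf { entry_id } => state.active_leaf = entry_id,
        }
    }
    state
}
```

An append-only file plus last-wins replay means an update never rewrites earlier lines; it appends a newer record that supersedes them on the next startup.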

Event propagation in a bridge topology: the main is the single fan-out point. A bridged instance publishes each mutation's events to the main (session::store::publish_events). The main delivers to its own subscribers and forwards an envelope to every attached bridged instance over its internal session::store::events feed. Each bridge then re-emits through its local trigger types, applying its own subscribers' filters. With several bridged instances attached to one main, a mutation made anywhere is routed to every instance's subscribers exactly once, because the main is the only fan-out point. (Consequence: an originator's own subscribers hear events only after the round trip through the main, under the same at-least-once, unordered contract as always.)

The session::store::* functions are deployment plumbing served only by fs-mode instances — deny them to agents like every other mutating surface.

Configuration

backend: fs                # fs | bridge
backend_config:
  data_dir: ~/.iii/data/session-manager   # fs: one <session_id>.jsonl per session

# backend: bridge
# backend_config:
#   url: ws://127.0.0.1:49134   # main engine WebSocket URL
#   timeout_ms: 5000            # per store/publish call timeout

default_list_limit: 50   # page size when list/messages omit `limit`
max_list_limit: 500      # hard cap on any requested `limit`

An invalid backend_config is fatal at boot (a misconfigured bridge never silently falls back to writing a local fs store). Other defaults live in src/config.rs.
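
How the two list limits interact can be shown in a few lines. This is a sketch: ListLimits and effective are hypothetical names, and clamping an oversized request down to the cap (rather than rejecting it) is an assumption about what "hard cap" means here.

```rust
/// Mirrors the two limit keys from the config file.
pub struct ListLimits {
    pub default_list_limit: usize,
    pub max_list_limit: usize,
}

impl ListLimits {
    /// Page size actually used for a list/messages call:
    /// the default when `limit` is omitted, never more than the cap.
    pub fn effective(&self, requested: Option<usize>) -> usize {
        requested
            .unwrap_or(self.default_list_limit)
            .min(self.max_list_limit)
    }
}
```

With the defaults shown above (50 and 500), an omitted limit yields 50, a request for 120 is honoured, and a request for 10000 is reduced to 500.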

Local development & testing

cargo run --release -- --url ws://127.0.0.1:49134 --config ./config.yaml
cargo test                       # unit + manifest + BDD (engine scenarios self-skip)
cargo test --test bdd -- --tags @pure    # no engine required
cargo test --test bdd -- --tags @engine  # requires a running `iii`

The BDD suite under tests/features/ is the behavioural contract: @pure scenarios drive the production handlers against a real fs store over a tempdir with deterministic ids/clock (including restart-replay scenarios); @engine scenarios drive the same code over a live engine, including JSONL persistence readbacks, real trigger fan-out, and full bridge propagation (two bridged instances + the main, asserting who received what, exactly once).

Architecture documentation

Deep documentation lives in architecture/: architecture/integration.md is the self-contained handoff contract for workers that build on this one (functions, events, filters, error codes, patterns, topologies); architecture/internals.md is the maintainer deep-dive (module map, invariants, storage formats, event pipeline, testing architecture).

api reference (json)

agent-api-reference.json
{
  "functions": [
    {
      "description": "Append one entry (idempotent on entry_id); fires session::message-added.",
      "metadata": {},
      "name": "session::append",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Append several message entries in order; fires session::message-added per entry.",
      "metadata": {},
      "name": "session::append_many",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Create a session at status idle; fires session::created.",
      "metadata": {},
      "name": "session::create",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Delete a session and its entries; fires session::deleted.",
      "metadata": {},
      "name": "session::delete",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Idempotently ensure a session with a given id exists; fires session::created only when it creates.",
      "metadata": {},
      "name": "session::ensure",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Copy history up to an entry into a new session (copy-on-fork); fires session::created.",
      "metadata": {},
      "name": "session::fork",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Read one session's metadata (null when unknown).",
      "metadata": {},
      "name": "session::get",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Read a single entry by id (null when unknown).",
      "metadata": {},
      "name": "session::get_message",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "List sessions with pagination, ordering, and status/metadata filters.",
      "metadata": {},
      "name": "session::list",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Load the active path as messages with entry ids, oldest first; pagination and role filtering.",
      "metadata": {},
      "name": "session::messages",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Move the active path to end at a given entry (branch switch).",
      "metadata": {},
      "name": "session::set_active_leaf",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Update a session's title/description/metadata; fires session::meta-updated.",
      "metadata": {},
      "name": "session::set_meta",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Set status idle/working/done/error; fires session::status-changed (no-op when unchanged).",
      "metadata": {},
      "name": "session::set_status",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: clear a session's active leaf pointer.",
      "metadata": {},
      "name": "session::store::delete_active_leaf",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: delete every entry of a session.",
      "metadata": {},
      "name": "session::store::delete_entries",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: delete one SessionMeta.",
      "metadata": {},
      "name": "session::store::delete_meta",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: read a session's active leaf pointer.",
      "metadata": {},
      "name": "session::store::get_active_leaf",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: read one SessionEntry (null when unknown).",
      "metadata": {},
      "name": "session::store::get_entry",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: read one SessionMeta (null when unknown).",
      "metadata": {},
      "name": "session::store::get_meta",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: list every entry of a session.",
      "metadata": {},
      "name": "session::store::list_entries",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: list every SessionMeta.",
      "metadata": {},
      "name": "session::store::list_metas",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: ingest a bridged instance's event envelopes and fan them out to local subscribers and every attached bridge.",
      "metadata": {},
      "name": "session::store::publish_events",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: write one SessionEntry.",
      "metadata": {},
      "name": "session::store::put_entry",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: write one SessionMeta.",
      "metadata": {},
      "name": "session::store::put_meta",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Internal store protocol: move a session's active leaf pointer.",
      "metadata": {},
      "name": "session::store::set_active_leaf",
      "request_schema": {},
      "response_schema": {}
    },
    {
      "description": "Replace a message entry's content (optimistic concurrency via expected_revision); fires session::message-updated.",
      "metadata": {},
      "name": "session::update_message",
      "request_schema": {},
      "response_schema": {}
    }
  ],
  "triggers": [
    {
      "description": "A new session exists (via session::create or session::fork).",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::created",
      "return_schema": {}
    },
    {
      "description": "A session and its entries were removed.",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::deleted",
      "return_schema": {}
    },
    {
      "description": "A message entry was appended to a session.",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::message-added",
      "return_schema": {}
    },
    {
      "description": "A message entry's content changed (e.g. streaming deltas).",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::message-updated",
      "return_schema": {}
    },
    {
      "description": "A session's title/description/metadata changed.",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::meta-updated",
      "return_schema": {}
    },
    {
      "description": "A session's status changed (idle/working/done/error).",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::status-changed",
      "return_schema": {}
    },
    {
      "description": "Internal: event-envelope feed for bridged session-manager instances. Not for direct consumption — bind the six public session::* types instead.",
      "invocation_schema": {},
      "metadata": {},
      "name": "session::store::events",
      "return_schema": {}
    }
  ]
}