iii-stream
v0.14.0-next.1Build durable streams for real-time data subscriptions.
skill doc
Atomically apply ops to an item without read-modify-write
how-toWhen to use
Call stream::update when you want to change a stored item without
reading it first and risking a lost-write race against a concurrent
writer. Pass an ordered list of operations (set, merge, increment,
decrement, append, remove); the worker applies them atomically
against the current value and broadcasts the result the same way
stream::set does.
Reach for it when:
- Two writers can hit the same
(stream, group, item)at once.updateserializes the ops against the live value;setwould let the second writer clobber the first. - You want to mutate one field of an object without re-sending the rest
(
mergeat root, orsetat apath). - You're maintaining counters (
increment/decrement) or growing arrays/strings (append) where the pre-state is the source of truth.
Use stream::set instead when you have the full
desired value and concurrent writers aren't a concern — set is one
adapter round-trip; update does a read-then-write.
Use stream::delete instead when the goal is
removing the item entirely. remove ops only delete fields inside the
current value.
Inputs
{
"stream_name": "counters", // required; top-level namespace
"group_id": "global", // required; second-level partition
"item_id": "page-views", // required; identifier within the group
"ops": [ // required; ordered list, applied left-to-right
{ "type": "increment", "path": "total", "by": 1 },
{ "type": "set", "path": "last_seen_at", "value": "2026-05-20T17:00:00Z" },
{ "type": "merge", "value": { "source": "web" } }
]
}All four fields are required. ops accepts six shapes; note that
only merge and append accept a nested-path or omitted-path
form. set/increment/decrement/remove use FieldPath, which is
a single string (empty string means root for set).
{ "type": "set", "path": "— overwrite a field, or replace the root value when", "value": } pathis"".pathis aFieldPath(single string).value: nullwrites a JSON null; omit the field entirely is a deserialization error.{ "type": "merge", "path": <"field" | ["a","b","c"] | omitted>, "value":— shallow-merge an object at root, at a first-level key, or at a nested path of literal segments. Object-only — thevaluemust be a JSON object.pathis aMergePath:- omitted,
"", or[]→ root merge. "foo"is equivalent to["foo"](single first-level key).["a", "b", "c"]walks three literal keys; dots inside a segment are treated as part of the literal name (["a.b"]is one key named"a.b", not nesteda → b).
- omitted,
{ "type": "increment", "path": "— add to a numeric field.", "by": } pathis aFieldPath(single string; no nested paths).{ "type": "decrement", "path": "— subtract from a numeric field. Same", "by": } FieldPathshape asincrement.{ "type": "append", "path": <"field" | ["a","b","c"] | omitted>, "value":— push one element to an array, or concatenate to a string.} pathfollows the sameMergePathrules asmerge. Omitpath(or send"") to append at root.{ "type": "remove", "path": "— delete a field from the current object." } pathis aFieldPath(single string).
When the item doesn't exist, the worker treats the pre-state as null
and applies ops against it — most ops error in that case, but set at
root or merge at root will create the item.
Outputs
{
"old_value": { "total": 41, "source": "web" }, // value before; null if the item didn't exist
"new_value": { "total": 42, "last_seen_at": "2026-05-20T17:00:00Z", "source": "web" },
"errors": [ // omitted when empty; per-op failures from the ops array
{ "op_index": 2, "code": "merge.path.too_deep", "message": "merge depth exceeds max", "doc_url": null }
]
}old_valueisnullexactly when the item did not exist before the call.new_valuereflects every op that succeeded. Failing ops are recorded inerrorsand skipped — the call is not aborted by a single bad op. Inspecterrorsafter every call that mixes op types.errorsis omitted from the JSON envelope when empty (it is not serialized as[]). Treat the absence of the field as "no failures".- Each
UpdateOpErrorcarriesop_index(zero-based position in the requestopsarray), a stablecode(e.g.merge.path.too_deep), a human-readablemessage, and an optionaldoc_url.
Side effects
Same shape as stream::set:
- Synchronous, before the call returns — the adapter persists
new_valueat(stream_name, group_id, item_id). - Asynchronous, fire-and-forget — the worker spawns a task that
invokes every registered
streamtrigger whose filter matches. Handlers run on that spawned task after theupdatecall returns. Slow or failing handlers do not delay or fail the originatingupdate. - Synchronous, before the call returns — a
StreamWrapperMessagewithevent.type: "create"(whenold_valuewasnull) or"update"(otherwise) is broadcast to every WebSocket subscribed to the stream and group.event.datacarriesnew_value.
The actual call-site sequence is persist → spawn trigger task →
broadcast → return, identical to set.
A call where every op failed but the underlying value was unchanged
still broadcasts and fires triggers — the broadcast carries the
unchanged new_value. Use errors to detect this case server-side.
Worked example
Increment a page-view counter and stamp the last-seen timestamp in one atomic call:
{
"stream_name": "counters",
"group_id": "global",
"item_id": "page-views",
"ops": [
{ "type": "increment", "path": "total", "by": 1 },
{ "type": "set", "path": "last_seen_at", "value": "2026-05-20T17:00:00Z" }
]
}The response carries old_value, the post-write new_value, and (only when at least one op failed) an errors array with per-op op_index / code / message. errors is omitted entirely when every op succeeded, so the absence of the field is the success signal — see Outputs for the precise shape.
Related
stream::set— overwrite blindly when concurrent writers aren't a concern; one round-trip.stream::get— read the current value when you genuinely need to inspect it before deciding which ops to send.stream::delete— remove the entire item;removeops only delete fields, not the item itself.stream::send— broadcast a transient event without modifying the item.streamtrigger (iii://iii-stream/stream/reactive-triggers) — register a handler that fires on every successful update whose filter matches.