# iii-queue

> Queue worker for async job processing with named queues, retries, and dead-letter support.

| field | value |
|-------|-------|
| version | 0.11.6 |
| type | engine |
| repo | https://github.com/iii-hq/iii |
| author | iii |

## installation

```sh
iii worker add iii-queue@0.11.6
```

## configuration

```yaml
- adapter:
    name: builtin
```

## readme

# iii-queue

Asynchronous job processing with named queues, retries, and dead-letter support.

Supports two modes:

- **Topic-based queues** — register a consumer per topic, emit events via `iii::durable::publish`. Fan-out: every distinct function subscribed to a topic receives a copy of each message.
- **Named queues** — define queues in config, then enqueue function calls via `TriggerAction.Enqueue`. No trigger registration needed.

## Sample Configuration

```yaml
- name: iii-queue
  config:
    queue_configs:
      default:
        max_retries: 5
        concurrency: 5
        type: standard
      payment:
        max_retries: 10
        concurrency: 2
        type: fifo
        message_group_field: transaction_id
    adapter:
      name: builtin
      config:
        store_method: file_based
        file_path: ./data/queue_store
```

## Configuration

| Field | Type | Description |
|---|---|---|
| `queue_configs` | map[string, FunctionQueueConfig] | Map of named queue configurations. Each key is the queue name. |
| `adapter` | Adapter | Transport adapter. Defaults to `builtin`. |

### Queue Configuration (`queue_configs` entries)

| Field | Type | Description |
|---|---|---|
| `max_retries` | u32 | Maximum delivery attempts before routing to DLQ. Defaults to `3`. |
| `concurrency` | u32 | Maximum jobs processed simultaneously. Defaults to `10`. FIFO queues override this to `prefetch=1`. |
| `type` | string | `standard` (concurrent) or `fifo` (ordered within a message group). |
| `message_group_field` | string | Required for `fifo`. JSON field whose value determines the ordering group. |
| `backoff_ms` | u64 | Base retry backoff in milliseconds. Exponential: `backoff_ms × 2^(attempt−1)`. Defaults to `1000`. |
| `poll_interval_ms` | u64 | Worker poll interval in milliseconds. Defaults to `100`. |

## Queue Modes

### When to use which

| | Topic-based | Named queues |
|---|---|---|
| **Producer** | `trigger({ function_id: 'iii::durable::publish', payload: { topic, data } })` | `trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) })` |
| **Consumer** | `registerTrigger({ type: 'durable:subscriber', config: { topic } })` | No registration — function is the target |
| **Delivery** | Fan-out: each subscribed function gets every message; replicas compete | Single target function per enqueue call |
| **Config** | Optional `queue_config` on trigger | `queue_configs` in `iii-config.yaml` |
| **Use case** | Durable pub/sub with retries and fan-out | Direct function invocation with retries, FIFO, DLQ |

### Standard vs FIFO Queues

| Dimension | Standard | FIFO |
|-----------|----------|------|
| **Processing model** | Up to `concurrency` jobs in parallel | One job at a time (prefetch=1) |
| **Ordering** | No guarantees | Strictly ordered within a message group |
| **`message_group_field`** | Not required | Required — must be present and non-null in every payload |
| **Throughput** | High | Lower — trades throughput for ordering |
| **Use cases** | Email sends, image processing, notifications | Payments, ledger entries, state machines |
| **Retries** | Retried independently, other jobs continue | Retried inline — blocks the queue until success or DLQ |

## Adapters

### builtin

Built-in in-process queue. No external dependencies. Suitable for single-instance deployments.

```yaml
name: builtin
config:
  store_method: file_based   # in_memory | file_based
  file_path: ./data/queue_store
```

### redis

Uses Redis Pub/Sub for topic-based queues. Supports multi-instance deployments.

> **Note:** The Redis adapter supports publishing to named queues but does not implement named queue consumption, retries, or dead-letter queues. For full named queue support in multi-instance deployments, use RabbitMQ.

```yaml
name: redis
config:
  redis_url: ${REDIS_URL:redis://localhost:6379}
```

### rabbitmq

Uses RabbitMQ for durable delivery, retries, and dead-letter queues across multiple engine instances.

```yaml
name: rabbitmq
config:
  amqp_url: ${RABBITMQ_URL:amqp://localhost:5672}
```

### Adapter Comparison

| | builtin | rabbitmq | redis |
|---|---|---|---|
| **Retries** | Yes | Yes | No |
| **Dead-letter queue** | Yes | Yes | No |
| **FIFO ordering** | Yes | Yes | No |
| **Named queue consumption** | Yes | Yes | No (publish only) |
| **Topic-based pub/sub** | Yes | Yes | Yes |
| **Multi-instance** | No | Yes | Yes |
| **External dependency** | None | RabbitMQ | Redis |

| Scenario | Recommended Adapter |
|----------|-------------------|
| Local development | `builtin` (`in_memory`) |
| Single-instance production | `builtin` (`file_based`) |
| Multi-instance production | `rabbitmq` |

## Builtin Functions

### `iii::durable::publish`

Publishes a message to a topic-based queue. Fanned out to every distinct subscribed function.

| Field | Type | Description |
|-------|------|-------------|
| `topic` | string | The topic to publish to (required). |
| `data` | any | The payload delivered to each subscribed function. |

### `iii::queue::redrive`

Moves all messages from a named queue's dead-letter queue back to the main queue.

| Field | Type | Description |
|-------|------|-------------|
| `queue` | string | The named queue whose DLQ should be redriven. |

Returns: `queue` (string), `redriven` (number)

```bash
iii trigger \
  --function-id='iii::queue::redrive' \
  --payload='{"queue": "payment"}'
```

## api reference

```json
{
  "functions": [
    {
      "description": "Browse DLQ messages",
      "metadata": {},
      "name": "engine::queue::dlq_messages",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "title": "AnyValue"
      },
      "response_schema": {}
    },
    {
      "description": "List DLQ topics with counts",
      "metadata": {},
      "name": "engine::queue::dlq_topics",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "title": "AnyValue"
      },
      "response_schema": {}
    },
    {
      "description": "List all queue topics",
      "metadata": {},
      "name": "engine::queue::list_topics",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "title": "AnyValue"
      },
      "response_schema": {}
    },
    {
      "description": "Get stats for a queue topic",
      "metadata": {},
      "name": "engine::queue::topic_stats",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "title": "AnyValue"
      },
      "response_schema": {}
    },
    {
      "description": "Enqueue a message",
      "metadata": {},
      "name": "iii::durable::publish",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "data": true,
          "topic": {
            "type": "string"
          }
        },
        "required": [
          "data",
          "topic"
        ],
        "title": "QueueInput",
        "type": "object"
      },
      "response_schema": {}
    },
    {
      "description": "Discard (purge) a single DLQ message by ID",
      "metadata": {},
      "name": "iii::queue::discard_message",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "message_id": {
            "type": "string"
          },
          "queue": {
            "type": "string"
          }
        },
        "required": [
          "message_id",
          "queue"
        ],
        "title": "RedriveSingleInput",
        "type": "object"
      },
      "response_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "message_id": {
            "type": "string"
          },
          "queue": {
            "type": "string"
          },
          "redriven": {
            "format": "uint64",
            "minimum": 0,
            "type": "integer"
          }
        },
        "required": [
          "message_id",
          "queue",
          "redriven"
        ],
        "title": "RedriveSingleResult",
        "type": "object"
      }
    },
    {
      "description": "Redrive all DLQ messages back to the main queue",
      "metadata": {},
      "name": "iii::queue::redrive",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "queue": {
            "type": "string"
          }
        },
        "required": [
          "queue"
        ],
        "title": "RedriveInput",
        "type": "object"
      },
      "response_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "queue": {
            "type": "string"
          },
          "redriven": {
            "format": "uint64",
            "minimum": 0,
            "type": "integer"
          }
        },
        "required": [
          "queue",
          "redriven"
        ],
        "title": "RedriveResult",
        "type": "object"
      }
    },
    {
      "description": "Redrive a single DLQ message by ID back to the main queue",
      "metadata": {},
      "name": "iii::queue::redrive_message",
      "request_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "message_id": {
            "type": "string"
          },
          "queue": {
            "type": "string"
          }
        },
        "required": [
          "message_id",
          "queue"
        ],
        "title": "RedriveSingleInput",
        "type": "object"
      },
      "response_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "message_id": {
            "type": "string"
          },
          "queue": {
            "type": "string"
          },
          "redriven": {
            "format": "uint64",
            "minimum": 0,
            "type": "integer"
          }
        },
        "required": [
          "message_id",
          "queue",
          "redriven"
        ],
        "title": "RedriveSingleResult",
        "type": "object"
      }
    }
  ],
  "triggers": [
    {
      "description": "Queue core module",
      "invocation_schema": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "properties": {
          "condition_function_id": {
            "description": "Optional function ID to evaluate before invoking handler",
            "type": [
              "string",
              "null"
            ]
          },
          "queue_config": {
            "description": "Queue-specific subscriber configuration"
          },
          "topic": {
            "description": "Queue topic to subscribe to",
            "type": "string"
          }
        },
        "required": [
          "topic"
        ],
        "title": "QueueTriggerConfig",
        "type": "object"
      },
      "metadata": {},
      "name": "durable:subscriber",
      "return_schema": {}
    }
  ]
}
```
