The AgentWorker consumes jobs from the BullMQ queue and executes them using registered agents and workflows. Workers can run in the same process or as separate services.
## Setup

```ts
import { Agent, openai } from "@radaros/core";
import { AgentWorker } from "@radaros/queue";

const assistant = new Agent({
  name: "assistant",
  model: openai("gpt-4o"),
  instructions: "You are a helpful assistant.",
});

const worker = new AgentWorker({
  connection: { host: "localhost", port: 6379 },
  agentRegistry: { assistant },
  concurrency: 5,
});

worker.start();
```
| Option | Type | Required / Default | Description |
| --- | --- | --- | --- |
| `connection` | `{ host: string; port: number }` | required | Redis connection details. Must match the producer's connection. |
| `queueName` | `string` | default: `"radaros:jobs"` | Queue name to consume from. Must match the producer. |
| `concurrency` | `number` | optional | Number of jobs to process simultaneously. |
| `agentRegistry` | `Record<string, Agent>` | required | Map of agent names to `Agent` instances. Names must match what the producer enqueues. |
| `workflowRegistry` | `Record<string, Workflow>` | optional | Map of workflow names to `Workflow` instances. |
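If the producer was created with a custom `queueName`, the worker must use the same value; otherwise jobs are enqueued on one queue and consumed from another, and nothing runs. A minimal sketch (assuming `AgentQueue` accepts the same `queueName` option; `"my-app:jobs"` is just an example value):

```ts
import { AgentQueue, AgentWorker } from "@radaros/queue";

// Producer side: enqueue onto a custom queue name
// (defaults to "radaros:jobs" when omitted).
const queue = new AgentQueue({
  connection: { host: "localhost", port: 6379 },
  queueName: "my-app:jobs",
});

// Worker side: must consume from the same queue name.
const worker = new AgentWorker({
  connection: { host: "localhost", port: 6379 },
  queueName: "my-app:jobs",
  agentRegistry: { /* ... */ },
});
```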
## How It Works

1. The worker connects to Redis and listens for jobs on the configured queue
2. When a job arrives, it looks up the agent or workflow by name in the registry
3. Executes `agent.run()` or `workflow.run()` with the job's input
4. Reports progress via BullMQ job progress updates
5. Stores the result (or error) back to Redis for the producer to retrieve
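The lookup step can be sketched as a plain function. This is a conceptual sketch, not the library's internals: the `Runner` shape and registry types are assumptions, standing in for `Agent` and `Workflow`.

```ts
// Minimal stand-ins for registered entries; both expose a run() method.
type Runner = { run: (input: string) => Promise<unknown> };

interface Registries {
  agents: Record<string, Runner>;
  workflows: Record<string, Runner>;
}

// Resolve a job to its runner: agents and workflows live in separate
// registries, and an unknown name is a hard error so the job fails fast
// instead of hanging.
function resolveRunner(
  registries: Registries,
  kind: "agent" | "workflow",
  name: string
): Runner {
  const table = kind === "agent" ? registries.agents : registries.workflows;
  const runner = table[name];
  if (!runner) throw new Error(`No ${kind} registered under "${name}"`);
  return runner;
}
```

Failing fast on an unknown name matters here: the error is stored back to Redis as the job's failure, so the producer learns about the misconfigured registry instead of waiting forever.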
## With Workflows

```ts
import { Agent, Workflow, openai } from "@radaros/core";
import { AgentWorker } from "@radaros/queue";

const researcher = new Agent({
  name: "researcher",
  model: openai("gpt-4o"),
  instructions: "Research the given topic thoroughly.",
});

const writer = new Agent({
  name: "writer",
  model: openai("gpt-4o"),
  instructions: "Write a blog post based on the research.",
});

const pipeline = new Workflow({
  name: "content-pipeline",
  initialState: { topic: "", research: "", article: "" },
  steps: [
    { name: "research", agent: researcher, inputFrom: (s) => s.topic },
    { name: "write", agent: writer, inputFrom: (s) => `Write about: ${s.research}` },
  ],
});

const worker = new AgentWorker({
  connection: { host: "localhost", port: 6379 },
  agentRegistry: { researcher, writer },
  workflowRegistry: { "content-pipeline": pipeline },
  concurrency: 3,
});

worker.start();
```
## Graceful Shutdown

```ts
process.on("SIGTERM", async () => {
  console.log("Shutting down worker...");
  await worker.stop();
  process.exit(0);
});
```

The `stop()` method waits for currently active jobs to complete before shutting down.
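Because `stop()` waits for in-flight jobs, a long-running job can stall shutdown past what your orchestrator tolerates (Kubernetes, for example, sends SIGKILL after its termination grace period). A sketch of a timeout guard, assuming only that the worker exposes the `stop()` method shown above:

```ts
// Gracefully stop, but give up if draining takes longer than the timeout.
// `stoppable.stop()` stands in for worker.stop().
async function shutdownWithTimeout(
  stoppable: { stop: () => Promise<void> },
  timeoutMs: number
): Promise<"clean" | "timed-out"> {
  const timeout = new Promise<"timed-out">((resolve) =>
    setTimeout(resolve, timeoutMs, "timed-out")
  );
  const clean = stoppable.stop().then(() => "clean" as const);
  return Promise.race([clean, timeout]);
}
```

On a `"timed-out"` result you would log the abandoned jobs and exit non-zero; BullMQ re-queues jobs whose workers disappear, so another worker picks them up.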
## Event Bridging

The `bridgeEventBusToJob` utility connects an agent's `EventBus` to BullMQ's job progress system, enabling real-time progress tracking:

```ts
import { bridgeEventBusToJob } from "@radaros/queue";

// This is used internally by AgentWorker, but you can use it
// for custom worker implementations
const cleanup = bridgeEventBusToJob(agent.eventBus, job, runId);

// cleanup() removes all listeners when done
```

Events bridged:

- `run.stream.chunk`: updates job progress
- `tool.call`: adds to job logs
- `tool.result`: adds to job logs
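Conceptually, the bridge subscribes to those three events and forwards each one to the job, returning a function that detaches everything. The sketch below is not the library's implementation: the `on`/`off` bus shape and the log formats are assumptions, though BullMQ's `Job` does expose `updateProgress()` and `log()`.

```ts
type Handler = (payload: unknown) => void;

interface Bus {
  on(event: string, handler: Handler): void;
  off(event: string, handler: Handler): void;
}

// The subset of BullMQ's Job that the bridge needs.
interface JobLike {
  updateProgress(data: object): Promise<void> | void;
  log(row: string): Promise<unknown> | unknown;
}

// Wire the three bridged events to the job, and return a cleanup
// function that removes every listener it attached.
function bridgeBusToJob(bus: Bus, job: JobLike, runId: string): () => void {
  const onChunk: Handler = (chunk) => job.updateProgress({ runId, chunk });
  const onToolCall: Handler = (call) =>
    job.log(`[${runId}] tool.call ${JSON.stringify(call)}`);
  const onToolResult: Handler = (result) =>
    job.log(`[${runId}] tool.result ${JSON.stringify(result)}`);

  bus.on("run.stream.chunk", onChunk);
  bus.on("tool.call", onToolCall);
  bus.on("tool.result", onToolResult);

  return () => {
    bus.off("run.stream.chunk", onChunk);
    bus.off("tool.call", onToolCall);
    bus.off("tool.result", onToolResult);
  };
}
```

Keeping references to the exact handler functions is the important detail: `off` must receive the same function object that `on` registered, which is why the bridge returns a cleanup closure instead of expecting callers to detach listeners themselves.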
## Scaling

Run multiple worker processes to scale horizontally. BullMQ handles job distribution automatically:

```sh
# Terminal 1
node worker.js

# Terminal 2
node worker.js

# Terminal 3
node worker.js
```

Each worker processes up to `concurrency` jobs simultaneously. With 3 workers at a concurrency of 5, you can process 15 jobs in parallel.
## Full Producer + Worker Example

```ts
// producer.ts
import { AgentQueue } from "@radaros/queue";

const queue = new AgentQueue({
  connection: { host: "localhost", port: 6379 },
});

const { jobId } = await queue.enqueueAgentRun({
  agentName: "assistant",
  input: "Summarize the latest AI research papers",
});

queue.onCompleted((id, result) => {
  console.log(`Result: ${result.text}`);
  queue.close();
});
```

```ts
// worker.ts
import { Agent, openai } from "@radaros/core";
import { AgentWorker } from "@radaros/queue";

const assistant = new Agent({
  name: "assistant",
  model: openai("gpt-4o"),
  instructions: "Summarize research papers concisely.",
});

const worker = new AgentWorker({
  connection: { host: "localhost", port: 6379 },
  agentRegistry: { assistant },
});

worker.start();
console.log("Worker listening for jobs...");
```