Installation
npm install @radaros/queue bullmq ioredis
Requires a running Redis server.
AgentQueue
The producer that enqueues agent and workflow jobs.
import { AgentQueue } from "@radaros/queue";
Constructor
new AgentQueue(config: QueueConfig)
connection
{ host: string; port: number }
required
Redis connection details.
queueName
string
default:"radaros:jobs"
Name of the BullMQ queue.
Default options applied to all jobs (priority, attempts, delay, etc.).
Methods
enqueueAgentRun
async enqueueAgentRun(opts: {
agentName: string;
input: string;
sessionId?: string;
userId?: string;
priority?: number;
delay?: number;
}): Promise<{ jobId: string }>
Enqueue an agent execution job.
enqueueWorkflow
async enqueueWorkflow(opts: {
workflowName: string;
initialState?: Record<string, unknown>;
sessionId?: string;
priority?: number;
delay?: number;
}): Promise<{ jobId: string }>
Enqueue a workflow execution job.
getJobStatus
async getJobStatus(jobId: string): Promise<JobStatus>
Get the current status of a job.
cancelJob
async cancelJob(jobId: string): Promise<void>
Cancel a pending or active job.
Event Handlers
queue.onCompleted((jobId: string, result: RunOutput) => {
console.log(`Job ${jobId} completed:`, result.text);
});
queue.onFailed((jobId: string, error: Error) => {
console.error(`Job ${jobId} failed:`, error.message);
});
close
async close(): Promise<void>
Close the queue connection.
AgentWorker
The consumer that processes jobs from the queue.
import { AgentWorker } from "@radaros/queue";
Constructor
new AgentWorker(config: WorkerConfig)
connection
{ host: string; port: number }
required
Redis connection details.
queueName
string
default:"radaros:jobs"
Queue name to consume from.
Number of jobs processed in parallel.
agentRegistry
Record<string, Agent>
required
Map of agent name to Agent instance.
Map of workflow name to Workflow instance.
Methods
start
Begin processing jobs from the queue.
stop
async stop(): Promise<void>
Gracefully stop the worker.
Types
JobPayload
type JobPayload = AgentJobPayload | WorkflowJobPayload;
interface AgentJobPayload {
type: "agent";
agentName: string;
input: string;
sessionId?: string;
userId?: string;
}
interface WorkflowJobPayload {
type: "workflow";
workflowName: string;
initialState?: Record<string, unknown>;
sessionId?: string;
}
JobStatus
interface JobStatus {
jobId: string;
state: "waiting" | "active" | "completed" | "failed" | "delayed";
progress?: number;
result?: RunOutput;
error?: string;
createdAt: Date;
processedAt?: Date;
finishedAt?: Date;
}
Utilities
bridgeEventBusToJob
import { bridgeEventBusToJob } from "@radaros/queue";
const cleanup = bridgeEventBusToJob(eventBus, job, runId);
// Returns cleanup function to remove listeners
Bridges agent EventBus events to BullMQ job progress and logs, enabling real-time job status tracking.
Example
import { Agent, openai } from "@radaros/core";
import { AgentQueue, AgentWorker } from "@radaros/queue";
const agent = new Agent({
name: "processor",
model: openai("gpt-4o"),
instructions: "Process the given data.",
});
// Producer
const queue = new AgentQueue({
connection: { host: "localhost", port: 6379 },
});
const { jobId } = await queue.enqueueAgentRun({
agentName: "processor",
input: "Analyze this dataset",
});
console.log("Enqueued job:", jobId);
// Worker (can be in a separate process)
const worker = new AgentWorker({
connection: { host: "localhost", port: 6379 },
agentRegistry: { processor: agent },
concurrency: 3,
});
worker.start();
// Monitor
queue.onCompleted((id, result) => {
console.log(`Job ${id}:`, result.text);
});