The AgentQueue class is the producer side of the queue system. It enqueues jobs for agents and workflows to be processed asynchronously by workers.

Setup

import { AgentQueue } from "@radaros/queue";

const queue = new AgentQueue({
  connection: { host: "localhost", port: 6379 },
  queueName: "radaros:jobs", // optional, this is the default
});

connection ({ host: string; port: number }, required): Redis connection details.
queueName (string, default: "radaros:jobs"): BullMQ queue name.
defaultJobOptions (Record<string, unknown>): Default options for all jobs (priority, attempts, backoff, etc.).
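
The options above can be assembled from environment-style settings so that the producer and its workers point at the same Redis instance. The following is a minimal sketch, not part of `@radaros/queue`: `AgentQueueConfig` is a local mirror of the three documented options, and `buildQueueConfig`, `REDIS_HOST`, and `REDIS_PORT` are hypothetical names. The `attempts` and `backoff` keys follow BullMQ's job options, on the assumption that `defaultJobOptions` is handed through to BullMQ unchanged.

```typescript
// Local mirror of the documented constructor options (not imported from the library).
interface AgentQueueConfig {
  connection: { host: string; port: number };
  queueName?: string;
  defaultJobOptions?: Record<string, unknown>;
}

// Hypothetical helper: builds the config from a plain key/value map such as process.env.
function buildQueueConfig(env: Record<string, string | undefined>): AgentQueueConfig {
  const port = Number(env.REDIS_PORT ?? "6379");
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }
  return {
    connection: { host: env.REDIS_HOST ?? "localhost", port },
    queueName: "radaros:jobs",
    // Assumed to be BullMQ job options: retry up to 3 times with exponential backoff.
    defaultJobOptions: {
      attempts: 3,
      backoff: { type: "exponential", delay: 1000 },
    },
  };
}

const config = buildQueueConfig({ REDIS_HOST: "redis.internal", REDIS_PORT: "6380" });
console.log(config.connection);
```

The returned object has the same shape as the literal passed to `new AgentQueue(...)` in the setup snippet, so it could be passed in its place.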

Enqueue Agent Runs

const { jobId } = await queue.enqueueAgentRun({
  agentName: "assistant",
  input: "Analyze this quarterly report",
  sessionId: "user-123",
  userId: "user-123",
});

console.log("Job enqueued:", jobId);

With Priority and Delay

const { jobId } = await queue.enqueueAgentRun({
  agentName: "processor",
  input: "High priority task",
  priority: 1,        // Lower number = higher priority
  delay: 5000,        // Delay 5 seconds before processing
});
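
Since `delay` is given in milliseconds, scheduling a job for a specific point in time means converting that time into an offset. A minimal sketch, where `delayUntil` is a hypothetical helper rather than part of the library:

```typescript
// Hypothetical helper: milliseconds from `now` until `target`, never negative.
function delayUntil(target: Date, now: Date = new Date()): number {
  return Math.max(0, target.getTime() - now.getTime());
}

const now = new Date("2025-01-01T00:00:00Z");
const runAt = new Date("2025-01-01T00:00:05Z");
console.log(delayUntil(runAt, now)); // 5000
```

The value can be passed as `delay` to `enqueueAgentRun`. A target in the past yields 0, which requests no delay at all.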

Enqueue Workflows

const { jobId } = await queue.enqueueWorkflow({
  workflowName: "content-pipeline",
  initialState: {
    topic: "AI in healthcare",
    format: "blog-post",
  },
  sessionId: "session-abc",
});

Check Job Status

const status = await queue.getJobStatus(jobId);

console.log(status.state);      // "waiting" | "active" | "completed" | "failed" | "delayed"
console.log(status.progress);   // 0-100
console.log(status.result);     // RunOutput (when completed)
console.log(status.error);      // Error message (when failed)

JobStatus Fields

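| Field | Type | Description |
| --- | --- | --- |
| jobId | string | Unique job identifier |
| state | string | Current state: "waiting", "active", "completed", "failed", or "delayed" |
| progress | number | Completion percentage (0-100) |
| result | RunOutput | Agent output (when completed) |
| error | string | Error message (when failed) |
| createdAt | Date | When the job was enqueued |
| processedAt | Date | When processing started |
| finishedAt | Date | When processing finished |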
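
When a caller needs the final outcome of one job, `getJobStatus` can be polled until `state` reaches "completed" or "failed". The sketch below is one possible way to do that. `waitForJob`, `StatusSource`, and `JobStatusLike` are hypothetical names; the interfaces are local and only assume the `getJobStatus` method and fields documented above.

```typescript
// Local mirror of the documented JobStatus fields that polling needs.
interface JobStatusLike {
  jobId: string;
  state: string; // "waiting" | "active" | "completed" | "failed" | "delayed"
  result?: unknown;
  error?: string;
}

// Anything exposing getJobStatus, which an AgentQueue instance is assumed to satisfy.
interface StatusSource {
  getJobStatus(jobId: string): Promise<JobStatusLike>;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: polls until the job is completed or failed,
// and gives up after `maxAttempts` checks.
async function waitForJob(
  source: StatusSource,
  jobId: string,
  intervalMs = 1000,
  maxAttempts = 60,
): Promise<JobStatusLike> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const current = await source.getJobStatus(jobId);
    if (current.state === "completed" || current.state === "failed") {
      return current;
    }
    // Pause between checks, but not after the final one.
    if (attempt < maxAttempts - 1) {
      await sleep(intervalMs);
    }
  }
  throw new Error(`Job ${jobId} did not finish after ${maxAttempts} checks`);
}
```

A call such as `await waitForJob(queue, jobId)` would then return the final status. Polling costs one status lookup per interval, so the event handlers are likely a better fit when many jobs are in flight.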

Cancel Jobs

await queue.cancelJob(jobId);

Event Handlers

Listen for job completion or failure:
queue.onCompleted((jobId, result) => {
  console.log(`Job ${jobId} completed:`, result.text);
  // Send notification, update database, etc.
});

queue.onFailed((jobId, error) => {
  console.error(`Job ${jobId} failed:`, error.message);
  // Alert, retry logic, etc.
});
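
The two handlers fire for every job on the queue, so code that cares about a single job has to filter by `jobId`. One way to do that is to wrap both handlers in a promise, as in the sketch below. `resultOf` and `QueueEventsLike` are hypothetical names, and the interface only assumes the `onCompleted` and `onFailed` signatures shown above, with the result type simplified to `unknown`.

```typescript
// Minimal shape of the two event hooks; result and error types are simplified.
interface QueueEventsLike {
  onCompleted(handler: (jobId: string, result: unknown) => void): void;
  onFailed(handler: (jobId: string, error: Error) => void): void;
}

// Hypothetical helper: resolves with the result of one job, rejects if that job fails.
function resultOf(queue: QueueEventsLike, targetJobId: string): Promise<unknown> {
  return new Promise((resolve, reject) => {
    queue.onCompleted((jobId, result) => {
      if (jobId === targetJobId) resolve(result);
    });
    queue.onFailed((jobId, error) => {
      if (jobId === targetJobId) reject(error);
    });
  });
}
```

This sketch never removes its handlers, because no unsubscribe method is documented here, so it suits short-lived scripts better than long-running services. It also only sees events emitted after `resultOf` is called.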

Cleanup

await queue.close();
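
In short-lived scripts it is easy to skip `close()` when an earlier call throws. A `try`/`finally` wrapper avoids that. The sketch below is a minimal illustration: `withQueue` and `Closable` are hypothetical names, and the only assumption about the queue is the async `close()` method shown above.

```typescript
// Anything with an async close(), which an AgentQueue instance is assumed to satisfy.
interface Closable {
  close(): Promise<void>;
}

// Hypothetical helper: runs `work` and always closes the queue afterwards,
// even if `work` throws.
async function withQueue<Q extends Closable, T>(
  queue: Q,
  work: (queue: Q) => Promise<T>,
): Promise<T> {
  try {
    return await work(queue);
  } finally {
    await queue.close();
  }
}
```

A producer script could wrap its enqueue calls as `await withQueue(new AgentQueue({ ... }), async (q) => q.enqueueAgentRun({ ... }))`.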

Full Example

import { AgentQueue } from "@radaros/queue";

const queue = new AgentQueue({
  connection: { host: "localhost", port: 6379 },
});

// Register the handler first so no job can complete before the listener exists
queue.onCompleted((id, result) => {
  console.log(`Done: ${id} -> ${result.text.slice(0, 100)}`);
});

// Enqueue multiple jobs
const jobs = await Promise.all([
  queue.enqueueAgentRun({ agentName: "summarizer", input: "Long article text..." }),
  queue.enqueueAgentRun({ agentName: "translator", input: "Translate this to Spanish" }),
  queue.enqueueAgentRun({ agentName: "classifier", input: "Classify this email" }),
]);

// Print a one-time snapshot of each job's current state
for (const { jobId } of jobs) {
  const status = await queue.getJobStatus(jobId);
  console.log(`${jobId}: ${status.state}`);
}