Skip to content

Queues

VentureKit provides message queues through SQS with automatic consumer wiring, a type-safe queue client, two-layer retry architecture, and built-in idempotency support.

Add a queue definition on defineVenture() in vk.config.ts:

export default defineVenture({
base,
security,
queues: [
{ id: 'notifications', type: 'standard' },
],
envs: { dev, prod },
});

Override queue behavior per environment (e.g. config/prod.ts):

export const prod: EnvConfigInput = {
preset: 'medium',
queues: [
{ id: 'notifications', deadLetterQueue: true, batchSize: 5, timeout: 30 },
],
};

Create a file under src/queues/ whose name matches the queue id:

src/queues/notifications.ts
import { taskHandler } from '@venturekit/runtime';
interface NotificationEvent {
type: string;
userId: string;
message: string;
}
export const main = taskHandler<NotificationEvent>(
async (event, ctx, logger) => {
logger.info('Processing notification', {
type: event.type,
userId: event.userId,
traceId: ctx.traceId,
});
// Your business logic here
await sendEmail(event.userId, event.message);
return { sent: true };
},
{ retries: 2 },
);

VentureKit auto-discovers the handler file and wires it to the SQS queue at deploy time.

Use the sendMessage and sendMessages functions from @venturekit/runtime/patterns:

import { sendMessage, sendMessages } from '@venturekit/runtime/patterns';
// Send a single message
await sendMessage('notifications', {
type: 'welcome',
userId: 'user-123',
message: 'Welcome to the platform!',
});
// Send with delay (up to 15 minutes)
await sendMessage('notifications', payload, {
delaySeconds: 60,
});
// Send a batch (auto-splits into groups of 10)
await sendMessages('notifications', [
{ type: 'welcome', userId: 'user-1', message: 'Hi!' },
{ type: 'welcome', userId: 'user-2', message: 'Hi!' },
]);

For ordered, exactly-once delivery:

// vk.config.ts (shared definition)
queues: [
{ id: 'order-events', type: 'fifo' },
]
// config/prod.ts (per-env override)
queues: [
{ id: 'order-events', deadLetterQueue: true },
]
// In your handler:
await sendMessage('order-events', payload, {
messageGroupId: orderId, // Messages with same group are ordered
deduplicationId: `${orderId}-${action}`, // Prevents duplicates
});

Trace context is automatically propagated when sending messages. The consumer receives the same traceId as the sender, enabling end-to-end observability across:

HTTP Handler → sendMessage() → Queue Consumer → sendMessage() → ...
↑ traceId ↑ same traceId

No manual configuration needed.

Queue consumers have two complementary layers of retries:

Layer 1 — Application Retries (inside one invocation)

Section titled “Layer 1 — Application Retries (inside one invocation)”

Configured via taskHandler({ retries: N }). Retries the handler within a single Lambda execution for transient errors (network blips, throttles).

export const main = taskHandler(handler, {
retries: 2, // Up to 2 retries (3 total attempts)
retryDelayMs: 1000, // 1s → 2s → 4s exponential backoff
});
  • Validation errors are never retried — they fail immediately.
  • Uses exponential backoff: retryDelayMs × 2^attempt.
  • Recommended: 1–3 retries for transient errors.

Layer 2 — Infrastructure Retries (managed by AWS)

Section titled “Layer 2 — Infrastructure Retries (managed by AWS)”

When the Lambda function throws (all application retries exhausted), SQS handles re-delivery:

  1. Message becomes invisible for visibilityTimeout seconds.
  2. After timeout, SQS re-delivers to another consumer.
  3. After maxReceiveCount failures, message moves to the dead-letter queue (DLQ).
// config/prod.ts (per-env override)
queues: [
{
id: 'order-events',
deadLetterQueue: true, // Enable DLQ
visibilityTimeoutSeconds: 180, // 3 minutes (default: timeout × 6)
retentionDays: 14, // Keep messages 14 days (default: 4)
},
]
ScenarioApp RetriesInfra Retries
Transient network errorsretries: 2DLQ after 3
External API callsretries: 1DLQ after 5
Database writesretries: 0DLQ after 3 + idempotency
Critical paymentsretries: 0DLQ after 1 + idempotency

SQS guarantees at-least-once delivery. When a message is retried (visibility timeout expires), your handler may run again with the same message. Use taskIdempotencyMiddleware to ensure exactly-once processing:

import { taskHandler } from '@venturekit/runtime';
import {
taskIdempotencyMiddleware,
createDefaultIdempotencyStore,
} from '@venturekit/runtime/patterns';
const store = await createDefaultIdempotencyStore();
export const main = taskHandler<OrderEvent>(
async (event, ctx, logger) => {
// This only runs once per unique SQS messageId
await chargeCustomer(event.orderId, event.amount);
return { charged: true };
},
{
middleware: [taskIdempotencyMiddleware({ store })],
retries: 0, // Let SQS handle retries, idempotency prevents duplicates
},
);
  1. Extracts idempotency key from SQS messageId (default) or a custom field.
  2. Checks the idempotency store for a previous execution.
  3. If found and completed → returns cached result (skips handler).
  4. If not found → saves “pending”, executes handler, saves “completed”.
  5. On error → deletes record so the retry can execute.
import { eventFieldKeyExtractor } from '@venturekit/runtime/patterns';
// Use a field from the event payload as the idempotency key
taskIdempotencyMiddleware({
store,
keyExtractor: eventFieldKeyExtractor('orderId'),
});
EnvironmentBackendNotes
Local devIn-memoryAuto-selected, per-process
ProductionDynamoDBDistributed, TTL auto-cleanup

createDefaultIdempotencyStore() auto-selects the right backend.

OptionTypeDefaultDescription
idstringQueue identifier (matches src/queues/{id}.ts)
type'standard' | 'fifo''standard'Queue type
deadLetterQueuebooleanfalseEnable dead-letter queue
retentionDaysnumber4Message retention in days
visibilityTimeoutSecondsnumbertimeout × 6Visibility timeout
batchSizenumber10Messages per Lambda invocation
memorySizenumber256Consumer Lambda memory (MB)
timeoutnumber30Consumer Lambda timeout (seconds)

During vk dev, queue consumers are invoked via HTTP:

Terminal window
# Send a message to the notifications queue consumer
curl -X POST http://localhost:3000/_dev/queue/notifications \
-H "Content-Type: application/json" \
-d '{"type": "welcome", "userId": "user-123", "message": "Hi!"}'

The sendMessage() function automatically routes to the local dev server when VENTURE_LOCAL=true.