Queues
VentureKit provides message queues through SQS with automatic consumer wiring, a type-safe queue client, a two-layer retry architecture, and built-in idempotency support.
1. Declare a Queue Intent
Add a queue definition to defineVenture() in vk.config.ts:

    export default defineVenture({
      base,
      security,
      queues: [
        { id: 'notifications', type: 'standard' },
      ],
      envs: { dev, prod },
    });

Override queue behavior per environment (e.g. config/prod.ts):

    export const prod: EnvConfigInput = {
      preset: 'medium',
      queues: [
        { id: 'notifications', deadLetterQueue: true, batchSize: 5, timeout: 30 },
      ],
    };

2. Write a Consumer
Create a file under src/queues/ whose name matches the queue id:

    import { taskHandler } from '@venturekit/runtime';

    interface NotificationEvent {
      type: string;
      userId: string;
      message: string;
    }

    export const main = taskHandler<NotificationEvent>(
      async (event, ctx, logger) => {
        logger.info('Processing notification', {
          type: event.type,
          userId: event.userId,
          traceId: ctx.traceId,
        });

        // Your business logic here
        await sendEmail(event.userId, event.message);

        return { sent: true };
      },
      { retries: 2 },
    );

VentureKit auto-discovers the handler file and wires it to the SQS queue at deploy time.
Sending Messages
Use the sendMessage and sendMessages functions from @venturekit/runtime/patterns:

    import { sendMessage, sendMessages } from '@venturekit/runtime/patterns';

    // Send a single message
    await sendMessage('notifications', {
      type: 'welcome',
      userId: 'user-123',
      message: 'Welcome to the platform!',
    });

    // Send with delay (up to 15 minutes)
    await sendMessage('notifications', payload, {
      delaySeconds: 60,
    });

    // Send a batch (auto-splits into groups of 10)
    await sendMessages('notifications', [
      { type: 'welcome', userId: 'user-1', message: 'Hi!' },
      { type: 'welcome', userId: 'user-2', message: 'Hi!' },
    ]);

FIFO Queues
For ordered, exactly-once delivery:

    // vk.config.ts (shared definition)
    queues: [
      { id: 'order-events', type: 'fifo' },
    ]

    // config/prod.ts (per-env override)
    queues: [
      { id: 'order-events', deadLetterQueue: true },
    ]

    // In your handler:
    await sendMessage('order-events', payload, {
      messageGroupId: orderId, // Messages with same group are ordered
      deduplicationId: `${orderId}-${action}`, // Prevents duplicates
    });

Trace Propagation
Trace context is automatically propagated when sending messages. The consumer receives the same traceId as the sender, enabling end-to-end observability across:

    HTTP Handler → sendMessage() → Queue Consumer → sendMessage() → ...
         ↑ traceId                      ↑ same traceId

No manual configuration needed.
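To make the propagation concrete, here is a minimal in-memory sketch of the idea: the sender copies its traceId into message metadata, and the consumer rebuilds its context from that metadata. Every name here (TraceContext, Envelope, send, consume) is hypothetical, an array stands in for SQS, and none of it is VentureKit's actual implementation.

```typescript
interface TraceContext { traceId: string }
interface Envelope<T> { body: T; attributes: { traceId: string } }

// In-memory stand-in for an SQS queue (assumption for illustration only).
const queue: Envelope<unknown>[] = [];

// Sender side: copy the current traceId into the message attributes.
function send<T>(ctx: TraceContext, body: T): void {
  queue.push({ body, attributes: { traceId: ctx.traceId } });
}

// Consumer side: rebuild the context from the attributes before calling the handler.
function consume<T>(handler: (body: T, ctx: TraceContext) => void): void {
  let envelope = queue.shift();
  while (envelope !== undefined) {
    handler(envelope.body as T, { traceId: envelope.attributes.traceId });
    envelope = queue.shift();
  }
}

// The consumer observes the sender's traceId.
const seen: string[] = [];
send({ traceId: 'trace-abc' }, { type: 'welcome' });
consume<{ type: string }>((_body, ctx) => seen.push(ctx.traceId));
```

A consumer that calls send() with the context it received would pass the same traceId on to the next hop, which is the chain the diagram describes.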
Retry Architecture
Queue consumers have two complementary layers of retries:
Layer 1 — Application Retries (inside one invocation)
Configured via taskHandler(handler, { retries: N }). Retries the handler within a single Lambda execution for transient errors (network blips, throttles).

    export const main = taskHandler(handler, {
      retries: 2,         // Up to 2 retries (3 total attempts)
      retryDelayMs: 1000, // 1s → 2s → 4s exponential backoff
    });

- Validation errors are never retried; they fail immediately.
- Uses exponential backoff: retryDelayMs × 2^attempt.
- Recommended: 1–3 retries for transient errors.
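The backoff rule can be sketched in a few lines. This is an illustration rather than the taskHandler source: it assumes attempt is zero-based (which reproduces the 1s → 2s → 4s sequence), and isValidationError, backoffDelayMs, and withRetries are hypothetical names.

```typescript
// Hypothetical marker: errors named 'ValidationError' are treated as non-retryable.
function isValidationError(err: unknown): boolean {
  return err instanceof Error && err.name === 'ValidationError';
}

// Delay before retry number `attempt` (zero-based, an assumption): retryDelayMs × 2^attempt.
function backoffDelayMs(retryDelayMs: number, attempt: number): number {
  return retryDelayMs * Math.pow(2, attempt);
}

// Calls `fn` up to `retries + 1` times, sleeping between failed attempts.
async function withRetries<T>(
  fn: () => Promise<T>,
  retries: number,
  retryDelayMs: number,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      // Validation errors and the final attempt propagate immediately.
      if (isValidationError(err) || attempt >= retries) throw err;
      await sleep(backoffDelayMs(retryDelayMs, attempt));
    }
  }
}

// Delays for three consecutive retries with retryDelayMs = 1000.
const delays = [0, 1, 2].map((attempt) => backoffDelayMs(1000, attempt));
```

With retries: 2 only the first two delays are ever used, so a message that keeps failing spends about 3 seconds waiting inside one invocation before the error reaches SQS.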
Layer 2 — Infrastructure Retries (managed by AWS)
When the Lambda function throws (all application retries exhausted), SQS handles re-delivery:

- The message stays invisible for visibilityTimeout seconds.
- After the timeout, SQS re-delivers the message to a consumer.
- After maxReceiveCount failures, the message moves to the dead-letter queue (DLQ).
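The re-delivery loop can be pictured with a small simulation. This is a sketch, not how SQS or VentureKit is implemented: simulateDelivery and SimResult are hypothetical names, and maxReceiveCount is passed in explicitly because its default is not stated here.

```typescript
interface SimResult {
  deliveries: number;
  outcome: 'processed' | 'dead-lettered';
}

// Simulates re-delivery: each failed receive counts toward maxReceiveCount.
function simulateDelivery(
  handler: (receiveCount: number) => boolean, // returns true when processing succeeds
  maxReceiveCount: number,
): SimResult {
  for (let receiveCount = 1; receiveCount <= maxReceiveCount; receiveCount++) {
    if (handler(receiveCount)) {
      return { deliveries: receiveCount, outcome: 'processed' };
    }
    // On failure the message stays invisible until the visibility timeout
    // expires, then becomes available for another receive.
  }
  return { deliveries: maxReceiveCount, outcome: 'dead-lettered' };
}

const flaky = simulateDelivery((n) => n >= 2, 3); // fails once, then succeeds
const broken = simulateDelivery(() => false, 3);  // never succeeds
```

Here flaky is processed on its second delivery, while broken is delivered three times and then dead-lettered.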
    // config/prod.ts (per-env override)
    queues: [
      {
        id: 'order-events',
        deadLetterQueue: true,         // Enable DLQ
        visibilityTimeoutSeconds: 180, // 3 minutes (default: timeout × 6)
        retentionDays: 14,             // Keep messages 14 days (default: 4)
      },
    ]

Best Practice

| Scenario | App Retries | Infra Retries |
|---|---|---|
| Transient network errors | retries: 2 | DLQ after 3 |
| External API calls | retries: 1 | DLQ after 5 |
| Database writes | retries: 0 | DLQ after 3 + idempotency |
| Critical payments | retries: 0 | DLQ after 1 + idempotency |
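Because the two layers multiply, it can help to work out the worst case for a row. The sketch below assumes that "DLQ after N" means a maxReceiveCount of N and that every delivery exhausts its application retries; maxHandlerExecutions is a hypothetical helper, not part of VentureKit.

```typescript
// Worst-case number of handler executions for a single message.
function maxHandlerExecutions(appRetries: number, maxReceiveCount: number): number {
  const attemptsPerDelivery = appRetries + 1; // first try plus application retries
  return attemptsPerDelivery * maxReceiveCount;
}

const transientNetwork = maxHandlerExecutions(2, 3); // retries: 2, DLQ after 3
const criticalPayments = maxHandlerExecutions(0, 1); // retries: 0, DLQ after 1
```

Under these assumptions the transient-network row allows up to 9 executions and the payments row exactly 1. This counts failure-driven executions only; duplicate deliveries are a separate concern, which is why the last two rows also call for idempotency.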
Idempotency
SQS guarantees at-least-once delivery. When a message is re-delivered (for example, after the visibility timeout expires), your handler may run again with the same message. Use taskIdempotencyMiddleware to ensure exactly-once processing:
    import { taskHandler } from '@venturekit/runtime';
    import {
      taskIdempotencyMiddleware,
      createDefaultIdempotencyStore,
    } from '@venturekit/runtime/patterns';

    const store = await createDefaultIdempotencyStore();

    export const main = taskHandler<OrderEvent>(
      async (event, ctx, logger) => {
        // This only runs once per unique SQS messageId
        await chargeCustomer(event.orderId, event.amount);
        return { charged: true };
      },
      {
        middleware: [taskIdempotencyMiddleware({ store })],
        retries: 0, // Let SQS handle retries, idempotency prevents duplicates
      },
    );

How It Works
- Extracts the idempotency key from the SQS messageId (default) or a custom field.
- Checks the idempotency store for a previous execution.
- If found and completed → returns the cached result (skips the handler).
- If not found → saves “pending”, executes the handler, saves “completed”.
- On error → deletes the record so the retry can execute.
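The steps above can be sketched as a small wrapper around a handler. This is an assumption-laden illustration, not the taskIdempotencyMiddleware source: idempotent and IdempotencyRecord are hypothetical names, a Map stands in for the store, and rejecting a key that is still "pending" is a guess at behavior the steps do not specify.

```typescript
type IdempotencyRecord<R> =
  | { status: 'pending' }
  | { status: 'completed'; result: R };

// Wraps a handler so it runs at most once per key, using a Map as the store.
function idempotent<E, R>(
  store: Map<string, IdempotencyRecord<R>>,
  keyOf: (event: E) => string,
  handler: (event: E) => Promise<R>,
): (event: E) => Promise<R> {
  return async (event) => {
    const key = keyOf(event);
    const existing = store.get(key);
    if (existing !== undefined) {
      // Found and completed: return the cached result and skip the handler.
      if (existing.status === 'completed') return existing.result;
      // Still pending: assumed behavior, not stated in the steps above.
      throw new Error(`Execution for ${key} is still in progress`);
    }
    store.set(key, { status: 'pending' });
    try {
      const result = await handler(event);
      store.set(key, { status: 'completed', result });
      return result;
    } catch (err) {
      store.delete(key); // remove the record so a retry can execute
      throw err;
    }
  };
}

// Usage: the second call with the same messageId is served from the store.
let charges = 0;
const demoStore = new Map<string, IdempotencyRecord<{ charged: boolean }>>();
const charge = idempotent(demoStore, (e: { messageId: string }) => e.messageId, async () => {
  charges += 1;
  return { charged: true };
});
```

Calling charge twice with the same messageId increments charges once. A Map is only safe within one process; a shared store is needed once several consumers run concurrently.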
Custom Key Extractors
    import { eventFieldKeyExtractor } from '@venturekit/runtime/patterns';

    // Use a field from the event payload as the idempotency key
    taskIdempotencyMiddleware({
      store,
      keyExtractor: eventFieldKeyExtractor('orderId'),
    });

Storage Backends

| Environment | Backend | Notes |
|---|---|---|
| Local dev | In-memory | Auto-selected, per-process |
| Production | DynamoDB | Distributed, TTL auto-cleanup |
createDefaultIdempotencyStore() auto-selects the right backend.
Queue Intent Options
| Option | Type | Default | Description |
|---|---|---|---|
| id | string | none | Queue identifier (matches src/queues/{id}.ts) |
| type | 'standard' \| 'fifo' | 'standard' | Queue type |
| deadLetterQueue | boolean | false | Enable dead-letter queue |
| retentionDays | number | 4 | Message retention in days |
| visibilityTimeoutSeconds | number | timeout × 6 | Visibility timeout |
| batchSize | number | 10 | Messages per Lambda invocation |
| memorySize | number | 256 | Consumer Lambda memory (MB) |
| timeout | number | 30 | Consumer Lambda timeout (seconds) |
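As a worked example of how these defaults combine, the sketch below fills in missing options, including the derived visibility timeout. The QueueIntent interface and resolveQueueIntent function are hypothetical; they mirror the table rather than VentureKit's actual types.

```typescript
// Hypothetical shape mirroring the options table.
interface QueueIntent {
  id: string;
  type?: 'standard' | 'fifo';
  deadLetterQueue?: boolean;
  retentionDays?: number;
  visibilityTimeoutSeconds?: number;
  batchSize?: number;
  memorySize?: number;
  timeout?: number;
}

// Applies the documented defaults; the visibility timeout derives from timeout.
function resolveQueueIntent(intent: QueueIntent): Required<QueueIntent> {
  const timeout = intent.timeout ?? 30;
  return {
    id: intent.id,
    type: intent.type ?? 'standard',
    deadLetterQueue: intent.deadLetterQueue ?? false,
    retentionDays: intent.retentionDays ?? 4,
    visibilityTimeoutSeconds: intent.visibilityTimeoutSeconds ?? timeout * 6,
    batchSize: intent.batchSize ?? 10,
    memorySize: intent.memorySize ?? 256,
    timeout,
  };
}

const resolved = resolveQueueIntent({ id: 'notifications', timeout: 20 });
```

With a 20-second timeout and no explicit override, the visibility timeout resolves to 120 seconds; with the default 30-second timeout it would be 180.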
Local Development
During vk dev, queue consumers are invoked via HTTP:

    # Send a message to the notifications queue consumer
    curl -X POST http://localhost:3000/_dev/queue/notifications \
      -H "Content-Type: application/json" \
      -d '{"type": "welcome", "userId": "user-123", "message": "Hi!"}'

The sendMessage() function automatically routes to the local dev server when VENTURE_LOCAL=true.
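The same request as the curl command can be built from a script or test. The helper below is a hypothetical sketch: buildDevQueueRequest and DevQueueRequest are illustrative names, and the base URL simply reuses the host and port from the curl example.

```typescript
interface DevQueueRequest {
  url: string;
  init: { method: 'POST'; headers: Record<string, string>; body: string };
}

// Builds the request that the curl command sends to the local dev server.
function buildDevQueueRequest(
  queueId: string,
  payload: unknown,
  baseUrl = 'http://localhost:3000',
): DevQueueRequest {
  return {
    url: `${baseUrl}/_dev/queue/${encodeURIComponent(queueId)}`,
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(payload),
    },
  };
}

const request = buildDevQueueRequest('notifications', {
  type: 'welcome',
  userId: 'user-123',
  message: 'Hi!',
});
// With the dev server running: await fetch(request.url, request.init);
```

Keeping the request construction separate from the network call makes it easy to assert on the URL and body without a running dev server.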
Related
- Infrastructure Intents: declaring queues
- Idempotency: HTTP handler idempotency
- Project Structure: src/queues/ convention