「我已經有 Redis 喇,點解重要用 BullMQ?」好多人第一次接觸 job queue 都會問呢個問題。今日我哋深入探討 BullMQ 係咩、佢同 Redis 嘅關係、點解 Redis Pub/Sub 唔適合做 Job Queue,同埋何時應該用 BullMQ 而唔係直接用 Redis。
TL;DR
核心重點:
- 🎯 BullMQ ≠ Redis:BullMQ 係基於 Redis 構建 嘅 Job Queue library,唔係 Redis 嘅替代品
- 🚀 Redis Pub/Sub 嘅限制:冇持久化、冇重試機制、冇優先級,唔適合做可靠嘅 Job Queue
- ⚡ BullMQ 嘅優勢:用 Redis 提供持久化、自動重試、優先級、延遲任務、並發控制
- 📊 效能數據:BullMQ 可以處理 10,000+ jobs/sec,記憶體佔用比純 Redis Pub/Sub 低 50%
- 🛠️ 何時用邊個:背景任務處理 → BullMQ;即時訊息廣播 → Redis Pub/Sub
Table of Contents
- 甚麼是 BullMQ?佢同 Redis 嘅關係
- Redis Pub/Sub 點解唔適合做 Job Queue?
- Redis Streams:介乎 Pub/Sub 同 BullMQ 之間嘅選擇
- BullMQ 嘅核心功能同優勢
- RabbitMQ 深入解析
- 完整對比:Redis Pub/Sub vs Streams vs BullMQ vs RabbitMQ
- 實戰代碼:用 BullMQ 處理背景任務
- 效能優化同最佳實踐
- 總結
- 相關資源
甚麼是 BullMQ?佢同 Redis 嘅關係
BullMQ 係咩?
BullMQ 係一個基於 Redis 構建嘅 Job Queue library,專門用於 Node.js 背景任務處理。
// BullMQ 係一個 Node.js library
import { Queue, Worker } from 'bullmq';
// 佢 **需要** Redis 先可以運作
const queue = new Queue('emails', {
connection: {
host: 'localhost',
port: 6379 // Redis 連接
}
});
關鍵事實:
- ✅ BullMQ 係一個 library,唔係一個獨立嘅 message broker
- ✅ BullMQ 依賴 Redis 嚟儲存 job 數據
- ✅ BullMQ 用 Redis 嘅 List、Sorted Set、Hash 嚟實現 queue 功能
- ❌ BullMQ 唔係 Redis 嘅替代品,而係 Redis 嘅 上層封裝
💡 簡單比喻
如果 Redis 係一個「倉庫」,咁:
Redis Pub/Sub 係倉庫入面嘅「廣播系統」(冇儲存,即時傳送)
BullMQ 係倉庫入面嘅「智能物流系統」(有儲存、有追蹤、有重試)
BullMQ 用 Redis 嘅數據結構(List、Sorted Set)嚟構建一個可靠嘅 job queue。
BullMQ 點樣用 Redis?
BullMQ 用 Redis 嘅以下數據結構:
# Redis 數據結構對應
Queue:waiting → Redis List # 等待執行嘅 jobs
Queue:active → Redis List # 正在執行嘅 jobs
Queue:completed → Redis Set # 已完成嘅 jobs
Queue:failed → Redis Set # 失敗嘅 jobs
Queue:delayed → Redis Sorted Set # 延遲執行嘅 jobs (by timestamp)
Queue:priority → Redis Sorted Set # 優先級 jobs (by priority)
Queue:jobs:123 → Redis Hash # 個別 job 嘅詳細數據
示意圖:
┌─────────────────────────────────────┐
│ BullMQ (Node.js) │
│ ┌───────────┐ ┌────────────┐ │
│ │ Queue │ │ Worker │ │
│ └─────┬─────┘ └──────┬─────┘ │
└────────┼───────────────────┼───────┘
│ Redis Commands │
▼ ▼
┌─────────────────────────────────────┐
│ Redis │
│ ┌──────┐ ┌──────┐ ┌──────────┐ │
│ │ List │ │ Set │ │ SortedSet│ │
│ └──────┘ └──────┘ └──────────┘ │
└─────────────────────────────────────┘
Redis Pub/Sub 點解唔適合做 Job Queue?
好多人第一次聽到 job queue 會諗:「我有 Redis Pub/Sub 喇,點解重要用 BullMQ?」
Redis Pub/Sub 嘅工作原理
// Publisher
import Redis from 'ioredis';
const redis = new Redis();
await redis.publish('emails', JSON.stringify({
to: 'user@example.com',
subject: 'Hello'
}));
// Subscriber
const subscriber = new Redis();
subscriber.subscribe('emails');
subscriber.on('message', (channel, message) => {
console.log('Received:', message);
// 處理郵件...
});
睇落幾好喎!咁有咩問題?
問題 1:冇持久化 (Fire-and-Forget)
場景:
// 1. 發送 job
await redis.publish('emails', JSON.stringify({ to: 'user@example.com' }));
// 2. 如果而家 **冇 subscriber** 連接緊...
// ❌ 呢個 message **永久失去**!
// 3. 就算有 subscriber,但如果 subscriber 正在處理另一個 job...
// ❌ 呢個 message 都會 **遺失**!
Redis Pub/Sub 特性:
- ✅ 即時廣播(低延遲)
- ❌ 冇儲存:message 發出後,如果冇 subscriber,就永久失去
- ❌ 冇 queue:subscriber 處理緊嘢嘅時候,新 message 會遺失
BullMQ 解決方法:
// BullMQ 會將 job **持久化** 到 Redis
await queue.add('sendEmail', { to: 'user@example.com' });
// ✅ 就算而家冇 worker,job 都會保存喺 Redis
// ✅ Worker 啟動後會自動處理所有 pending jobs
問題 2:冇重試機制
場景:
// Redis Pub/Sub
subscriber.on('message', async (channel, message) => {
try {
await sendEmail(JSON.parse(message));
} catch (error) {
// ❌ 失敗咗點算?message 已經冇咗...
// ❌ 你要自己寫 retry logic
// ❌ 你要自己儲存失敗嘅 jobs
}
});
// BullMQ
const worker = new Worker('emails', async (job) => {
await sendEmail(job.data);
}, {
connection,
attempts: 3, // ✅ 自動重試 3 次
backoff: {
type: 'exponential',
delay: 2000 // ✅ 指數退避
}
});
問題 3:冇優先級
場景:
// 你想優先處理 VIP 用戶嘅郵件
// ❌ Redis Pub/Sub 做唔到!所有 messages 一視同仁
// BullMQ
await queue.add('sendEmail', { to: 'vip@example.com' }, {
priority: 1 // ✅ 高優先級
});
await queue.add('sendEmail', { to: 'normal@example.com' }, {
priority: 10 // ✅ 低優先級
});
問題 4:冇延遲任務
場景:
// 你想 30 分鐘後發送提醒郵件
// ❌ Redis Pub/Sub 做唔到!
// BullMQ
await queue.add('sendReminder', { to: 'user@example.com' }, {
delay: 30 * 60 * 1000 // ✅ 30 分鐘後執行
});
問題 5:冇並發控制
場景:
// 你唔想同時發送超過 5 封郵件(避免被 rate limit)
// ❌ Redis Pub/Sub 冇呢個機制
// BullMQ
const worker = new Worker('emails', processEmail, {
connection,
concurrency: 5, // ✅ 最多同時處理 5 個 jobs
limiter: {
max: 100, // ✅ 每 5 秒最多 100 個 jobs
duration: 5000
}
});
⚠️ Redis Pub/Sub 嘅限制總結
❌ 冇持久化:message 發出後即失去❌ 冇重試:失敗咗就冇咗
❌ 冇優先級:所有 messages 平等
❌ 冇延遲:唔支援定時任務
❌ 冇並發控制:唔能限制處理速度
❌ 冇監控:唔知 jobs 嘅狀態
結論:Redis Pub/Sub 適合即時通訊,唔適合可靠嘅背景任務處理。
Redis Streams:介乎 Pub/Sub 同 BullMQ 之間嘅選擇
Redis Streams 係咩?
Redis Streams 係 Redis 5.0(2018 年)引入嘅數據結構,專門用於處理持久化嘅消息流。
佢嘅設計靈感來自 Apache Kafka,但更輕量同簡單。
關鍵特性:
- ✅ 持久化:messages 會儲存喺 Redis,唔會遺失
- ✅ Consumer Groups:多個 consumer 可以分工處理 messages
- ✅ At-least-once delivery:保證 message 至少送達一次
- ✅ Message history:可以讀取歷史 messages
- ❌ 冇自動重試:需要自己實現
- ❌ 冇優先級:FIFO 順序
- ❌ 冇延遲任務:唔支援定時執行
💡 Redis Streams vs Pub/Sub vs BullMQ
想像一個訊息傳遞系統:
Redis Pub/Sub = 廣播電台(即時,但冇人聽就冇咗)
Redis Streams = WhatsApp 群組(有歷史記錄,可以追返)
BullMQ = 任務管理系統(有追蹤、重試、優先級)
Redis Streams 基本操作
- 生產消息(XADD)
import Redis from 'ioredis';
const redis = new Redis();
// 添加 message 到 stream
const messageId = await redis.xadd(
'orders', // Stream name
'*', // Auto-generate ID (timestamp-based)
'orderId', '123', // Field-value pairs
'amount', '99.99',
'status', 'pending'
);
console.log('Message ID:', messageId);
// Output: 1234567890123-0 (timestamp-sequence)
- 消費消息(XREAD)
// 讀取新 messages(blocking mode)
const messages = await redis.xread(
'BLOCK', 5000, // Block for 5 seconds
'STREAMS', 'orders',
'$' // Only new messages
);
if (messages) {
for (const [stream, streamMessages] of messages) {
for (const [id, fields] of streamMessages) {
console.log('Message ID:', id);
console.log('Fields:', fields);
// ['orderId', '123', 'amount', '99.99', 'status', 'pending']
}
}
}
- Consumer Groups(進階)
// 創建 consumer group
await redis.xgroup(
'CREATE', 'orders', // Stream name
'order-processors', // Group name
'0', // Start from beginning
'MKSTREAM' // Create stream if not exists
);
// Consumer 讀取 messages
const messages = await redis.xreadgroup(
'GROUP', 'order-processors', // Group name
'worker-1', // Consumer name
'COUNT', 10, // Max 10 messages
'BLOCK', 5000, // Block for 5 seconds
'STREAMS', 'orders',
'>' // Only undelivered messages
);
if (messages) {
for (const [stream, streamMessages] of messages) {
for (const [id, fields] of streamMessages) {
try {
// 處理 message
await processOrder(fields);
// ✅ 確認處理完成(ACK)
await redis.xack('orders', 'order-processors', id);
} catch (error) {
console.error('Failed to process:', id, error);
// ❌ 唔 ACK,message 會留喺 pending 狀態
}
}
}
}
Redis Streams 完整範例
Producer(生產者):
// producer.ts
import Redis from 'ioredis';
const redis = new Redis();
async function publishOrder(order: any) {
const id = await redis.xadd(
'orders',
'*',
'orderId', order.id,
'userId', order.userId,
'amount', order.amount.toString(),
'timestamp', Date.now().toString()
);
console.log(`✅ Order published: ${id}`);
return id;
}
// 發布訂單
publishOrder({
id: '123',
userId: 'user-456',
amount: 99.99
});
Consumer(消費者):
// consumer.ts
import Redis from 'ioredis';
const redis = new Redis();
const STREAM = 'orders';
const GROUP = 'order-processors';
const CONSUMER = `worker-${process.pid}`;
async function startConsumer() {
// 創建 consumer group(如果未存在)
try {
await redis.xgroup('CREATE', STREAM, GROUP, '0', 'MKSTREAM');
console.log(`✅ Consumer group created: ${GROUP}`);
} catch (error: any) {
if (error.message.includes('BUSYGROUP')) {
console.log(`ℹ️ Consumer group already exists: ${GROUP}`);
} else {
throw error;
}
}
console.log(`👷 Consumer ${CONSUMER} started...`);
// 持續消費 messages
while (true) {
try {
const messages = await redis.xreadgroup(
'GROUP', GROUP,
CONSUMER,
'COUNT', 10,
'BLOCK', 5000,
'STREAMS', STREAM,
'>'
);
if (messages) {
for (const [stream, streamMessages] of messages) {
for (const [id, fields] of streamMessages) {
// 將 array 轉成 object
const data = arrayToObject(fields);
try {
console.log(`📦 Processing order ${data.orderId}...`);
await processOrder(data);
// ✅ ACK
await redis.xack(STREAM, GROUP, id);
console.log(`✅ Order ${data.orderId} completed`);
} catch (error) {
console.error(`❌ Failed to process ${id}:`, error);
// 唔 ACK,message 留喺 pending
}
}
}
}
} catch (error) {
console.error('Consumer error:', error);
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
}
function arrayToObject(arr: string[]): Record<string, string> {
const obj: Record<string, string> = {};
for (let i = 0; i < arr.length; i += 2) {
obj[arr[i]] = arr[i + 1];
}
return obj;
}
async function processOrder(data: any) {
// 模擬處理訂單
await new Promise(resolve => setTimeout(resolve, 1000));
console.log(`💰 Order ${data.orderId}: $${data.amount}`);
}
startConsumer();
Redis Streams 處理 Pending Messages
// 檢查 pending messages(未 ACK 嘅)
async function processPendingMessages() {
// 獲取 pending messages
const pending = await redis.xpending(
STREAM,
GROUP,
'-', '+', // ID range (all)
10, // Max count
CONSUMER // 只睇自己嘅
);
if (pending && pending.length > 0) {
console.log(`⚠️ Found ${pending.length} pending messages`);
for (const [id, consumer, idleTime, deliveryCount] of pending) {
// 如果 pending 超過 5 分鐘,重新處理
if (idleTime > 300000) {
console.log(`🔄 Reprocessing message ${id}...`);
// Claim message(搶返嚟)
const messages = await redis.xclaim(
STREAM,
GROUP,
CONSUMER,
300000, // Min idle time
id
);
// 重新處理
// ...
}
}
}
}
// 定期檢查 pending messages
setInterval(processPendingMessages, 60000); // 每分鐘
Redis Streams 嘅限制
雖然 Redis Streams 比 Pub/Sub 強大,但仍然有以下限制:
- 冇自動重試
// ❌ Redis Streams 冇自動重試機制
// 你要自己實現:
const MAX_RETRIES = 3;
for (const [id, fields] of messages) {
const data = arrayToObject(fields);
let retries = 0;
while (retries < MAX_RETRIES) {
try {
await processOrder(data);
await redis.xack(STREAM, GROUP, id);
break; // Success
} catch (error) {
retries++;
if (retries >= MAX_RETRIES) {
// 移到 dead letter queue
await redis.xadd('orders:failed', '*', ...fields);
await redis.xack(STREAM, GROUP, id);
} else {
await new Promise(r => setTimeout(r, 1000 * retries));
}
}
}
}
- 冇優先級
// ❌ Redis Streams 係 FIFO,冇優先級
// 如果需要優先級,要用多個 streams:
await redis.xadd('orders:high', '*', ...);
await redis.xadd('orders:normal', '*', ...);
// Consumer 優先讀 high
const messages = await redis.xreadgroup(
'GROUP', GROUP, CONSUMER,
'STREAMS', 'orders:high', 'orders:normal',
'>', '>'
);
- 冇延遲任務
// ❌ Redis Streams 冇延遲執行功能
// 如果需要,要自己用 timestamp 過濾:
const data = {
orderId: '123',
executeAt: Date.now() + 30 * 60 * 1000 // 30 分鐘後
};
await redis.xadd('orders', '*', 'data', JSON.stringify(data));
// Consumer 檢查 timestamp
if (data.executeAt > Date.now()) {
// 太早,唔處理
continue;
}
何時用 Redis Streams?
🌊 用 Redis Streams 當:
✅ Event SourcingUser events: login, logout, purchase, etc. → Store in stream for audit trail → Multiple services consume (analytics, notifications, etc.)✅ Activity Feeds / 時間軸
Social media posts, user activities → Store in stream per user → Fan-out to followers✅ 簡單 Job Queue(唔需要重試/優先級)
Simple background tasks → Lighter than BullMQ → More reliable than Pub/Sub✅ Log Aggregation
Application logs from multiple services → Central stream for processing → Consumer groups for different purposes (storage, alerting, etc.)唔該用 Redis Streams 當:
❌ 需要自動重試 + 指數退避 → 用 BullMQ
❌ 需要優先級 queue → 用 BullMQ
❌ 需要延遲任務 / Cron jobs → 用 BullMQ
❌ 即時通訊(唔需要歷史) → 用 Redis Pub/Sub
❌ 企業級跨語言 messaging → 用 RabbitMQ
BullMQ 嘅核心功能同優勢
1. 持久化 + 可靠性
// Job 會儲存喺 Redis,唔會遺失
await queue.add('processVideo', {
videoId: '123',
url: 'https://example.com/video.mp4'
});
// ✅ 即使 worker crash,job 都唔會遺失
// ✅ Worker 重啟後會繼續處理
// ✅ 所有 job 狀態都有記錄(waiting, active, completed, failed)
2. 自動重試 + 指數退避
const worker = new Worker('processVideo', async (job) => {
// 如果呢度 throw error...
throw new Error('Processing failed');
}, {
connection,
attempts: 5, // 最多重試 5 次
backoff: {
type: 'exponential',
delay: 1000 // 1s, 2s, 4s, 8s, 16s
}
});
// ✅ BullMQ 會自動重試
// ✅ 每次重試間隔會遞增(指數退避)
// ✅ 重試次數用完後會移到 failed set
3. 優先級 Queue
// VIP 用戶嘅任務
await queue.add('processVideo', { userId: 'vip-123' }, {
priority: 1 // 最高優先級
});
// 免費用戶嘅任務
await queue.add('processVideo', { userId: 'free-456' }, {
priority: 10 // 低優先級
});
// ✅ VIP 任務會優先處理
// ✅ 用 Redis Sorted Set 實現 O(log N) 性能
4. 延遲任務 + 定時任務
// 延遲任務:30 分鐘後發送
await queue.add('sendReminder', { userId: '123' }, {
delay: 30 * 60 * 1000 // 30 分鐘
});
// 定時任務:每日凌晨 2 點執行
await queue.add('dailyReport', {}, {
repeat: {
pattern: '0 2 * * *', // Cron 表達式
tz: 'Asia/Hong_Kong'
}
});
// ✅ 用 Redis Sorted Set 按 timestamp 排序
// ✅ 唔需要額外嘅 cron 服務
5. 並發控制 + Rate Limiting
const worker = new Worker('sendEmail', processEmail, {
connection,
// 同時最多處理 10 個 jobs
concurrency: 10,
// Rate limiting:每 5 秒最多 100 個 jobs
limiter: {
max: 100,
duration: 5000
}
});
// ✅ 避免過載
// ✅ 符合 API rate limits
// ✅ 控制資源使用
6. 事件監聽 + 監控
// Queue events
queue.on('completed', (job) => {
console.log(`Job ${job.id} completed`);
});
queue.on('failed', (job, error) => {
console.error(`Job ${job.id} failed:`, error);
});
// Worker events
worker.on('progress', (job, progress) => {
console.log(`Job ${job.id} progress: ${progress}%`);
});
// ✅ 完整嘅事件系統
// ✅ 方便監控同 logging
// ✅ 可以接入 APM 工具
7. Parent-Child Dependencies
// 父任務
const parentJob = await queue.add('processProject', { projectId: '123' });
// 子任務(依賴父任務)
await queue.add('processFile', { fileId: 'a' }, {
parent: {
id: parentJob.id,
queue: queue.name
}
});
await queue.add('processFile', { fileId: 'b' }, {
parent: {
id: parentJob.id,
queue: queue.name
}
});
// ✅ 所有子任務完成後,父任務先會標記為完成
// ✅ 適合複雜嘅工作流
完整對比:Redis Pub/Sub vs Streams vs BullMQ vs RabbitMQ
功能對比表
| 特性 | Redis Pub/Sub | Redis Streams | BullMQ | RabbitMQ |
|---|---|---|---|---|
| 持久化 | ❌ 冇 | ✅ 有 | ✅ 有(Redis) | ✅ 有(Disk) |
| Message History | ❌ 冇 | ✅ 有 | ⚠️ 有限 | ⚠️ 有限 |
| Consumer Groups | ❌ 冇 | ✅ 有 | ❌ 冇 | ✅ 有 |
| 自動重試 | ❌ 冇 | ❌ 冇 | ✅ 有 | ✅ 有 |
| 優先級 | ❌ 冇 | ❌ 冇 | ✅ 有 | ✅ 有 |
| 延遲任務 | ❌ 冇 | ❌ 冇 | ✅ 有 | ✅ 有(plugin) |
| Rate Limiting | ❌ 冇 | ❌ 冇 | ✅ 有 | ⚠️ 部分支援 |
| 廣播(Pub/Sub) | ✅ 有 | ⚠️ 可模擬 | ❌ 冇 | ✅ 有 |
| 複雜路由 | ❌ 冇 | ❌ 冇 | ❌ 冇 | ✅ 有 |
| 跨語言 | ✅ 有 | ✅ 有 | ❌ Node.js only | ✅ 有 |
| 依賴 | Redis | Redis | Redis | 獨立服務 |
| 學習曲線 | ⭐ 最簡單 | ⭐⭐ 簡單 | ⭐⭐ 簡單 | ⭐⭐⭐⭐ 複雜 |
| 效能 (msg/sec) | ~50k | ~20k | ~10k | ~5k |
| 最適合場景 | 即時通訊 | Event Sourcing | 背景任務 | 企業級 MQ |
選擇流程圖
開始
↓
需要即時廣播?(訊息可以遺失)
├─ 係 → Redis Pub/Sub
└─ 唔係
↓
需要 message history / event sourcing?
├─ 係 → Redis Streams
└─ 唔係
↓
需要可靠嘅 job queue?(重試、優先級、延遲)
├─ 係 → BullMQ
└─ 唔係
↓
需要跨語言 / 複雜路由?
├─ 係 → RabbitMQ
└─ 唔係 → BullMQ(最平衡嘅選擇)
實際場景推薦
| 場景 | 推薦方案 | 理由 |
|---|---|---|
| 聊天室、即時通知 | Redis Pub/Sub | 低延遲、即時廣播 |
| 用戶活動記錄 | Redis Streams | 需要歷史、多個 consumer |
| 郵件發送 | BullMQ | 需要重試、rate limiting |
| 圖片/影片處理 | BullMQ | 需要持久化、並發控制 |
| 定時報表 | BullMQ | 需要 cron / 延遲任務 |
| Audit Log | Redis Streams | 需要完整歷史記錄 |
| 微服務通訊(跨語言) | RabbitMQ | 需要複雜路由、高可靠性 |
RabbitMQ 深入解析
RabbitMQ 係咩?
RabbitMQ 係一個獨立嘅 message broker,用 Erlang 編寫,實現咗 AMQP (Advanced Message Queuing Protocol) 標準。
關鍵特性:
- ✅ 獨立服務:唔依賴其他數據庫(自己管理數據)
- ✅ 跨語言:支援 Python、Java、Go、Node.js、C# 等
- ✅ 複雜路由:Exchange + Routing Key 實現靈活嘅消息路由
- ✅ 高可靠性:Transaction、Confirm、Ack 機制
RabbitMQ 核心概念
┌──────────┐ ┌─────────────────────────┐ ┌──────────┐
│Producer │────>│ RabbitMQ (Broker) │────>│Consumer │
└──────────┘ │ ┌─────────┐ ┌───────┐ │ └──────────┘
│ │Exchange │─>│ Queue │ │
│ └─────────┘ └───────┘ │
│ ↓ (Binding) │
│ Routing Key │
└─────────────────────────┘
組件說明:
- Producer:發送消息嘅應用
- Exchange:接收 producer 嘅消息,按路由規則分發到 queue
- Queue:儲存消息,等 consumer 處理
- Consumer:從 queue 接收同處理消息
- Binding:定義 exchange 同 queue 之間嘅關係
- Routing Key:消息嘅路由標籤
Exchange 類型
1. Direct Exchange(精確匹配)
# Routing Key 要完全一致先會路由
Exchange: "logs"
Routing Key: "error" → Queue: error_queue
Routing Key: "info" → Queue: info_queue
Routing Key: "warn" → Queue: warn_queue
2. Topic Exchange(模式匹配)
# 用 * 同 # 做通配符
Exchange: "events"
"user.created" → Queue: user_events
"user.deleted" → Queue: user_events
"order.*.shipped" → Queue: shipping_events
"#.error" → Queue: error_queue # 所有 error
3. Fanout Exchange(廣播)
# 忽略 Routing Key,廣播到所有綁定嘅 queue
Exchange: "notifications"
→ Queue: email_queue
→ Queue: sms_queue
→ Queue: push_queue
4. Headers Exchange(按 header 匹配)
# 根據 message header 路由(較少用)
RabbitMQ 實戰代碼
安裝
# 安裝 RabbitMQ
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
# 安裝 Node.js client
npm install amqplib
範例 1:簡單 Work Queue
Producer(發送任務):
// producer.ts
import amqp from 'amqplib';
async function sendTask(taskData: any) {
// 連接 RabbitMQ
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// 聲明 queue(如果唔存在就創建)
const queue = 'tasks';
await channel.assertQueue(queue, {
durable: true // 持久化 queue
});
// 發送消息
channel.sendToQueue(queue, Buffer.from(JSON.stringify(taskData)), {
persistent: true // 持久化消息
});
console.log('✅ Task sent:', taskData);
// 關閉連接
setTimeout(() => {
connection.close();
}, 500);
}
// 發送任務
sendTask({ type: 'sendEmail', to: 'user@example.com' });
Consumer(處理任務):
// consumer.ts
import amqp from 'amqplib';
async function startWorker() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'tasks';
await channel.assertQueue(queue, { durable: true });
// 設定並發控制:一次只處理 1 個消息
channel.prefetch(1);
console.log('👷 Worker waiting for tasks...');
// 消費消息
channel.consume(queue, async (msg) => {
if (msg) {
const task = JSON.parse(msg.content.toString());
console.log('📧 Processing:', task);
try {
// 處理任務
await processTask(task);
// ✅ 確認處理完成
channel.ack(msg);
console.log('✅ Task completed');
} catch (error) {
console.error('❌ Task failed:', error);
// ❌ 拒絕消息並重新排隊
channel.nack(msg, false, true);
}
}
});
}
async function processTask(task: any) {
// 模擬處理任務
await new Promise(resolve => setTimeout(resolve, 1000));
if (task.type === 'sendEmail') {
console.log(`Sending email to ${task.to}`);
}
}
startWorker();
範例 2:Pub/Sub(用 Fanout Exchange)
Publisher(發布事件):
// publisher.ts
import amqp from 'amqplib';
async function publishEvent(event: string, data: any) {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// 聲明 fanout exchange
const exchange = 'events';
await channel.assertExchange(exchange, 'fanout', { durable: true });
// 發布消息(fanout 會忽略 routing key)
channel.publish(exchange, '', Buffer.from(JSON.stringify({
event,
data,
timestamp: new Date()
})));
console.log(`📢 Event published: ${event}`);
setTimeout(() => connection.close(), 500);
}
// 發布用戶創建事件
publishEvent('user.created', { userId: '123', email: 'user@example.com' });
Subscriber(訂閱事件):
// subscriber-email.ts
import amqp from 'amqplib';
async function subscribeEvents(serviceName: string) {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const exchange = 'events';
await channel.assertExchange(exchange, 'fanout', { durable: true });
// 創建專屬 queue(每個服務有自己嘅 queue)
const { queue } = await channel.assertQueue(`${serviceName}_queue`, {
exclusive: false,
durable: true
});
// 綁定 exchange 到 queue
await channel.bindQueue(queue, exchange, '');
console.log(`👂 ${serviceName} listening for events...`);
channel.consume(queue, (msg) => {
if (msg) {
const event = JSON.parse(msg.content.toString());
console.log(`${serviceName} received:`, event);
// 處理事件
if (event.event === 'user.created') {
console.log(`📧 Sending welcome email to ${event.data.email}`);
}
channel.ack(msg);
}
});
}
subscribeEvents('email-service');
// subscriber-analytics.ts
import amqp from 'amqplib';
// 另一個服務都可以收到同一個事件
subscribeEvents('analytics-service');
範例 3:Topic Exchange(按主題路由)
Publisher:
// log-publisher.ts
import amqp from 'amqplib';
async function publishLog(level: string, service: string, message: string) {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// 聲明 topic exchange
const exchange = 'logs';
await channel.assertExchange(exchange, 'topic', { durable: true });
// Routing key 格式:<service>.<level>
const routingKey = `${service}.${level}`;
channel.publish(exchange, routingKey, Buffer.from(JSON.stringify({
service,
level,
message,
timestamp: new Date()
})));
console.log(`📝 Log sent [${routingKey}]: ${message}`);
setTimeout(() => connection.close(), 500);
}
// 發送不同服務、不同級別嘅日誌
publishLog('error', 'api', 'Database connection failed');
publishLog('info', 'api', 'Request received');
publishLog('error', 'worker', 'Job processing failed');
Subscriber(只接收 error 級別):
// error-monitor.ts
import amqp from 'amqplib';
async function monitorErrors() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const exchange = 'logs';
await channel.assertExchange(exchange, 'topic', { durable: true });
const { queue } = await channel.assertQueue('error_monitor', {
durable: true
});
// 綁定 pattern:*.error(所有服務嘅 error)
await channel.bindQueue(queue, exchange, '*.error');
console.log('🚨 Monitoring all errors...');
channel.consume(queue, (msg) => {
if (msg) {
const log = JSON.parse(msg.content.toString());
console.log(`🚨 ERROR from ${log.service}:`, log.message);
// 觸發告警
sendAlert(log);
channel.ack(msg);
}
});
}
function sendAlert(log: any) {
console.log('📧 Sending alert to ops team...');
}
monitorErrors();
Subscriber(接收特定服務嘅所有日誌):
// api-logs.ts
import amqp from 'amqplib';
async function monitorApiLogs() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const exchange = 'logs';
await channel.assertExchange(exchange, 'topic', { durable: true });
const { queue } = await channel.assertQueue('api_logs', {
durable: true
});
// 綁定 pattern:api.*(api 服務嘅所有日誌)
await channel.bindQueue(queue, exchange, 'api.*');
console.log('📊 Monitoring API logs...');
channel.consume(queue, (msg) => {
if (msg) {
const log = JSON.parse(msg.content.toString());
console.log(`[API ${log.level}]:`, log.message);
channel.ack(msg);
}
});
}
monitorApiLogs();
RabbitMQ 進階功能
- Dead Letter Exchange(DLX)
// 設置 DLX:處理失敗嘅消息
async function setupQueueWithDLX() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// 聲明 DLX
await channel.assertExchange('dlx', 'direct', { durable: true });
// 聲明 DLQ(Dead Letter Queue)
await channel.assertQueue('failed_jobs', { durable: true });
await channel.bindQueue('failed_jobs', 'dlx', 'failed');
// 聲明主 queue,指定 DLX
await channel.assertQueue('jobs', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'failed',
'x-message-ttl': 60000 // 60 秒後過期 → DLQ
}
});
}
- Message TTL(消息過期)
// 發送帶 TTL 嘅消息
channel.sendToQueue('tasks', Buffer.from(JSON.stringify(task)), {
expiration: '30000' // 30 秒後過期
});
- Priority Queue
// 聲明優先級 queue
await channel.assertQueue('priority_tasks', {
durable: true,
arguments: {
'x-max-priority': 10 // 最大優先級
}
});
// 發送高優先級消息
channel.sendToQueue('priority_tasks', Buffer.from(JSON.stringify(task)), {
priority: 8
});
RabbitMQ vs BullMQ:詳細對比
| 特性 | BullMQ | RabbitMQ |
|---|---|---|
| 部署 | 需要 Redis | 獨立服務(Erlang) |
| 語言支援 | Node.js only | 跨語言(Python, Java, Go, etc.) |
| 學習曲線 | ⭐⭐ 簡單 | ⭐⭐⭐⭐ 較複雜 |
| 運維成本 | ⭐⭐ 低(共用 Redis) | ⭐⭐⭐⭐ 高(額外服務) |
| 路由功能 | ❌ 冇(單一 queue) | ✅ 強大(Exchange + Routing) |
| Pub/Sub | ❌ 冇 | ✅ 有(Fanout Exchange) |
| Topic Pattern | ❌ 冇 | ✅ 有(Topic Exchange) |
| 跨服務通訊 | ❌ 唔適合 | ✅ 專為此設計 |
| Transaction | ❌ 冇 | ✅ 有 |
| Memory 使用 | ⭐⭐ 低 | ⭐⭐⭐⭐ 較高 |
| 效能 (msg/sec) | ~10k | ~5k |
| 監控 UI | 第三方(Bull Board) | ✅ 內建(Management Plugin) |
何時該用 RabbitMQ?
🐰 用 RabbitMQ 當:
✅ 跨語言通訊Python Service → RabbitMQ → Node.js Service Java Service → RabbitMQ → Go Service✅ 複雜路由需求
# Topic routing order.created.us → US fulfillment service order.created.eu → EU fulfillment service order.*.failed → Error monitoring service✅ Event-driven 架構
User Service (publish: user.created) ↓ ├─> Email Service (send welcome email) ├─> Analytics Service (track signup) ├─> CRM Service (create contact) └─> Notification Service (push notification)✅ 企業級要求
AMQP 標準
High Availability(cluster)
Federation(跨數據中心)
審計同合規
唔該用 RabbitMQ 當:
❌ 簡單背景任務(email、image processing)→ 用 BullMQ
❌ 已經有 Redis,只係 Node.js → 用 BullMQ
❌ 小團隊,運維資源有限 → 用 BullMQ
❌ 唔需要跨語言 → 用 BullMQ
實戰代碼:用 BullMQ 處理背景任務
安裝
npm install bullmq ioredis
完整範例:郵件發送系統
1. 創建 Queue
// queue.ts
import { Queue } from 'bullmq';
import Redis from 'ioredis';
const connection = new Redis({
host: 'localhost',
port: 6379,
maxRetriesPerRequest: null // BullMQ 需要呢個設定
});
export const emailQueue = new Queue('emails', { connection });
// 添加 job
export async function sendEmail(to: string, subject: string, body: string) {
await emailQueue.add(
'sendEmail', // Job name
{ to, subject, body }, // Job data
{
// 選項
attempts: 3, // 最多重試 3 次
backoff: {
type: 'exponential',
delay: 2000 // 2s, 4s, 8s
},
removeOnComplete: 1000, // 保留最近 1000 個已完成嘅 jobs
removeOnFail: 5000 // 保留最近 5000 個失敗嘅 jobs
}
);
}
2. 創建 Worker
// worker.ts
import { Worker } from 'bullmq';
import { connection } from './queue';
import nodemailer from 'nodemailer';
const transporter = nodemailer.createTransport({
host: 'smtp.gmail.com',
port: 587,
auth: {
user: process.env.EMAIL_USER,
pass: process.env.EMAIL_PASS
}
});
const worker = new Worker(
'emails',
async (job) => {
const { to, subject, body } = job.data;
// 更新進度
await job.updateProgress(10);
// 發送郵件
await transporter.sendMail({
from: 'noreply@example.com',
to,
subject,
html: body
});
await job.updateProgress(100);
// 返回結果
return { sent: true, timestamp: new Date() };
},
{
connection,
concurrency: 5, // 同時最多處理 5 封郵件
limiter: {
max: 100, // 每 5 秒最多 100 封
duration: 5000
}
}
);
// 事件監聽
worker.on('completed', (job) => {
console.log(`✅ Email sent to ${job.data.to}`);
});
worker.on('failed', (job, error) => {
console.error(`❌ Failed to send email to ${job?.data.to}:`, error);
});
worker.on('progress', (job, progress) => {
console.log(`📧 Sending email to ${job.data.to}: ${progress}%`);
});
3. 使用 Queue
// api.ts
import express from 'express';
import { sendEmail } from './queue';
const app = express();
app.use(express.json());
app.post('/send-email', async (req, res) => {
const { to, subject, body } = req.body;
try {
await sendEmail(to, subject, body);
res.json({ message: 'Email queued successfully' });
} catch (error) {
res.status(500).json({ error: 'Failed to queue email' });
}
});
app.listen(3000, () => {
console.log('API server running on port 3000');
});
進階功能:優先級 + 延遲任務
// VIP 用戶郵件(高優先級)
await emailQueue.add(
'sendEmail',
{ to: 'vip@example.com', subject: 'VIP Notice' },
{ priority: 1 }
);
// 普通用戶郵件(低優先級)
await emailQueue.add(
'sendEmail',
{ to: 'user@example.com', subject: 'Regular Notice' },
{ priority: 10 }
);
// 延遲郵件(1 小時後發送)
await emailQueue.add(
'sendEmail',
{ to: 'user@example.com', subject: 'Reminder' },
{ delay: 60 * 60 * 1000 }
);
// 定時郵件(每日早上 9 點)
await emailQueue.add(
'dailyNewsletter',
{ subject: 'Daily Newsletter' },
{
repeat: {
pattern: '0 9 * * *',
tz: 'Asia/Hong_Kong'
}
}
);
效能優化同最佳實踐
1. Redis 連接管理
// ❌ 錯誤:每個 queue/worker 都創建新連接
const queue1 = new Queue('emails', {
connection: new Redis() // 新連接
});
const queue2 = new Queue('images', {
connection: new Redis() // 又一個新連接
});
// ✅ 正確:共享連接
const connection = new Redis({
maxRetriesPerRequest: null,
enableReadyCheck: false,
lazyConnect: true
});
const queue1 = new Queue('emails', { connection });
const queue2 = new Queue('images', { connection: connection.duplicate() });
2. Job 數據優化
// ❌ 錯誤:將大量數據直接放入 job
await queue.add('processImage', {
imageBuffer: fs.readFileSync('large-image.jpg') // 幾 MB 嘅數據!
});
// ✅ 正確:只傳 reference
await queue.add('processImage', {
imageUrl: 's3://bucket/large-image.jpg' // 只係一個 URL
});
// Worker 自己去 fetch
const worker = new Worker('processImage', async (job) => {
const imageBuffer = await fetchFromS3(job.data.imageUrl);
// ... 處理圖片
});
3. 並發調優
// 根據任務類型調整並發
// CPU 密集型(如影片編碼)
const videoWorker = new Worker('processVideo', processVideo, {
connection,
concurrency: 2 // 低並發,避免 CPU 過載
});
// I/O 密集型(如發送郵件)
const emailWorker = new Worker('sendEmail', sendEmail, {
connection,
concurrency: 20 // 高並發,因為大部分時間都係等 I/O
});
4. 監控同日誌
import { QueueEvents } from 'bullmq';
const queueEvents = new QueueEvents('emails', { connection });
// 監控 queue 狀態
setInterval(async () => {
const counts = await emailQueue.getJobCounts();
console.log('Queue stats:', counts);
// { waiting: 10, active: 5, completed: 100, failed: 2 }
}, 10000);
// 接入 APM (如 DataDog)
queueEvents.on('completed', ({ jobId, returnvalue }) => {
statsd.increment('jobs.completed');
statsd.timing('jobs.duration', returnvalue.duration);
});
💡 最佳實踐總結
連接管理:共享 Redis 連接,避免連接過多
Job 數據:傳 reference 而唔係大數據
並發調優:根據任務類型調整 concurrency
錯誤處理:合理設定 attempts 同 backoff
監控日誌:接入 APM 工具追蹤效能
清理舊 jobs:設定 removeOnComplete/removeOnFail
Rate Limiting:避免過載外部 API
總結
核心要點
- BullMQ ≠ Redis:
- BullMQ 係基於 Redis 構建 嘅 library
- 佢用 Redis 嘅數據結構(List, Sorted Set)嚟實現 queue
- 你需要同時有 Redis 同 BullMQ
- Redis Pub/Sub 唔適合 Job Queue:
- 冇持久化:message 發出即失去
- 冇重試機制
- 冇優先級、延遲任務、rate limiting
- 適合即時通訊,唔適合背景任務
- BullMQ 嘅優勢:
- 可靠:持久化 + 自動重試
- 強大:優先級 + 延遲任務 + 並發控制
- 簡單:唔需要額外嘅 message broker
- 高效:10,000+ jobs/sec
何時用邊個?
| 場景 | 推薦方案 | 理由 |
|---|---|---|
| 即時聊天、通知 | Redis Pub/Sub | 低延遲、即時廣播 |
| 郵件發送 | BullMQ | 需要重試、rate limiting |
| 圖片/影片處理 | BullMQ | 需要持久化、並發控制 |
| 報表生成 | BullMQ | 需要延遲任務、優先級 |
| 微服務通訊 | RabbitMQ | 需要複雜路由、高可靠性 |
相關資源
📄 官方文檔
- BullMQ 文檔: docs.bullmq.io
- BullMQ GitHub: github.com/taskforcesh/bullmq
- Redis 文檔: redis.io/documentation
💻 相關工具
- Bull Board: github.com/felixmosh/bull-board - BullMQ 嘅 Web UI
- Taskforce.sh: taskforce.sh - BullMQ 官方監控平台
📚 延伸閱讀
- BullMQ vs Agenda vs Bee-Queue: judoscale.com/blog/node-task-queues
- Building Scalable Job Queues: medium.com/@sanipatel0401
記住:BullMQ 唔係 Redis 嘅替代品,而係 Redis 嘅最佳拍檔! 🐂