Billy Tse
HomeRoadmapBlogContact
Playground
Buy me a bug

© 2026 Billy Tse

OnlyFansLinkedInGitHubEmail
Back to Blog
February 6, 2026•40 min read

BullMQ vs Redis Streams vs RabbitMQ:Node.js Message Queue 完全對比指南

深入對比 Redis Pub/Sub、Redis Streams、BullMQ 同 RabbitMQ 四大 Message Queue 方案,解析點解 Redis Pub/Sub 唔適合做 Job Queue,Redis Streams 點樣處理持久化消息流,BullMQ 點樣構建可靠嘅背景任務處理系統,RabbitMQ 幾時先係最佳選擇,附完整實戰代碼同場景推薦

NLPWeb Development

「我已經有 Redis 喇,點解重要用 BullMQ?」好多人第一次接觸 job queue 都會問呢個問題。今日我哋深入探討 BullMQ 係咩、佢同 Redis 嘅關係、點解 Redis Pub/Sub 唔適合做 Job Queue,同埋何時應該用 BullMQ 而唔係直接用 Redis。

TL;DR

核心重點:

  • 🎯 BullMQ ≠ Redis:BullMQ 係基於 Redis 構建 嘅 Job Queue library,唔係 Redis 嘅替代品
  • 🚀 Redis Pub/Sub 嘅限制:冇持久化、冇重試機制、冇優先級,唔適合做可靠嘅 Job Queue
  • ⚡ BullMQ 嘅優勢:用 Redis 提供持久化、自動重試、優先級、延遲任務、並發控制
  • 📊 效能數據:BullMQ 可以處理 10,000+ jobs/sec,記憶體佔用比純 Redis Pub/Sub 低 50%
  • 🛠️ 何時用邊個:背景任務處理 → BullMQ;即時訊息廣播 → Redis Pub/Sub

Table of Contents

  • 甚麼是 BullMQ?佢同 Redis 嘅關係
  • Redis Pub/Sub 點解唔適合做 Job Queue?
  • Redis Streams:介乎 Pub/Sub 同 BullMQ 之間嘅選擇
  • BullMQ 嘅核心功能同優勢
  • RabbitMQ 深入解析
  • 完整對比:Redis Pub/Sub vs Streams vs BullMQ vs RabbitMQ
  • 實戰代碼:用 BullMQ 處理背景任務
  • 效能優化同最佳實踐
  • 總結
  • 相關資源

甚麼是 BullMQ?佢同 Redis 嘅關係

BullMQ 係咩?

BullMQ 係一個基於 Redis 構建嘅 Job Queue library,專門用於 Node.js 背景任務處理。

// BullMQ 係一個 Node.js library import { Queue, Worker } from 'bullmq'; // 佢 **需要** Redis 先可以運作 const queue = new Queue('emails', { connection: { host: 'localhost', port: 6379 // Redis 連接 } });

關鍵事實:

  • ✅ BullMQ 係一個 library,唔係一個獨立嘅 message broker
  • ✅ BullMQ 依賴 Redis 嚟儲存 job 數據
  • ✅ BullMQ 用 Redis 嘅 List、Sorted Set、Hash 嚟實現 queue 功能
  • ❌ BullMQ 唔係 Redis 嘅替代品,而係 Redis 嘅 上層封裝

💡 簡單比喻
如果 Redis 係一個「倉庫」,咁:

  • Redis Pub/Sub 係倉庫入面嘅「廣播系統」(冇儲存,即時傳送)

  • BullMQ 係倉庫入面嘅「智能物流系統」(有儲存、有追蹤、有重試)

BullMQ 用 Redis 嘅數據結構(List、Sorted Set)嚟構建一個可靠嘅 job queue。

BullMQ 點樣用 Redis?

BullMQ 用 Redis 嘅以下數據結構:

# Redis 數據結構對應 Queue:waiting → Redis List # 等待執行嘅 jobs Queue:active → Redis List # 正在執行嘅 jobs Queue:completed → Redis Set # 已完成嘅 jobs Queue:failed → Redis Set # 失敗嘅 jobs Queue:delayed → Redis Sorted Set # 延遲執行嘅 jobs (by timestamp) Queue:priority → Redis Sorted Set # 優先級 jobs (by priority) Queue:jobs:123 → Redis Hash # 個別 job 嘅詳細數據

示意圖:

┌─────────────────────────────────────┐ │ BullMQ (Node.js) │ │ ┌───────────┐ ┌────────────┐ │ │ │ Queue │ │ Worker │ │ │ └─────┬─────┘ └──────┬─────┘ │ └────────┼───────────────────┼───────┘ │ Redis Commands │ ▼ ▼ ┌─────────────────────────────────────┐ │ Redis │ │ ┌──────┐ ┌──────┐ ┌──────────┐ │ │ │ List │ │ Set │ │ SortedSet│ │ │ └──────┘ └──────┘ └──────────┘ │ └─────────────────────────────────────┘

Redis Pub/Sub 點解唔適合做 Job Queue?

好多人第一次聽到 job queue 會諗:「我有 Redis Pub/Sub 喇,點解重要用 BullMQ?」

Redis Pub/Sub 嘅工作原理

// Publisher import Redis from 'ioredis'; const redis = new Redis(); await redis.publish('emails', JSON.stringify({ to: 'user@example.com', subject: 'Hello' })); // Subscriber const subscriber = new Redis(); subscriber.subscribe('emails'); subscriber.on('message', (channel, message) => { console.log('Received:', message); // 處理郵件... });

睇落幾好喎!咁有咩問題?

問題 1:冇持久化 (Fire-and-Forget)

場景:

// 1. 發送 job await redis.publish('emails', JSON.stringify({ to: 'user@example.com' })); // 2. 如果而家 **冇 subscriber** 連接緊... // ❌ 呢個 message **永久失去**! // 3. 就算有 subscriber,但如果 subscriber 正在處理另一個 job... // ❌ 呢個 message 都會 **遺失**!

Redis Pub/Sub 特性:

  • ✅ 即時廣播(低延遲)
  • ❌ 冇儲存:message 發出後,如果冇 subscriber,就永久失去
  • ❌ 冇 queue:subscriber 處理緊嘢嘅時候,新 message 會遺失

BullMQ 解決方法:

// BullMQ 會將 job **持久化** 到 Redis await queue.add('sendEmail', { to: 'user@example.com' }); // ✅ 就算而家冇 worker,job 都會保存喺 Redis // ✅ Worker 啟動後會自動處理所有 pending jobs

問題 2:冇重試機制

場景:

// Redis Pub/Sub subscriber.on('message', async (channel, message) => { try { await sendEmail(JSON.parse(message)); } catch (error) { // ❌ 失敗咗點算?message 已經冇咗... // ❌ 你要自己寫 retry logic // ❌ 你要自己儲存失敗嘅 jobs } }); // BullMQ const worker = new Worker('emails', async (job) => { await sendEmail(job.data); }, { connection, attempts: 3, // ✅ 自動重試 3 次 backoff: { type: 'exponential', delay: 2000 // ✅ 指數退避 } });

問題 3:冇優先級

場景:

// 你想優先處理 VIP 用戶嘅郵件 // ❌ Redis Pub/Sub 做唔到!所有 messages 一視同仁 // BullMQ await queue.add('sendEmail', { to: 'vip@example.com' }, { priority: 1 // ✅ 高優先級 }); await queue.add('sendEmail', { to: 'normal@example.com' }, { priority: 10 // ✅ 低優先級 });

問題 4:冇延遲任務

場景:

// 你想 30 分鐘後發送提醒郵件 // ❌ Redis Pub/Sub 做唔到! // BullMQ await queue.add('sendReminder', { to: 'user@example.com' }, { delay: 30 * 60 * 1000 // ✅ 30 分鐘後執行 });

問題 5:冇並發控制

場景:

// 你唔想同時發送超過 5 封郵件(避免被 rate limit) // ❌ Redis Pub/Sub 冇呢個機制 // BullMQ const worker = new Worker('emails', processEmail, { connection, concurrency: 5, // ✅ 最多同時處理 5 個 jobs limiter: { max: 100, // ✅ 每 5 秒最多 100 個 jobs duration: 5000 } });

⚠️ Redis Pub/Sub 嘅限制總結
❌ 冇持久化:message 發出後即失去

❌ 冇重試:失敗咗就冇咗

❌ 冇優先級:所有 messages 平等

❌ 冇延遲:唔支援定時任務

❌ 冇並發控制:唔能限制處理速度

❌ 冇監控:唔知 jobs 嘅狀態

結論:Redis Pub/Sub 適合即時通訊,唔適合可靠嘅背景任務處理。

Redis Streams:介乎 Pub/Sub 同 BullMQ 之間嘅選擇

Redis Streams 係咩?

Redis Streams 係 Redis 5.0(2018 年)引入嘅數據結構,專門用於處理持久化嘅消息流。

佢嘅設計靈感來自 Apache Kafka,但更輕量同簡單。

關鍵特性:

  • ✅ 持久化:messages 會儲存喺 Redis,唔會遺失
  • ✅ Consumer Groups:多個 consumer 可以分工處理 messages
  • ✅ At-least-once delivery:保證 message 至少送達一次
  • ✅ Message history:可以讀取歷史 messages
  • ❌ 冇自動重試:需要自己實現
  • ❌ 冇優先級:FIFO 順序
  • ❌ 冇延遲任務:唔支援定時執行

💡 Redis Streams vs Pub/Sub vs BullMQ
想像一個訊息傳遞系統:

  • Redis Pub/Sub = 廣播電台(即時,但冇人聽就冇咗)

  • Redis Streams = WhatsApp 群組(有歷史記錄,可以追返)

  • BullMQ = 任務管理系統(有追蹤、重試、優先級)

Redis Streams 基本操作

  1. 生產消息(XADD)
import Redis from 'ioredis'; const redis = new Redis(); // 添加 message 到 stream const messageId = await redis.xadd( 'orders', // Stream name '*', // Auto-generate ID (timestamp-based) 'orderId', '123', // Field-value pairs 'amount', '99.99', 'status', 'pending' ); console.log('Message ID:', messageId); // Output: 1234567890123-0 (timestamp-sequence)
  1. 消費消息(XREAD)
// 讀取新 messages(blocking mode) const messages = await redis.xread( 'BLOCK', 5000, // Block for 5 seconds 'STREAMS', 'orders', '$' // Only new messages ); if (messages) { for (const [stream, streamMessages] of messages) { for (const [id, fields] of streamMessages) { console.log('Message ID:', id); console.log('Fields:', fields); // ['orderId', '123', 'amount', '99.99', 'status', 'pending'] } } }
  1. Consumer Groups(進階)
// 創建 consumer group await redis.xgroup( 'CREATE', 'orders', // Stream name 'order-processors', // Group name '0', // Start from beginning 'MKSTREAM' // Create stream if not exists ); // Consumer 讀取 messages const messages = await redis.xreadgroup( 'GROUP', 'order-processors', // Group name 'worker-1', // Consumer name 'COUNT', 10, // Max 10 messages 'BLOCK', 5000, // Block for 5 seconds 'STREAMS', 'orders', '>' // Only undelivered messages ); if (messages) { for (const [stream, streamMessages] of messages) { for (const [id, fields] of streamMessages) { try { // 處理 message await processOrder(fields); // ✅ 確認處理完成(ACK) await redis.xack('orders', 'order-processors', id); } catch (error) { console.error('Failed to process:', id, error); // ❌ 唔 ACK,message 會留喺 pending 狀態 } } } }

Redis Streams 完整範例

Producer(生產者):

// producer.ts import Redis from 'ioredis'; const redis = new Redis(); async function publishOrder(order: any) { const id = await redis.xadd( 'orders', '*', 'orderId', order.id, 'userId', order.userId, 'amount', order.amount.toString(), 'timestamp', Date.now().toString() ); console.log(`✅ Order published: ${id}`); return id; } // 發布訂單 publishOrder({ id: '123', userId: 'user-456', amount: 99.99 });

Consumer(消費者):

// consumer.ts import Redis from 'ioredis'; const redis = new Redis(); const STREAM = 'orders'; const GROUP = 'order-processors'; const CONSUMER = `worker-${process.pid}`; async function startConsumer() { // 創建 consumer group(如果未存在) try { await redis.xgroup('CREATE', STREAM, GROUP, '0', 'MKSTREAM'); console.log(`✅ Consumer group created: ${GROUP}`); } catch (error: any) { if (error.message.includes('BUSYGROUP')) { console.log(`ℹ️ Consumer group already exists: ${GROUP}`); } else { throw error; } } console.log(`👷 Consumer ${CONSUMER} started...`); // 持續消費 messages while (true) { try { const messages = await redis.xreadgroup( 'GROUP', GROUP, CONSUMER, 'COUNT', 10, 'BLOCK', 5000, 'STREAMS', STREAM, '>' ); if (messages) { for (const [stream, streamMessages] of messages) { for (const [id, fields] of streamMessages) { // 將 array 轉成 object const data = arrayToObject(fields); try { console.log(`📦 Processing order ${data.orderId}...`); await processOrder(data); // ✅ ACK await redis.xack(STREAM, GROUP, id); console.log(`✅ Order ${data.orderId} completed`); } catch (error) { console.error(`❌ Failed to process ${id}:`, error); // 唔 ACK,message 留喺 pending } } } } } catch (error) { console.error('Consumer error:', error); await new Promise(resolve => setTimeout(resolve, 1000)); } } } function arrayToObject(arr: string[]): Record<string, string> { const obj: Record<string, string> = {}; for (let i = 0; i < arr.length; i += 2) { obj[arr[i]] = arr[i + 1]; } return obj; } async function processOrder(data: any) { // 模擬處理訂單 await new Promise(resolve => setTimeout(resolve, 1000)); console.log(`💰 Order ${data.orderId}: $${data.amount}`); } startConsumer();

Redis Streams 處理 Pending Messages

// 檢查 pending messages(未 ACK 嘅) async function processPendingMessages() { // 獲取 pending messages const pending = await redis.xpending( STREAM, GROUP, '-', '+', // ID range (all) 10, // Max count CONSUMER // 只睇自己嘅 ); if (pending && pending.length > 0) { console.log(`⚠️ Found ${pending.length} pending messages`); for (const [id, consumer, idleTime, deliveryCount] of pending) { // 如果 pending 超過 5 分鐘,重新處理 if (idleTime > 300000) { console.log(`🔄 Reprocessing message ${id}...`); // Claim message(搶返嚟) const messages = await redis.xclaim( STREAM, GROUP, CONSUMER, 300000, // Min idle time id ); // 重新處理 // ... } } } } // 定期檢查 pending messages setInterval(processPendingMessages, 60000); // 每分鐘

Redis Streams 嘅限制

雖然 Redis Streams 比 Pub/Sub 強大,但仍然有以下限制:

  1. 冇自動重試
// ❌ Redis Streams 冇自動重試機制 // 你要自己實現: const MAX_RETRIES = 3; for (const [id, fields] of messages) { const data = arrayToObject(fields); let retries = 0; while (retries < MAX_RETRIES) { try { await processOrder(data); await redis.xack(STREAM, GROUP, id); break; // Success } catch (error) { retries++; if (retries >= MAX_RETRIES) { // 移到 dead letter queue await redis.xadd('orders:failed', '*', ...fields); await redis.xack(STREAM, GROUP, id); } else { await new Promise(r => setTimeout(r, 1000 * retries)); } } } }
  1. 冇優先級
// ❌ Redis Streams 係 FIFO,冇優先級 // 如果需要優先級,要用多個 streams: await redis.xadd('orders:high', '*', ...); await redis.xadd('orders:normal', '*', ...); // Consumer 優先讀 high const messages = await redis.xreadgroup( 'GROUP', GROUP, CONSUMER, 'STREAMS', 'orders:high', 'orders:normal', '>', '>' );
  1. 冇延遲任務
// ❌ Redis Streams 冇延遲執行功能 // 如果需要,要自己用 timestamp 過濾: const data = { orderId: '123', executeAt: Date.now() + 30 * 60 * 1000 // 30 分鐘後 }; await redis.xadd('orders', '*', 'data', JSON.stringify(data)); // Consumer 檢查 timestamp if (data.executeAt > Date.now()) { // 太早,唔處理 continue; }

何時用 Redis Streams?

🌊 用 Redis Streams 當:
✅ Event Sourcing

User events: login, logout, purchase, etc. → Store in stream for audit trail → Multiple services consume (analytics, notifications, etc.)

✅ Activity Feeds / 時間軸

Social media posts, user activities → Store in stream per user → Fan-out to followers

✅ 簡單 Job Queue(唔需要重試/優先級)

Simple background tasks → Lighter than BullMQ → More reliable than Pub/Sub

✅ Log Aggregation

Application logs from multiple services → Central stream for processing → Consumer groups for different purposes (storage, alerting, etc.)

唔該用 Redis Streams 當:

❌ 需要自動重試 + 指數退避 → 用 BullMQ

❌ 需要優先級 queue → 用 BullMQ

❌ 需要延遲任務 / Cron jobs → 用 BullMQ

❌ 即時通訊(唔需要歷史) → 用 Redis Pub/Sub

❌ 企業級跨語言 messaging → 用 RabbitMQ

BullMQ 嘅核心功能同優勢

1. 持久化 + 可靠性

// Job 會儲存喺 Redis,唔會遺失 await queue.add('processVideo', { videoId: '123', url: 'https://example.com/video.mp4' }); // ✅ 即使 worker crash,job 都唔會遺失 // ✅ Worker 重啟後會繼續處理 // ✅ 所有 job 狀態都有記錄(waiting, active, completed, failed)

2. 自動重試 + 指數退避

const worker = new Worker('processVideo', async (job) => { // 如果呢度 throw error... throw new Error('Processing failed'); }, { connection, attempts: 5, // 最多重試 5 次 backoff: { type: 'exponential', delay: 1000 // 1s, 2s, 4s, 8s, 16s } }); // ✅ BullMQ 會自動重試 // ✅ 每次重試間隔會遞增(指數退避) // ✅ 重試次數用完後會移到 failed set

3. 優先級 Queue

// VIP 用戶嘅任務 await queue.add('processVideo', { userId: 'vip-123' }, { priority: 1 // 最高優先級 }); // 免費用戶嘅任務 await queue.add('processVideo', { userId: 'free-456' }, { priority: 10 // 低優先級 }); // ✅ VIP 任務會優先處理 // ✅ 用 Redis Sorted Set 實現 O(log N) 性能

4. 延遲任務 + 定時任務

// 延遲任務:30 分鐘後發送 await queue.add('sendReminder', { userId: '123' }, { delay: 30 * 60 * 1000 // 30 分鐘 }); // 定時任務:每日凌晨 2 點執行 await queue.add('dailyReport', {}, { repeat: { pattern: '0 2 * * *', // Cron 表達式 tz: 'Asia/Hong_Kong' } }); // ✅ 用 Redis Sorted Set 按 timestamp 排序 // ✅ 唔需要額外嘅 cron 服務

5. 並發控制 + Rate Limiting

const worker = new Worker('sendEmail', processEmail, { connection, // 同時最多處理 10 個 jobs concurrency: 10, // Rate limiting:每 5 秒最多 100 個 jobs limiter: { max: 100, duration: 5000 } }); // ✅ 避免過載 // ✅ 符合 API rate limits // ✅ 控制資源使用

6. 事件監聽 + 監控

// Queue events queue.on('completed', (job) => { console.log(`Job ${job.id} completed`); }); queue.on('failed', (job, error) => { console.error(`Job ${job.id} failed:`, error); }); // Worker events worker.on('progress', (job, progress) => { console.log(`Job ${job.id} progress: ${progress}%`); }); // ✅ 完整嘅事件系統 // ✅ 方便監控同 logging // ✅ 可以接入 APM 工具

7. Parent-Child Dependencies

// 父任務 const parentJob = await queue.add('processProject', { projectId: '123' }); // 子任務(依賴父任務) await queue.add('processFile', { fileId: 'a' }, { parent: { id: parentJob.id, queue: queue.name } }); await queue.add('processFile', { fileId: 'b' }, { parent: { id: parentJob.id, queue: queue.name } }); // ✅ 所有子任務完成後,父任務先會標記為完成 // ✅ 適合複雜嘅工作流

完整對比:Redis Pub/Sub vs Streams vs BullMQ vs RabbitMQ

功能對比表

特性Redis Pub/SubRedis StreamsBullMQRabbitMQ
持久化❌ 冇✅ 有✅ 有(Redis)✅ 有(Disk)
Message History❌ 冇✅ 有⚠️ 有限⚠️ 有限
Consumer Groups❌ 冇✅ 有❌ 冇✅ 有
自動重試❌ 冇❌ 冇✅ 有✅ 有
優先級❌ 冇❌ 冇✅ 有✅ 有
延遲任務❌ 冇❌ 冇✅ 有✅ 有(plugin)
Rate Limiting❌ 冇❌ 冇✅ 有⚠️ 部分支援
廣播(Pub/Sub)✅ 有⚠️ 可模擬❌ 冇✅ 有
複雜路由❌ 冇❌ 冇❌ 冇✅ 有
跨語言✅ 有✅ 有❌ Node.js only✅ 有
依賴RedisRedisRedis獨立服務
學習曲線⭐ 最簡單⭐⭐ 簡單⭐⭐ 簡單⭐⭐⭐⭐ 複雜
效能 (msg/sec)~50k~20k~10k~5k
最適合場景即時通訊Event Sourcing背景任務企業級 MQ

選擇流程圖

開始 ↓ 需要即時廣播?(訊息可以遺失) ├─ 係 → Redis Pub/Sub └─ 唔係 ↓ 需要 message history / event sourcing? ├─ 係 → Redis Streams └─ 唔係 ↓ 需要可靠嘅 job queue?(重試、優先級、延遲) ├─ 係 → BullMQ └─ 唔係 ↓ 需要跨語言 / 複雜路由? ├─ 係 → RabbitMQ └─ 唔係 → BullMQ(最平衡嘅選擇)

實際場景推薦

場景推薦方案理由
聊天室、即時通知Redis Pub/Sub低延遲、即時廣播
用戶活動記錄Redis Streams需要歷史、多個 consumer
郵件發送BullMQ需要重試、rate limiting
圖片/影片處理BullMQ需要持久化、並發控制
定時報表BullMQ需要 cron / 延遲任務
Audit LogRedis Streams需要完整歷史記錄
微服務通訊(跨語言)RabbitMQ需要複雜路由、高可靠性

RabbitMQ 深入解析

RabbitMQ 係咩?

RabbitMQ 係一個獨立嘅 message broker,用 Erlang 編寫,實現咗 AMQP (Advanced Message Queuing Protocol) 標準。

關鍵特性:

  • ✅ 獨立服務:唔依賴其他數據庫(自己管理數據)
  • ✅ 跨語言:支援 Python、Java、Go、Node.js、C# 等
  • ✅ 複雜路由:Exchange + Routing Key 實現靈活嘅消息路由
  • ✅ 高可靠性:Transaction、Confirm、Ack 機制

RabbitMQ 核心概念

┌──────────┐ ┌─────────────────────────┐ ┌──────────┐ │Producer │────>│ RabbitMQ (Broker) │────>│Consumer │ └──────────┘ │ ┌─────────┐ ┌───────┐ │ └──────────┘ │ │Exchange │─>│ Queue │ │ │ └─────────┘ └───────┘ │ │ ↓ (Binding) │ │ Routing Key │ └─────────────────────────┘

組件說明:

  1. Producer:發送消息嘅應用
  2. Exchange:接收 producer 嘅消息,按路由規則分發到 queue
  3. Queue:儲存消息,等 consumer 處理
  4. Consumer:從 queue 接收同處理消息
  5. Binding:定義 exchange 同 queue 之間嘅關係
  6. Routing Key:消息嘅路由標籤

Exchange 類型

1. Direct Exchange(精確匹配)

# Routing Key 要完全一致先會路由 Exchange: "logs" Routing Key: "error" → Queue: error_queue Routing Key: "info" → Queue: info_queue Routing Key: "warn" → Queue: warn_queue

2. Topic Exchange(模式匹配)

# 用 * 同 # 做通配符 Exchange: "events" "user.created" → Queue: user_events "user.deleted" → Queue: user_events "order.*.shipped" → Queue: shipping_events "#.error" → Queue: error_queue # 所有 error

3. Fanout Exchange(廣播)

# 忽略 Routing Key,廣播到所有綁定嘅 queue Exchange: "notifications" → Queue: email_queue → Queue: sms_queue → Queue: push_queue

4. Headers Exchange(按 header 匹配)

# 根據 message header 路由(較少用)

RabbitMQ 實戰代碼

安裝

# 安裝 RabbitMQ docker run -d --name rabbitmq \ -p 5672:5672 \ -p 15672:15672 \ rabbitmq:3-management # 安裝 Node.js client npm install amqplib

範例 1:簡單 Work Queue

Producer(發送任務):

// producer.ts import amqp from 'amqplib'; async function sendTask(taskData: any) { // 連接 RabbitMQ const connection = await amqp.connect('amqp://localhost'); const channel = await connection.createChannel(); // 聲明 queue(如果唔存在就創建) const queue = 'tasks'; await channel.assertQueue(queue, { durable: true // 持久化 queue }); // 發送消息 channel.sendToQueue(queue, Buffer.from(JSON.stringify(taskData)), { persistent: true // 持久化消息 }); console.log('✅ Task sent:', taskData); // 關閉連接 setTimeout(() => { connection.close(); }, 500); } // 發送任務 sendTask({ type: 'sendEmail', to: 'user@example.com' });

Consumer(處理任務):

// consumer.ts import amqp from 'amqplib'; async function startWorker() { const connection = await amqp.connect('amqp://localhost'); const channel = await connection.createChannel(); const queue = 'tasks'; await channel.assertQueue(queue, { durable: true }); // 設定並發控制:一次只處理 1 個消息 channel.prefetch(1); console.log('👷 Worker waiting for tasks...'); // 消費消息 channel.consume(queue, async (msg) => { if (msg) { const task = JSON.parse(msg.content.toString()); console.log('📧 Processing:', task); try { // 處理任務 await processTask(task); // ✅ 確認處理完成 channel.ack(msg); console.log('✅ Task completed'); } catch (error) { console.error('❌ Task failed:', error); // ❌ 拒絕消息並重新排隊 channel.nack(msg, false, true); } } }); } async function processTask(task: any) { // 模擬處理任務 await new Promise(resolve => setTimeout(resolve, 1000)); if (task.type === 'sendEmail') { console.log(`Sending email to ${task.to}`); } } startWorker();

範例 2:Pub/Sub(用 Fanout Exchange)

Publisher(發布事件):

// publisher.ts import amqp from 'amqplib'; async function publishEvent(event: string, data: any) { const connection = await amqp.connect('amqp://localhost'); const channel = await connection.createChannel(); // 聲明 fanout exchange const exchange = 'events'; await channel.assertExchange(exchange, 'fanout', { durable: true }); // 發布消息(fanout 會忽略 routing key) channel.publish(exchange, '', Buffer.from(JSON.stringify({ event, data, timestamp: new Date() }))); console.log(`📢 Event published: ${event}`); setTimeout(() => connection.close(), 500); } // 發布用戶創建事件 publishEvent('user.created', { userId: '123', email: 'user@example.com' });

Subscriber(訂閱事件):

// subscriber-email.ts import amqp from 'amqplib'; async function subscribeEvents(serviceName: string) { const connection = await amqp.connect('amqp://localhost'); const channel = await connection.createChannel(); const exchange = 'events'; await channel.assertExchange(exchange, 'fanout', { durable: true }); // 創建專屬 queue(每個服務有自己嘅 queue) const { queue } = await channel.assertQueue(`${serviceName}_queue`, { exclusive: false, durable: true }); // 綁定 exchange 到 queue await channel.bindQueue(queue, exchange, ''); console.log(`👂 ${serviceName} listening for events...`); channel.consume(queue, (msg) => { if (msg) { const event = JSON.parse(msg.content.toString()); console.log(`${serviceName} received:`, event); // 處理事件 if (event.event === 'user.created') { console.log(`📧 Sending welcome email to ${event.data.email}`); } channel.ack(msg); } }); } subscribeEvents('email-service');
// subscriber-analytics.ts import amqp from 'amqplib'; // 另一個服務都可以收到同一個事件 subscribeEvents('analytics-service');

範例 3:Topic Exchange(按主題路由)

Publisher:

// log-publisher.ts import amqp from 'amqplib'; async function publishLog(level: string, service: string, message: string) { const connection = await amqp.connect('amqp://localhost'); const channel = await connection.createChannel(); // 聲明 topic exchange const exchange = 'logs'; await channel.assertExchange(exchange, 'topic', { durable: true }); // Routing key 格式:<service>.<level> const routingKey = `${service}.${level}`; channel.publish(exchange, routingKey, Buffer.from(JSON.stringify({ service, level, message, timestamp: new Date() }))); console.log(`📝 Log sent [${routingKey}]: ${message}`); setTimeout(() => connection.close(), 500); } // 發送不同服務、不同級別嘅日誌 publishLog('error', 'api', 'Database connection failed'); publishLog('info', 'api', 'Request received'); publishLog('error', 'worker', 'Job processing failed');

Subscriber(只接收 error 級別):

// error-monitor.ts import amqp from 'amqplib'; async function monitorErrors() { const connection = await amqp.connect('amqp://localhost'); const channel = await connection.createChannel(); const exchange = 'logs'; await channel.assertExchange(exchange, 'topic', { durable: true }); const { queue } = await channel.assertQueue('error_monitor', { durable: true }); // 綁定 pattern:*.error(所有服務嘅 error) await channel.bindQueue(queue, exchange, '*.error'); console.log('🚨 Monitoring all errors...'); channel.consume(queue, (msg) => { if (msg) { const log = JSON.parse(msg.content.toString()); console.log(`🚨 ERROR from ${log.service}:`, log.message); // 觸發告警 sendAlert(log); channel.ack(msg); } }); } function sendAlert(log: any) { console.log('📧 Sending alert to ops team...'); } monitorErrors();

Subscriber(接收特定服務嘅所有日誌):

// api-logs.ts import amqp from 'amqplib'; async function monitorApiLogs() { const connection = await amqp.connect('amqp://localhost'); const channel = await connection.createChannel(); const exchange = 'logs'; await channel.assertExchange(exchange, 'topic', { durable: true }); const { queue } = await channel.assertQueue('api_logs', { durable: true }); // 綁定 pattern:api.*(api 服務嘅所有日誌) await channel.bindQueue(queue, exchange, 'api.*'); console.log('📊 Monitoring API logs...'); channel.consume(queue, (msg) => { if (msg) { const log = JSON.parse(msg.content.toString()); console.log(`[API ${log.level}]:`, log.message); channel.ack(msg); } }); } monitorApiLogs();

RabbitMQ 進階功能

  1. Dead Letter Exchange(DLX)
// 設置 DLX:處理失敗嘅消息 async function setupQueueWithDLX() { const connection = await amqp.connect('amqp://localhost'); const channel = await connection.createChannel(); // 聲明 DLX await channel.assertExchange('dlx', 'direct', { durable: true }); // 聲明 DLQ(Dead Letter Queue) await channel.assertQueue('failed_jobs', { durable: true }); await channel.bindQueue('failed_jobs', 'dlx', 'failed'); // 聲明主 queue,指定 DLX await channel.assertQueue('jobs', { durable: true, arguments: { 'x-dead-letter-exchange': 'dlx', 'x-dead-letter-routing-key': 'failed', 'x-message-ttl': 60000 // 60 秒後過期 → DLQ } }); }
  1. Message TTL(消息過期)
// 發送帶 TTL 嘅消息 channel.sendToQueue('tasks', Buffer.from(JSON.stringify(task)), { expiration: '30000' // 30 秒後過期 });
  1. Priority Queue
// 聲明優先級 queue await channel.assertQueue('priority_tasks', { durable: true, arguments: { 'x-max-priority': 10 // 最大優先級 } }); // 發送高優先級消息 channel.sendToQueue('priority_tasks', Buffer.from(JSON.stringify(task)), { priority: 8 });

RabbitMQ vs BullMQ:詳細對比

特性BullMQRabbitMQ
部署需要 Redis獨立服務(Erlang)
語言支援Node.js only跨語言(Python, Java, Go, etc.)
學習曲線⭐⭐ 簡單⭐⭐⭐⭐ 較複雜
運維成本⭐⭐ 低(共用 Redis)⭐⭐⭐⭐ 高(額外服務)
路由功能❌ 冇(單一 queue)✅ 強大(Exchange + Routing)
Pub/Sub❌ 冇✅ 有(Fanout Exchange)
Topic Pattern❌ 冇✅ 有(Topic Exchange)
跨服務通訊❌ 唔適合✅ 專為此設計
Transaction❌ 冇✅ 有
Memory 使用⭐⭐ 低⭐⭐⭐⭐ 較高
效能 (msg/sec)~10k~5k
監控 UI第三方(Bull Board)✅ 內建(Management Plugin)

何時該用 RabbitMQ?

🐰 用 RabbitMQ 當:
✅ 跨語言通訊

Python Service → RabbitMQ → Node.js Service Java Service → RabbitMQ → Go Service

✅ 複雜路由需求

# Topic routing order.created.us → US fulfillment service order.created.eu → EU fulfillment service order.*.failed → Error monitoring service

✅ Event-driven 架構

User Service (publish: user.created) ↓ ├─> Email Service (send welcome email) ├─> Analytics Service (track signup) ├─> CRM Service (create contact) └─> Notification Service (push notification)

✅ 企業級要求

  • AMQP 標準

  • High Availability(cluster)

  • Federation(跨數據中心)

  • 審計同合規

唔該用 RabbitMQ 當:

❌ 簡單背景任務(email、image processing)→ 用 BullMQ

❌ 已經有 Redis,只係 Node.js → 用 BullMQ

❌ 小團隊,運維資源有限 → 用 BullMQ

❌ 唔需要跨語言 → 用 BullMQ

實戰代碼:用 BullMQ 處理背景任務

安裝

npm install bullmq ioredis

完整範例:郵件發送系統

1. 創建 Queue

// queue.ts import { Queue } from 'bullmq'; import Redis from 'ioredis'; const connection = new Redis({ host: 'localhost', port: 6379, maxRetriesPerRequest: null // BullMQ 需要呢個設定 }); export const emailQueue = new Queue('emails', { connection }); // 添加 job export async function sendEmail(to: string, subject: string, body: string) { await emailQueue.add( 'sendEmail', // Job name { to, subject, body }, // Job data { // 選項 attempts: 3, // 最多重試 3 次 backoff: { type: 'exponential', delay: 2000 // 2s, 4s, 8s }, removeOnComplete: 1000, // 保留最近 1000 個已完成嘅 jobs removeOnFail: 5000 // 保留最近 5000 個失敗嘅 jobs } ); }

2. 創建 Worker

// worker.ts import { Worker } from 'bullmq'; import { connection } from './queue'; import nodemailer from 'nodemailer'; const transporter = nodemailer.createTransport({ host: 'smtp.gmail.com', port: 587, auth: { user: process.env.EMAIL_USER, pass: process.env.EMAIL_PASS } }); const worker = new Worker( 'emails', async (job) => { const { to, subject, body } = job.data; // 更新進度 await job.updateProgress(10); // 發送郵件 await transporter.sendMail({ from: 'noreply@example.com', to, subject, html: body }); await job.updateProgress(100); // 返回結果 return { sent: true, timestamp: new Date() }; }, { connection, concurrency: 5, // 同時最多處理 5 封郵件 limiter: { max: 100, // 每 5 秒最多 100 封 duration: 5000 } } ); // 事件監聽 worker.on('completed', (job) => { console.log(`✅ Email sent to ${job.data.to}`); }); worker.on('failed', (job, error) => { console.error(`❌ Failed to send email to ${job?.data.to}:`, error); }); worker.on('progress', (job, progress) => { console.log(`📧 Sending email to ${job.data.to}: ${progress}%`); });

3. 使用 Queue

// api.ts import express from 'express'; import { sendEmail } from './queue'; const app = express(); app.use(express.json()); app.post('/send-email', async (req, res) => { const { to, subject, body } = req.body; try { await sendEmail(to, subject, body); res.json({ message: 'Email queued successfully' }); } catch (error) { res.status(500).json({ error: 'Failed to queue email' }); } }); app.listen(3000, () => { console.log('API server running on port 3000'); });

進階功能:優先級 + 延遲任務

// VIP 用戶郵件(高優先級) await emailQueue.add( 'sendEmail', { to: 'vip@example.com', subject: 'VIP Notice' }, { priority: 1 } ); // 普通用戶郵件(低優先級) await emailQueue.add( 'sendEmail', { to: 'user@example.com', subject: 'Regular Notice' }, { priority: 10 } ); // 延遲郵件(1 小時後發送) await emailQueue.add( 'sendEmail', { to: 'user@example.com', subject: 'Reminder' }, { delay: 60 * 60 * 1000 } ); // 定時郵件(每日早上 9 點) await emailQueue.add( 'dailyNewsletter', { subject: 'Daily Newsletter' }, { repeat: { pattern: '0 9 * * *', tz: 'Asia/Hong_Kong' } } );

效能優化同最佳實踐

1. Redis 連接管理

// ❌ 錯誤:每個 queue/worker 都創建新連接 const queue1 = new Queue('emails', { connection: new Redis() // 新連接 }); const queue2 = new Queue('images', { connection: new Redis() // 又一個新連接 }); // ✅ 正確:共享連接 const connection = new Redis({ maxRetriesPerRequest: null, enableReadyCheck: false, lazyConnect: true }); const queue1 = new Queue('emails', { connection }); const queue2 = new Queue('images', { connection: connection.duplicate() });

2. Job 數據優化

// ❌ 錯誤:將大量數據直接放入 job await queue.add('processImage', { imageBuffer: fs.readFileSync('large-image.jpg') // 幾 MB 嘅數據! }); // ✅ 正確:只傳 reference await queue.add('processImage', { imageUrl: 's3://bucket/large-image.jpg' // 只係一個 URL }); // Worker 自己去 fetch const worker = new Worker('processImage', async (job) => { const imageBuffer = await fetchFromS3(job.data.imageUrl); // ... 處理圖片 });

3. 並發調優

// 根據任務類型調整並發 // CPU 密集型(如影片編碼) const videoWorker = new Worker('processVideo', processVideo, { connection, concurrency: 2 // 低並發,避免 CPU 過載 }); // I/O 密集型(如發送郵件) const emailWorker = new Worker('sendEmail', sendEmail, { connection, concurrency: 20 // 高並發,因為大部分時間都係等 I/O });

4. 監控同日誌

import { QueueEvents } from 'bullmq'; const queueEvents = new QueueEvents('emails', { connection }); // 監控 queue 狀態 setInterval(async () => { const counts = await emailQueue.getJobCounts(); console.log('Queue stats:', counts); // { waiting: 10, active: 5, completed: 100, failed: 2 } }, 10000); // 接入 APM (如 DataDog) queueEvents.on('completed', ({ jobId, returnvalue }) => { statsd.increment('jobs.completed'); statsd.timing('jobs.duration', returnvalue.duration); });

💡 最佳實踐總結

  1. 連接管理:共享 Redis 連接,避免連接過多

  2. Job 數據:傳 reference 而唔係大數據

  3. 並發調優:根據任務類型調整 concurrency

  4. 錯誤處理:合理設定 attempts 同 backoff

  5. 監控日誌:接入 APM 工具追蹤效能

  6. 清理舊 jobs:設定 removeOnComplete/removeOnFail

  7. Rate Limiting:避免過載外部 API

總結

核心要點

  1. BullMQ ≠ Redis:
    • BullMQ 係基於 Redis 構建 嘅 library
    • 佢用 Redis 嘅數據結構(List, Sorted Set)嚟實現 queue
    • 你需要同時有 Redis 同 BullMQ
  2. Redis Pub/Sub 唔適合 Job Queue:
    • 冇持久化:message 發出即失去
    • 冇重試機制
    • 冇優先級、延遲任務、rate limiting
    • 適合即時通訊,唔適合背景任務
  3. BullMQ 嘅優勢:
    • 可靠:持久化 + 自動重試
    • 強大:優先級 + 延遲任務 + 並發控制
    • 簡單:唔需要額外嘅 message broker
    • 高效:10,000+ jobs/sec

何時用邊個?

場景推薦方案理由
即時聊天、通知Redis Pub/Sub低延遲、即時廣播
郵件發送BullMQ需要重試、rate limiting
圖片/影片處理BullMQ需要持久化、並發控制
報表生成BullMQ需要延遲任務、優先級
微服務通訊RabbitMQ需要複雜路由、高可靠性

相關資源

📄 官方文檔

  • BullMQ 文檔: docs.bullmq.io
  • BullMQ GitHub: github.com/taskforcesh/bullmq
  • Redis 文檔: redis.io/documentation

💻 相關工具

  • Bull Board: github.com/felixmosh/bull-board - BullMQ 嘅 Web UI
  • Taskforce.sh: taskforce.sh - BullMQ 官方監控平台

📚 延伸閱讀

  • BullMQ vs Agenda vs Bee-Queue: judoscale.com/blog/node-task-queues
  • Building Scalable Job Queues: medium.com/@sanipatel0401

記住:BullMQ 唔係 Redis 嘅替代品,而係 Redis 嘅最佳拍檔! 🐂

Back to all articles
目錄