Table of Contents

  1. Introduction: Why Message Queues Matter
  2. Understanding RabbitMQ Fundamentals
  3. System Design with Message Queues
  4. Setting Up Your Development Environment
  5. Your First RabbitMQ Producer
  6. Your First RabbitMQ Consumer
  7. Advanced Messaging Patterns
  8. Error Handling and Reliability
  9. Performance Optimization
  10. Monitoring and Operations
  11. Production Deployment
  12. Resources and Further Learning

Introduction: Why Message Queues Matter

In modern distributed systems, message queues have become as fundamental as databases. Think of them as the nervous system for your microservices architecture:

  • Decoupling: Services communicate without knowing about each other
  • Asynchronous Processing: Long-running tasks don’t block user requests
  • Load Balancing: Distribute work across multiple consumers
  • Reliability: Messages persist until successfully processed
  • Scalability: Add consumers without changing producers

Why RabbitMQ? RabbitMQ is one of the most mature, feature-rich message brokers available:

  • Mature & Battle-Tested: Over 15 years in production
  • Protocol Agnostic: AMQP, MQTT, STOMP, and more
  • Flexible Routing: Complex routing patterns out of the box
  • High Performance: Tens of thousands of messages per second per node on typical hardware, depending on message size and durability settings
  • Clustering Support: Built-in high availability and scalability
  • Rich Management: Comprehensive monitoring and management tools

The Mental Model: Think of RabbitMQ as a smart post office:

  • Exchanges = Mail sorting rooms (categorize mail)
  • Queues = Mailboxes (hold mail for recipients)
  • Bindings = Rules connecting sorting rooms to mailboxes
  • Messages = Letters (the actual data being sent)

Understanding RabbitMQ Fundamentals

Before writing code, let’s understand the core concepts that make RabbitMQ powerful.

1. The AMQP Protocol

RabbitMQ implements AMQP (Advanced Message Queuing Protocol), which defines:

Producer → [Message] → Exchange → [Binding] → Queue → [Message] → Consumer

Key AMQP Concepts:

  • Messages: The data being transmitted (payload + metadata)
  • Producers: Applications that send messages
  • Consumers: Applications that receive messages
  • Exchanges: Route messages to queues based on rules
  • Queues: Buffer messages for consumers
  • Bindings: Links between exchanges and queues
  • Channels: Virtual connections within a physical TCP connection

2. Exchange Types and Routing

RabbitMQ’s power comes from its sophisticated routing:

Direct Exchange

Routes messages to queues with exact binding key match:

Exchange: direct
Binding: queue1 → "user.created"
Message: routing-key="user.created" → queue1 ✅
Message: routing-key="user.updated" → queue1 ❌

Topic Exchange

Routes using wildcard patterns:

Exchange: topic
Binding: queue1 → "user.*"
Message: routing-key="user.created" → queue1 ✅
Message: routing-key="user.updated" → queue1 ✅
Message: routing-key="order.created" → queue1 ❌

Wildcards:
* (star) = Exactly one word
# (hash) = Zero or more words
Words are the dot-separated segments of the routing key.
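
The wildcard rules can be checked with a small matcher. This is a sketch for building intuition only; topicMatches is a hypothetical helper, not part of RabbitMQ or amqplib, and the broker's own implementation differs.

```javascript
// Hypothetical helper: true when a topic binding pattern matches a routing key.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.')
  const k = routingKey.split('.')

  function match(i, j) {
    // Pattern exhausted: it matches only if the key is exhausted too
    if (i === p.length) return j === k.length

    if (p[i] === '#') {
      // '#' may absorb zero or more words
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true
      }
      return false
    }

    if (j === k.length) return false
    // '*' matches exactly one word; anything else must match literally
    if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1)
    return false
  }

  return match(0, 0)
}

console.log(topicMatches('user.*', 'user.created'))         // true
console.log(topicMatches('user.*', 'user.profile.updated')) // false
console.log(topicMatches('user.#', 'user.profile.updated')) // true
console.log(topicMatches('user.#', 'user'))                 // true
```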

Fanout Exchange

Broadcasts to all bound queues:

Exchange: fanout
Binding: queue1 → "" (empty)
Binding: queue2 → "" (empty)
Message: Any routing-key → queue1 + queue2 ✅

Headers Exchange

Routes based on message headers rather than routing key:

Exchange: headers
Binding: queue1 → { "x-match": "all", "priority": "high" }
Message: headers={ "priority": "high", "source": "api" } → queue1 ✅
Message: headers={ "priority": "low", "source": "api" } → queue1 ❌
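
The header-matching rule can be modelled in a few lines. This is a sketch assuming the two standard x-match modes ("all" and "any"); headersMatch is a hypothetical name used only for illustration.

```javascript
// Hypothetical helper modelling how a headers exchange compares a binding to a message.
function headersMatch(bindingArgs, messageHeaders = {}) {
  const mode = bindingArgs['x-match'] || 'all' // "all" is the default mode

  // Keys starting with "x-" (such as x-match itself) are not compared
  const keys = Object.keys(bindingArgs).filter(key => !key.startsWith('x-'))
  const hit = key => messageHeaders[key] === bindingArgs[key]

  return mode === 'any' ? keys.some(hit) : keys.every(hit)
}

const binding = { 'x-match': 'all', priority: 'high' }
console.log(headersMatch(binding, { priority: 'high', source: 'api' })) // true
console.log(headersMatch(binding, { priority: 'low', source: 'api' }))  // false
```

With "x-match": "any", a single matching header is enough for the message to be routed.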

3. Message Properties and Metadata

Every message in RabbitMQ carries rich metadata:

{
  // Content
  content: "Your actual data here",
  contentType: "application/json",
  contentEncoding: "utf-8",
  
  // Routing
  routingKey: "user.events.created",
  exchange: "user_events",
  
  // Delivery guarantees
  deliveryMode: 2, // 1=non-persistent, 2=persistent
  priority: 5, // 0-9, higher = more important
  
  // Timestamps
  timestamp: 1634567890123,
  expiration: "86400000", // Message TTL in milliseconds
  
  // Headers for custom metadata
  headers: {
    "source": "user-service",
    "version": "1.0",
    "trace-id": "abc123"
  },
  
  // Unique identifier
  messageId: "unique-message-id-123"
}
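
The object above is a conceptual view. With amqplib, the body is passed as a Buffer and the metadata as publish options. A minimal sketch of that mapping follows; toPublishArgs and the ttlMs field are hypothetical names chosen for this example.

```javascript
// Hypothetical helper: turns a payload plus metadata into the argument list
// for amqplib's channel.publish(exchange, routingKey, content, options).
function toPublishArgs(exchange, routingKey, payload, meta = {}) {
  const options = {
    contentType: 'application/json',
    contentEncoding: 'utf-8',
    persistent: meta.persistent !== false, // deliveryMode 2 unless explicitly disabled
    headers: meta.headers || {}
  }
  if (meta.messageId !== undefined) options.messageId = meta.messageId
  if (meta.timestamp !== undefined) options.timestamp = meta.timestamp
  if (meta.priority !== undefined) options.priority = meta.priority
  // The per-message TTL is sent as a string of milliseconds
  if (meta.ttlMs !== undefined) options.expiration = String(meta.ttlMs)

  return [exchange, routingKey, Buffer.from(JSON.stringify(payload)), options]
}

const args = toPublishArgs('user_events', 'user.events.created', { userId: 'user123' }, {
  messageId: 'unique-message-id-123',
  priority: 5,
  ttlMs: 86400000,
  headers: { source: 'user-service', 'trace-id': 'abc123' }
})
console.log(args[3])
```

Given an open channel, the result can be spread into the call: channel.publish(...args).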

4. Queue Features

Queues have powerful features for different use cases:

// Durable queue (survives broker restart)
queue: {
  name: "user_notifications",
  durable: true,
  autoDelete: false
}

// Temporary queue (deleted when last consumer disconnects)
queue: {
  name: "", // Random name generated by broker
  durable: false,
  autoDelete: true,
  exclusive: true // Only used by this connection
}

// Queue with TTL and limits
queue: {
  name: "processing_queue",
  messageTTL: 3600000, // 1 hour
  maxLength: 10000, // Max 10,000 messages
  overflow: "drop-head" // Drop oldest when full
}
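
The queue descriptions above are shorthand. When declaring a queue through amqplib, TTL and length limits are usually passed as x- arguments. The sketch below shows one way to build those options; queueOptions and its parameter names are hypothetical.

```javascript
// Hypothetical helper: builds the options object for channel.assertQueue().
function queueOptions({
  durable = true,
  autoDelete = false,
  exclusive = false,
  messageTtlMs,
  maxLength,
  overflow
} = {}) {
  const args = {}
  if (messageTtlMs !== undefined) args['x-message-ttl'] = messageTtlMs
  if (maxLength !== undefined) args['x-max-length'] = maxLength
  if (overflow !== undefined) args['x-overflow'] = overflow // e.g. "drop-head" or "reject-publish"

  return { durable, autoDelete, exclusive, arguments: args }
}

const processingQueue = queueOptions({
  messageTtlMs: 3600000, // 1 hour
  maxLength: 10000,
  overflow: 'drop-head'
})
console.log(processingQueue)
```

With an open channel this would be used as channel.assertQueue('processing_queue', processingQueue).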

System Design with Message Queues

Pattern 1: Task Queue Architecture

┌─────────┐    ┌──────────────┐    ┌─────────────┐
│   API   │───▶│   RabbitMQ   │───▶│  Worker Pool │
│ Service │    │   Exchange    │    │ (Consumers) │
└─────────┘    └──────────────┘    └─────────────┘

Flow:
1. API receives HTTP request
2. API publishes task to RabbitMQ
3. API immediately returns response (async)
4. Workers pick up tasks and process
5. Workers publish results to response queue
6. API polls response queue or uses webhook

Benefits:

  • API responsiveness: Never blocks on long tasks
  • Load balancing: Multiple workers share the load
  • Fault tolerance: Failed tasks can be retried
  • Scalability: Add workers without changing API

Implementation Example:

// API Service
app.post('/process-video', async (req, res) => {
  const taskId = generateId()
  
  // Publish task to workers (amqplib expects the message body as a Buffer)
  channel.publish('tasks', 'video.process', Buffer.from(JSON.stringify({
    taskId,
    videoUrl: req.body.videoUrl,
    options: req.body.options
  })), { persistent: true })
  
  // Return immediately
  res.json({ taskId, status: 'queued' })
})

// Worker Service
await channel.consume('video_processing', async (msg) => {
  const task = JSON.parse(msg.content.toString())
  
  try {
    const result = await processVideo(task.videoUrl, task.options)
    
    // Publish result
    channel.publish('results', `task.${task.taskId}`, Buffer.from(JSON.stringify({
      taskId: task.taskId,
      result,
      status: 'completed'
    })))
    
    channel.ack(msg)
  } catch (error) {
    // Publish error
    channel.publish('results', `task.${task.taskId}`, Buffer.from(JSON.stringify({
      taskId: task.taskId,
      error: error.message,
      status: 'failed'
    })))
    
    // The failure has already been reported, so reject without requeue;
    // requeueing here would retry forever and publish duplicate failures
    channel.nack(msg, false, false)
  }
})

Pattern 2: Event-Driven Architecture

┌──────────┐    ┌─────────────┐    ┌──────────────┐    ┌─────────────┐
│   User   │───▶│    User     │───▶│   RabbitMQ   │───▶│    Email    │
│ Service  │    │   Events    │    │  Event Bus   │    │   Service   │
└──────────┘    └─────────────┘    └──────┬───────┘    └─────────────┘
                                          │
                                          │            ┌─────────────┐
                                          └───────────▶│  Analytics  │
                                                       │   Service   │
                                                       └─────────────┘

Flow:
1. User Service publishes user.created event
2. Email Service receives → sends welcome email
3. Analytics Service receives → updates metrics
4. Multiple services can subscribe independently

Benefits:

  • Loose Coupling: Services don’t know about each other
  • Easy to Add New Consumers: Just subscribe to events
  • Scalable: Each service scales independently
  • Resilient: One service failure doesn’t affect others

Implementation Example:

// User Service (Publisher)
async function createUser(userData) {
  const user = await db.users.create(userData)
  
  // Publish domain events (the body must be a Buffer)
  channel.publish('user.events', 'user.created', Buffer.from(JSON.stringify({
    userId: user.id,
    email: user.email,
    timestamp: new Date().toISOString(),
    source: 'user-service'
  })))
  
  return user
}

// Email Service (Subscriber)
await channel.consume('email_notifications', async (msg) => {
  const event = JSON.parse(msg.content.toString())
  
  // The routing key travels in the delivery metadata, not in the message body
  if (msg.fields.routingKey === 'user.created') {
    await sendWelcomeEmail(event.email)
  }
  
  channel.ack(msg)
})

// Analytics Service (Subscriber)
await channel.consume('analytics_events', async (msg) => {
  const event = JSON.parse(msg.content.toString())
  
  // Track user signup event
  await analytics.track('user_signup', {
    userId: event.userId,
    source: event.source,
    timestamp: event.timestamp
  })
  
  channel.ack(msg)
})

Pattern 3: Microservices Communication

┌─────────────┐    ┌──────────────┐    ┌─────────────┐
│   Order     │───▶│  RabbitMQ     │───▶│  Inventory  │
│   Service   │    │  RPC Queue    │    │   Service   │
└─────────────┘    └──────────────┘    └─────────────┘
      │                    │                    │
      │                    ▼                    │
      │            ┌──────────────┐            │
      └──────────▶│  Payment      │◀───────────┘
                   │   Service    │
                   └──────────────┘

Flow:
1. Order Service checks inventory via RPC
2. Inventory Service responds
3. Order Service processes payment via RPC
4. Services communicate synchronously through queues

Benefits:

  • Service Independence: Each service owns its data
  • Synchronous Communication: RPC pattern for immediate responses
  • Load Balancing: Multiple instances of each service
  • Failure Isolation: Timeouts keep a slow or failed service from blocking its callers indefinitely

Implementation Example:

// RPC Client Helper
class RPCClient {
  constructor(channel, serviceName) {
    this.channel = channel
    this.serviceName = serviceName
    this.responseQueue = null
    this.pendingRequests = new Map()
  }
  
  async call(method, params) {
    const correlationId = generateId()
    const replyTo = `${this.serviceName}_response_${process.pid}`
    
    // Create the exclusive reply queue once and reuse it for every call
    if (!this.responseQueue) {
      this.responseQueue = await this.channel.assertQueue(replyTo, {
        exclusive: true,
        autoDelete: true
      })
      
      await this.channel.consume(this.responseQueue.queue, (msg) => {
        if (!msg) return
        
        const pending = this.pendingRequests.get(msg.properties.correlationId)
        if (!pending) return // Reply arrived after its request timed out
        
        clearTimeout(pending.timer)
        this.pendingRequests.delete(msg.properties.correlationId)
        pending.resolve(JSON.parse(msg.content.toString()))
      }, { noAck: true }) // Replies need no acknowledgment
    }
    
    // Send RPC request
    return new Promise((resolve, reject) => {
      // Timeout handling
      const timer = setTimeout(() => {
        this.pendingRequests.delete(correlationId)
        reject(new Error('RPC timeout'))
      }, 30000)
      
      this.pendingRequests.set(correlationId, { resolve, reject, timer })
      
      // The server reads { method, params } from the body, so send both as a Buffer
      const body = Buffer.from(JSON.stringify({ method, params }))
      this.channel.publish('rpc', `${this.serviceName}.${method}`, body, {
        correlationId,
        replyTo,
        expiration: '30000' // Discard the request if it is not consumed within 30 seconds
      })
    })
  }
}

// Usage in Order Service
const inventoryClient = new RPCClient(channel, 'inventory')
const paymentClient = new RPCClient(channel, 'payment')

async function createOrder(orderData) {
  try {
    // Check inventory
    const inventoryResult = await inventoryClient.call('checkStock', {
      productId: orderData.productId,
      quantity: orderData.quantity
    })
    
    if (!inventoryResult.available) {
      throw new Error('Out of stock')
    }
    
    // Process payment
    const paymentResult = await paymentClient.call('processPayment', {
      amount: orderData.amount,
      paymentMethod: orderData.paymentMethod
    })
    
    if (!paymentResult.success) {
      throw new Error('Payment failed')
    }
    
    // Create order
    return await db.orders.create(orderData)
  } catch (error) {
    console.error('Order creation failed:', error)
    throw error
  }
}

// Inventory Service (RPC Server)
await channel.consume('inventory', async (msg) => {
  const { method, params } = JSON.parse(msg.content.toString())
  const { replyTo, correlationId } = msg.properties
  
  try {
    let result
    
    switch (method) {
      case 'checkStock':
        result = await checkInventory(params.productId, params.quantity)
        break
      case 'reserveStock':
        result = await reserveStock(params.productId, params.quantity)
        break
      default:
        throw new Error(`Unknown method: ${method}`)
    }
    
    // Send response
    await channel.sendToQueue(replyTo, Buffer.from(JSON.stringify(result)), {
      correlationId
    })
    
    channel.ack(msg)
  } catch (error) {
    // Send error response
    await channel.sendToQueue(replyTo, Buffer.from(JSON.stringify({
      error: error.message
    })), {
      correlationId
    })
    
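    // The caller has already received the error, so reject without requeue
    // (nack requeues by default, which would reprocess the request forever)
    channel.nack(msg, false, false)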
  }
})

Setting Up Your Development Environment

Option 1: Docker

Create a docker-compose.yml file:

version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    container_name: rabbitmq
    ports:
      - "5672:5672"   # AMQP port
      - "15672:15672"  # Management UI port
      - "15692:15692"  # Prometheus metrics port
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123
      RABBITMQ_DEFAULT_VHOST: "/"
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf  # this file must exist before startup
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3

  # The management UI is part of the rabbitmq:3.12-management image used above
  # and is served on port 15672, so no separate UI container is needed.

volumes:
  rabbitmq_data:

Start RabbitMQ:

docker-compose up -d

Access the management UI at http://localhost:15672 (admin/admin123).

Option 2: Local Installation

# Ubuntu/Debian
sudo apt-get update
sudo apt-get install rabbitmq-server

# macOS with Homebrew
brew install rabbitmq

# Start service
sudo systemctl start rabbitmq-server  # Linux
brew services start rabbitmq        # macOS

Installing Node.js Client

npm install amqplib
# or
yarn add amqplib

# For TypeScript
npm install --save-dev @types/amqplib @types/node

Basic Configuration

Create src/rabbitmqConfig.js:

const amqp = require('amqplib')

const rabbitConfig = {
  url: process.env.RABBITMQ_URL || 'amqp://admin:admin123@localhost:5672',
  
  // Connection options
  socketOptions: {
    timeout: 30000, // 30 seconds
    heartbeat: 60     // Heartbeat every 60 seconds
  },
  
  // Client properties
  clientProperties: {
    product_name: 'my-app',
    product_version: '1.0.0',
    platform: 'Node.js',
    capabilities: {
      publisher_confirms: true,
      consumer_cancel_notify: true,
      'basic.nack': true // Keys containing a dot must be quoted
    }
  },
  
  // Reconnection settings
  retry: {
    times: 10,
    interval: 1000
  }
}

// Connection helper
async function createConnection() {
  try {
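    // amqplib takes the heartbeat from the URL query string rather than from
    // the socket options, so append it to the connection URL
    const url = new URL(rabbitConfig.url)
    url.searchParams.set('heartbeat', String(rabbitConfig.socketOptions.heartbeat))
    
    const connection = await amqp.connect(url.toString(), {
      timeout: rabbitConfig.socketOptions.timeout,
      clientProperties: rabbitConfig.clientProperties
    })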
    
    console.log('Connected to RabbitMQ')
    return connection
  } catch (error) {
    console.error('Failed to connect to RabbitMQ:', error)
    throw error
  }
}

module.exports = {
  rabbitConfig,
  createConnection
}
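
The retry block in rabbitConfig (times, interval) is declared but createConnection does not use it. One way to apply it is a small wrapper around any connect function. This is a sketch with a fixed delay between attempts; connectWithRetry is a hypothetical helper.

```javascript
// Hypothetical helper: retries an async connect function a fixed number of times.
async function connectWithRetry(connect, { times = 10, interval = 1000 } = {}) {
  let lastError

  for (let attempt = 1; attempt <= times; attempt++) {
    try {
      return await connect()
    } catch (error) {
      lastError = error
      if (attempt < times) {
        // Wait before the next attempt
        await new Promise(resolve => setTimeout(resolve, interval))
      }
    }
  }

  throw lastError
}

// Assumed usage with the helpers from this file:
//   const connection = await connectWithRetry(createConnection, rabbitConfig.retry)
```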

Your First RabbitMQ Producer

Let’s build a simple producer that sends user events to RabbitMQ.

Basic Producer Example

const amqp = require('amqplib')
const { createConnection } = require('./rabbitmqConfig')

class EventProducer {
  constructor() {
    this.connection = null
    this.channel = null
  }
  
  async connect() {
    try {
      // Create the connection and a confirm channel (publisher confirms enabled)
      this.connection = await createConnection()
      this.channel = await this.connection.createConfirmChannel()
      
      // Declare exchange
      await this.channel.assertExchange('user_events', 'topic', {
        durable: true
      })
      
      console.log('Producer connected and ready')
    } catch (error) {
      console.error('Failed to connect producer:', error)
      throw error
    }
  }
  
  async publishEvent(routingKey, event, options = {}) {
    if (!this.channel) {
      throw new Error('Producer not connected')
    }
    
    try {
      const message = {
        id: generateId(),
        type: event.type,
        data: event.data,
        timestamp: new Date().toISOString(),
        source: 'user-service',
        version: '1.0'
      }
      
      const published = this.channel.publish(
        'user_events',           // Exchange
        routingKey,             // Routing key
        Buffer.from(JSON.stringify(message)), // Message body
        {
          persistent: true,        // Survive broker restart
          messageId: message.id,  // Unique message ID
          timestamp: Date.now(),     // Message timestamp
          expiration: options.ttl || '86400000', // 24 hours TTL
          headers: {
            'content-type': 'application/json',
            'event-type': event.type,
            ...options.headers
          }
        }
      )
      
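      if (!published) {
        // false signals backpressure, not failure: the message is buffered,
        // so wait for the channel to drain before sending more
        await new Promise(resolve => this.channel.once('drain', resolve))
      }
      
      console.log(`Event published: ${routingKey}`, message)
      return message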
    } catch (error) {
      console.error('Error publishing event:', error)
      throw error
    }
  }
  
  async publishUserEvent(eventType, userData, options = {}) {
    const routingKey = `user.${eventType}`
    const event = {
      type: eventType,
      data: userData
    }
    
    return await this.publishEvent(routingKey, event, options)
  }
  
  async disconnect() {
    try {
      if (this.channel) {
        await this.channel.close()
      }
      if (this.connection) {
        await this.connection.close()
      }
      console.log('Producer disconnected')
    } catch (error) {
      console.error('Error disconnecting producer:', error)
    }
  }
}

// Helper function
function generateId() {
  return Date.now().toString(36) + Math.random().toString(36).slice(2)
}

// Usage example
async function main() {
  const producer = new EventProducer()
  
  try {
    await producer.connect()
    
    // Publish different user events
    await producer.publishUserEvent('created', {
      userId: 'user123',
      email: 'john@example.com',
      name: 'John Doe'
    })
    
    await producer.publishUserEvent('updated', {
      userId: 'user123',
      changes: { email: 'john.doe@example.com' }
    })
    
    await producer.publishUserEvent('login', {
      userId: 'user123',
      ip: '192.168.1.100',
      userAgent: 'Mozilla/5.0...'
    }, {
      headers: {
        'priority': 'high',
        'source': 'web-app'
      }
    })
    
    // Wait for confirms
    await new Promise(resolve => setTimeout(resolve, 1000))
    
  } catch (error) {
    console.error('Error in main:', error)
  } finally {
    await producer.disconnect()
  }
}

main()

Producer with Error Handling and Retries

class ResilientProducer {
  constructor() {
    this.connection = null
    this.channel = null
    this.isConnecting = false
    this.reconnectAttempts = 0
    this.maxReconnectAttempts = 10
  }
  
  async connect() {
    if (this.isConnecting) return
    
    this.isConnecting = true
    
    while (this.reconnectAttempts < this.maxReconnectAttempts) {
      try {
        this.connection = await createConnection()
        // A confirm channel enables publisher confirms
        this.channel = await this.connection.createConfirmChannel()
        
        // Set up error handlers
        this.connection.on('error', this.handleConnectionError.bind(this))
        this.connection.on('close', this.handleConnectionClose.bind(this))
        this.channel.on('error', this.handleChannelError.bind(this))
        
        // Assert exchange
        await this.channel.assertExchange('events', 'topic', {
          durable: true
        })
        
        console.log('Producer connected successfully')
        this.reconnectAttempts = 0
        this.isConnecting = false
        return
        
      } catch (error) {
        this.reconnectAttempts++
        console.error(`Connection attempt ${this.reconnectAttempts} failed:`, error)
        
        if (this.reconnectAttempts < this.maxReconnectAttempts) {
          // Exponential backoff
          const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts - 1), 30000)
          await new Promise(resolve => setTimeout(resolve, delay))
        }
      }
    }
    
    this.isConnecting = false
    throw new Error('Failed to connect after maximum attempts')
  }
  
  async publishWithRetry(routingKey, message, maxRetries = 3) {
    let lastError
    
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        if (!this.channel) {
          await this.connect()
        }
        
        const result = await this.publishOnce(routingKey, message)
        console.log(`Message published on attempt ${attempt}`)
        return result
        
      } catch (error) {
        lastError = error
        console.error(`Publish attempt ${attempt} failed:`, error)
        
        if (attempt < maxRetries) {
          // Wait before retry
          await new Promise(resolve => setTimeout(resolve, 1000 * attempt))
        }
      }
    }
    
    throw lastError
  }
  
  async publishOnce(routingKey, message) {
    // Requires a confirm channel (connection.createConfirmChannel()), whose
    // publish() takes a callback that fires when the broker acks or nacks
    return new Promise((resolve, reject) => {
      const messageId = generateId()
      
      // Unroutable messages come back through the 'return' event
      // because the message is published with mandatory: true
      const returnHandler = (returned) => {
        if (returned.properties.messageId === messageId) {
          this.channel.removeListener('return', returnHandler)
          reject(new Error('Message returned: ' + returned.fields.replyText))
        }
      }
      this.channel.on('return', returnHandler)
      
      // Publish message
      this.channel.publish(
        'events',
        routingKey,
        Buffer.from(JSON.stringify(message)),
        {
          messageId,
          mandatory: true,
          persistent: true,
          timestamp: Date.now(),
          contentType: 'application/json',
          headers: {
            'publish-attempt': Date.now().toString()
          }
        },
        (error) => {
          this.channel.removeListener('return', returnHandler)
          
          if (error) {
            reject(new Error('Message not acknowledged'))
          } else {
            // No effect if the message was already returned and rejected
            resolve({ messageId, success: true })
          }
        }
      )
    })
  }
  
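  handleConnectionError(error) {
    // amqplib emits 'close' after 'error', so reconnection is left to handleConnectionClose
    console.error('Connection error:', error)
  }
  
  handleConnectionClose() {
    console.log('Connection closed, attempting to reconnect...')
    // Drop the stale handles so publishWithRetry does not reuse a dead channel
    this.connection = null
    this.channel = null
    this.connect().catch(console.error)
  }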
  
  handleChannelError(error) {
    console.error('Channel error:', error)
  }
}
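
The exponential backoff inside connect() uses the formula min(1000 * 2^(attempt - 1), 30000). Pulling it into a function makes the resulting schedule easy to inspect; backoffDelay is a name introduced here for illustration only.

```javascript
// Same formula as in ResilientProducer.connect(), extracted for inspection.
function backoffDelay(attempt, baseMs = 1000, capMs = 30000) {
  return Math.min(baseMs * Math.pow(2, attempt - 1), capMs)
}

const delays = [1, 2, 3, 4, 5, 6, 7].map(attempt => backoffDelay(attempt))
console.log(delays) // [ 1000, 2000, 4000, 8000, 16000, 30000, 30000 ]
```

The delay doubles until the sixth attempt, where the 30-second cap takes over.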

Your First RabbitMQ Consumer

Let’s build a consumer that processes user events from RabbitMQ.

Basic Consumer Example

const amqp = require('amqplib')
const { createConnection } = require('./rabbitmqConfig')

class EventConsumer {
  constructor(queueName, handler) {
    this.queueName = queueName
    this.handler = handler
    this.connection = null
    this.channel = null
    this.isProcessing = false
  }
  
  async start() {
    try {
      // Create connection and channel
      this.connection = await createConnection()
      this.channel = await this.connection.createChannel()
      
      // Set prefetch count (QoS)
      await this.channel.prefetch(1)
      
      // Declare exchange
      await this.channel.assertExchange('user_events', 'topic', {
        durable: true
      })
      
      // Declare queue
      await this.channel.assertQueue(this.queueName, {
        durable: true,
        arguments: {
          'x-message-ttl': 86400000, // 24 hours
          'x-dead-letter-exchange': 'user_events_dlx',
          'x-dead-letter-routing-key': this.queueName
        }
      })
      
      // Bind queue to exchange
      await this.channel.bindQueue(this.queueName, 'user_events', 'user.*')
      
      // Set up consumer
      await this.channel.consume(this.queueName, this.handleMessage.bind(this), {
        noAck: false // Manual acknowledgment
      })
      
      console.log(`Consumer started for queue: ${this.queueName}`)
      
      // Set up error handlers
      this.connection.on('error', this.handleConnectionError.bind(this))
      this.connection.on('close', this.handleConnectionClose.bind(this))
      this.channel.on('error', this.handleChannelError.bind(this))
      this.channel.on('cancel', this.handleConsumerCancel.bind(this))
      
    } catch (error) {
      console.error('Failed to start consumer:', error)
      throw error
    }
  }
  
  async handleMessage(message) {
    // amqplib delivers null when the broker cancels the consumer
    if (!message) {
      this.handleConsumerCancel()
      return
    }
    
    // prefetch(1) already limits this consumer to one unacknowledged message
    this.isProcessing = true
    
    try {
      const event = JSON.parse(message.content.toString())
      
      console.log(`Received event: ${message.fields.routingKey}`, event)
      
      // Process the event
      await this.handler(event, message)
      
      // Acknowledge message
      this.channel.ack(message)
      console.log(`Message processed and acknowledged`)
      
    } catch (error) {
      console.error('Error processing message:', error)
      
      // Requeue on the first failure; a message that fails again is rejected
      // without requeue and routed to the dead-letter exchange
      const requeue = !message.fields.redelivered
      this.channel.nack(message, false, requeue)
      console.log(requeue ? 'Message rejected and requeued' : 'Message rejected and dead-lettered')
      
    } finally {
      this.isProcessing = false
    }
  }
  
  async stop() {
    try {
      if (this.channel) {
        // Closing the channel also cancels its consumers
        // (channel.cancel() expects a consumer tag, not a queue name)
        await this.channel.close()
      }
      
      if (this.connection) {
        await this.connection.close()
      }
      
      console.log(`Consumer stopped for queue: ${this.queueName}`)
    } catch (error) {
      console.error('Error stopping consumer:', error)
    }
  }
  
  handleConnectionError(error) {
    console.error('Connection error:', error)
  }
  
  handleConnectionClose() {
    console.log('Connection closed')
  }
  
  handleChannelError(error) {
    console.error('Channel error:', error)
  }
  
  handleConsumerCancel() {
    console.log('Consumer cancelled by server')
  }
}

// Usage example: Email notification consumer
async function emailNotificationHandler(event, message) {
  switch (event.type) {
    case 'created':
      await sendWelcomeEmail(event.data.email, event.data.name)
      break
      
    case 'updated':
      await sendProfileUpdateEmail(event.data.email, event.data.changes)
      break
      
    case 'login':
      await sendLoginNotificationEmail(event.data.email, event.data.ip)
      break
      
    default:
      console.log(`Unknown event type: ${event.type}`)
  }
}

// Usage example: Analytics consumer
async function analyticsHandler(event, message) {
  // Track user events in analytics
  await analytics.track(event.type, {
    userId: event.data.userId,
    timestamp: event.timestamp,
    source: event.source,
    routingKey: message.fields.routingKey
  })
}

// Start consumers
async function main() {
  const emailConsumer = new EventConsumer('email_notifications', emailNotificationHandler)
  const analyticsConsumer = new EventConsumer('analytics_events', analyticsHandler)
  
  try {
    await emailConsumer.start()
    await analyticsConsumer.start()
    
    console.log('All consumers started successfully')
    
    // Graceful shutdown
    process.on('SIGINT', async () => {
      console.log('Shutting down consumers...')
      await emailConsumer.stop()
      await analyticsConsumer.stop()
      process.exit(0)
    })
    
  } catch (error) {
    console.error('Error starting consumers:', error)
  }
}

main()
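
EventConsumer names user_events_dlx as its dead-letter exchange but never declares it, and messages dead-lettered to a missing exchange are dropped. A minimal sketch of the missing topology follows, assuming a direct exchange and a ".dead" queue suffix; both choices, and the setupDeadLetterQueue name, are hypothetical.

```javascript
// Hypothetical helper: declares the dead-letter exchange and a queue bound to it.
// `channel` is an open amqplib channel.
async function setupDeadLetterQueue(channel, queueName, dlxName = 'user_events_dlx') {
  const deadLetterQueue = `${queueName}.dead` // assumed naming convention

  await channel.assertExchange(dlxName, 'direct', { durable: true })
  await channel.assertQueue(deadLetterQueue, { durable: true })

  // The consumer sets x-dead-letter-routing-key to its queue name,
  // so the dead-letter queue is bound on that key
  await channel.bindQueue(deadLetterQueue, dlxName, queueName)

  return deadLetterQueue
}
```

Calling it before the consumer starts, for example setupDeadLetterQueue(channel, 'email_notifications'), gives rejected messages a place to land for later inspection or replay.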

Consumer with Batch Processing

class BatchConsumer {
  constructor(queueName, handler, batchSize = 10, batchTimeout = 5000) {
    this.queueName = queueName
    this.handler = handler
    this.batchSize = batchSize
    this.batchTimeout = batchTimeout
    this.connection = null
    this.channel = null
    this.messageBuffer = []
    this.batchTimer = null
  }
  
  async start() {
    try {
      this.connection = await createConnection()
      this.channel = await this.connection.createChannel()
      
      // Set prefetch to batch size
      await this.channel.prefetch(this.batchSize)
      
      await this.channel.assertExchange('events', 'topic', { durable: true })
      await this.channel.assertQueue(this.queueName, { durable: true })
      await this.channel.bindQueue(this.queueName, 'events', 'batch.*')
      
      await this.channel.consume(this.queueName, this.handleMessage.bind(this), {
        noAck: false
      })
      
      console.log(`Batch consumer started: ${this.queueName}`)
      
    } catch (error) {
      console.error('Failed to start batch consumer:', error)
      throw error
    }
  }
  
  async handleMessage(message) {
    try {
      const event = JSON.parse(message.content.toString())
      this.messageBuffer.push({ event, message })
      
      // If buffer is full, process immediately
      if (this.messageBuffer.length >= this.batchSize) {
        await this.processBatch()
      } else {
        // Set timer to process batch if timeout
        this.scheduleBatchProcessing()
      }
      
    } catch (error) {
      console.error('Error parsing message:', error)
      this.channel.nack(message, false, false) // Don't requeue malformed messages
    }
  }
  
  scheduleBatchProcessing() {
    if (this.batchTimer) {
      clearTimeout(this.batchTimer)
    }
    
    this.batchTimer = setTimeout(async () => {
      if (this.messageBuffer.length > 0) {
        await this.processBatch()
      }
    }, this.batchTimeout)
  }
  
  async processBatch() {
    if (this.batchTimer) {
      clearTimeout(this.batchTimer)
      this.batchTimer = null
    }
    
    if (this.messageBuffer.length === 0) return
    
    const batch = this.messageBuffer.splice(0)
    
    try {
      console.log(`Processing batch of ${batch.length} messages`)
      
      // Process all messages in batch
      await this.handler(batch.map(item => item.event))
      
      // Acknowledge all messages
      batch.forEach(item => {
        this.channel.ack(item.message)
      })
      
      console.log(`Batch processed successfully`)
      
    } catch (error) {
      console.error('Error processing batch:', error)
      
      // Negative acknowledgment for all messages
      batch.forEach(item => {
        this.channel.nack(item.message, false, true)
      })
    }
  }
  
  async stop() {
    if (this.batchTimer) {
      clearTimeout(this.batchTimer)
    }
    
    // Process remaining messages
    await this.processBatch()
    
    if (this.channel) {
      // Closing the channel also cancels its consumers
      // (channel.cancel() expects a consumer tag, not a queue name)
      await this.channel.close()
    }
    
    if (this.connection) {
      await this.connection.close()
    }
  }
}

// Usage example: Database batch inserter
async function batchDatabaseInserter(events) {
  if (events.length === 0) return
  
  try {
    // Batch insert into database
    await db.events.insertMany(events.map(event => ({
      type: event.type,
      data: JSON.stringify(event.data),
      timestamp: new Date(event.timestamp),
      userId: event.data.userId,
      source: event.source
    })))
    
    console.log(`Inserted ${events.length} events into database`)
  } catch (error) {
    console.error('Database batch insert failed:', error)
    throw error
  }
}

// Start batch consumer
const batchConsumer = new BatchConsumer('database_batch', batchDatabaseInserter, 50, 2000)
batchConsumer.start().catch(console.error)

Advanced Messaging Patterns

Pattern 1: Work Queue with Competing Consumers

class WorkQueue {
  constructor(queueName, workerFunction, numWorkers = 3) {
    this.queueName = queueName
    this.workerFunction = workerFunction
    this.workers = []
    this.numWorkers = numWorkers
  }
  
  async start() {
    for (let i = 0; i < this.numWorkers; i++) {
      const worker = new Worker(this.queueName, this.workerFunction, i)
      await worker.start()
      this.workers.push(worker)
    }
    
    console.log(`Started ${this.numWorkers} workers for queue: ${this.queueName}`)
  }
  
  async stop() {
    await Promise.all(this.workers.map(worker => worker.stop()))
  }
}

class Worker {
  constructor(queueName, workerFunction, workerId) {
    this.queueName = queueName
    this.workerFunction = workerFunction
    this.workerId = workerId
    this.connection = null
    this.channel = null
    this.isProcessing = false
  }
  
  async start() {
    this.connection = await createConnection()
    this.channel = await this.connection.createChannel()
    
    // Set prefetch to 1 for fair dispatching: the broker holds back the next
    // message until this worker has acked or nacked the current one
    await this.channel.prefetch(1)
    
    await this.channel.assertQueue(this.queueName, { durable: true })
    
    const { consumerTag } = await this.channel.consume(this.queueName, async (msg) => {
      // msg is null when the broker cancels this consumer (e.g. queue deleted)
      if (msg === null) return
      
      // No "busy" guard here: returning without ack/nack would strand the
      // message, and prefetch(1) already limits us to one message at a time
      this.isProcessing = true
      
      try {
        console.log(`Worker ${this.workerId} processing message`)
        
        await this.workerFunction(JSON.parse(msg.content.toString()))
        
        this.channel.ack(msg)
        console.log(`Worker ${this.workerId} completed message`)
        
      } catch (error) {
        console.error(`Worker ${this.workerId} error:`, error)
        this.channel.nack(msg, false, true) // Requeue
      } finally {
        this.isProcessing = false
      }
    }, {
      noAck: false
    })
    
    // cancel() needs the consumer tag, not the queue name
    this.consumerTag = consumerTag
    
    console.log(`Worker ${this.workerId} started`)
  }
  
  async stop() {
    if (this.channel) {
      if (this.consumerTag) {
        await this.channel.cancel(this.consumerTag)
      }
      await this.channel.close()
    }
    if (this.connection) {
      await this.connection.close()
    }
  }

// Usage
async function processTask(task) {
  console.log('Processing task:', task)
  
  // Simulate work
  await new Promise(resolve => setTimeout(resolve, Math.random() * 2000 + 1000))
  
  console.log('Task completed:', task.id)
}

const workQueue = new WorkQueue('task_queue', processTask, 5)
workQueue.start().catch(console.error)
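
To see why each worker sets prefetch(1), it can help to compare the two dispatch styles on paper. The sketch below is a plain simulation, not RabbitMQ code: `simulateDispatch` and its `mode` values are hypothetical names, and it assumes every task is already waiting in the queue when the workers start.

```javascript
// Simulates how a list of task durations is spread over workers.
// mode 'round-robin': tasks are handed out in turn, regardless of how busy
//                     each worker is (roughly what a large prefetch does)
// mode 'fair':        each task goes to the worker that frees up first
//                     (roughly what prefetch(1) achieves)
function simulateDispatch(durations, workerCount, mode) {
  const busyUntil = new Array(workerCount).fill(0)

  durations.forEach((duration, index) => {
    let worker
    if (mode === 'round-robin') {
      worker = index % workerCount
    } else {
      // Pick the worker that becomes free earliest
      worker = busyUntil.indexOf(Math.min(...busyUntil))
    }
    busyUntil[worker] += duration
  })

  // Time until the last worker finishes
  return Math.max(...busyUntil)
}

console.log(simulateDispatch([5, 1, 5, 1], 2, 'round-robin')) // 10
console.log(simulateDispatch([5, 1, 5, 1], 2, 'fair'))        // 6
```

With alternating slow and fast tasks, round-robin piles both slow tasks onto one worker, while fair dispatch lets the idle worker pick up the slack.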

Pattern 2: Publish/Subscribe with Topic Routing

class TopicPublisher {
  constructor(exchangeName = 'app_events') {
    this.exchangeName = exchangeName
    this.connection = null
    this.channel = null
  }
  
  async connect() {
    this.connection = await createConnection()
    // Publisher confirms are enabled by creating a confirm channel;
    // amqplib channels have no confirmChannel() method
    this.channel = await this.connection.createConfirmChannel()
    
    await this.channel.assertExchange(this.exchangeName, 'topic', {
      durable: true
    })
  }
  
  async publish(topic, event, options = {}) {
    const routingKey = topic
    const message = {
      id: generateId(),
      timestamp: new Date().toISOString(),
      event,
      source: 'event-service'
    }
    
    return this.channel.publish(
      this.exchangeName,
      routingKey,
      Buffer.from(JSON.stringify(message)),
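      {
        persistent: true,
        messageId: message.id,
        // Content type is a message property, so set it there rather than in headers
        contentType: 'application/json',
        // The subscriber reads msg.properties.timestamp, so set it here
        timestamp: Date.now(),
        headers: {
          'event-topic': topic,
          ...options.headers
        }
      }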
    )
  }
}

class TopicSubscriber {
  constructor(exchangeName, pattern, handler) {
    this.exchangeName = exchangeName
    this.pattern = pattern
    this.handler = handler
    this.connection = null
    this.channel = null
    this.queueName = null
  }
  
  async start() {
    this.connection = await createConnection()
    this.channel = await this.connection.createChannel()
    
    await this.channel.assertExchange(this.exchangeName, 'topic', {
      durable: true
    })
    
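    // Create unique queue for this subscriber
    // (replace every '*' and '#', not just the first '*', to keep the name readable)
    const safePattern = this.pattern.replace(/\*/g, 'wildcard').replace(/#/g, 'hash')
    this.queueName = `${safePattern}_${Date.now()}`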
    
    await this.channel.assertQueue(this.queueName, {
      durable: true,
      exclusive: false,
      autoDelete: false
    })
    
    // Bind with pattern
    await this.channel.bindQueue(this.queueName, this.exchangeName, this.pattern)
    
    await this.channel.consume(this.queueName, async (msg) => {
      try {
        const event = JSON.parse(msg.content.toString())
        
        console.log(`Received event: ${msg.fields.routingKey}`, event)
        
        await this.handler(event, {
          routingKey: msg.fields.routingKey,
          exchange: msg.fields.exchange,
          timestamp: msg.properties.timestamp
        })
        
        this.channel.ack(msg)
        
      } catch (error) {
        console.error('Error processing event:', error)
        this.channel.nack(msg, false, false)
      }
    }, {
      noAck: false
    })
    
    console.log(`Topic subscriber started for pattern: ${this.pattern}`)
  }
}

// Usage example
async function userEventHandler(event, context) {
  switch (context.routingKey) {
    case 'user.created':
      console.log('New user created:', event.event.data)
      break
    case 'user.updated':
      console.log('User updated:', event.event.data)
      break
    case 'user.deleted':
      console.log('User deleted:', event.event.data)
      break
  }
}

const publisher = new TopicPublisher()
await publisher.connect()

// Create subscribers for different patterns
// (orderEventHandler is assumed to be defined along the same lines as userEventHandler)
const userSubscriber = new TopicSubscriber('app_events', 'user.*', userEventHandler)
const orderSubscriber = new TopicSubscriber('app_events', 'order.*', orderEventHandler)

await userSubscriber.start()
await orderSubscriber.start()

// Publish events
await publisher.publish('user.created', { userId: '123', email: 'user@example.com' })
await publisher.publish('order.created', { orderId: '456', amount: 99.99 })
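
The binding patterns above follow the topic exchange rules: routing keys are dot-separated words, `*` stands for exactly one word, and `#` stands for zero or more words. The broker does this matching for you; the function below is only a local sketch of the same rules (the name `topicMatches` is made up for illustration), handy for checking a pattern before you bind it.

```javascript
// Returns true if routingKey would match a topic binding pattern.
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.')
  const k = routingKey.split('.')

  function match(pi, ki) {
    // Pattern exhausted: it is a match only if the key is exhausted too
    if (pi === p.length) return ki === k.length

    if (p[pi] === '#') {
      // '#' either absorbs nothing, or absorbs one word and tries again
      return match(pi + 1, ki) || (ki < k.length && match(pi, ki + 1))
    }

    if (ki === k.length) return false
    if (p[pi] === '*' || p[pi] === k[ki]) return match(pi + 1, ki + 1)
    return false
  }

  return match(0, 0)
}

console.log(topicMatches('user.*', 'user.created'))         // true
console.log(topicMatches('user.*', 'user.profile.updated')) // false
console.log(topicMatches('user.#', 'user.profile.updated')) // true
```

Note that 'user.*' does not match 'user.profile.updated'; use 'user.#' when a subscriber should receive every event under a prefix.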

Pattern 3: Dead Letter Queue (DLQ) Pattern

class DLQManager {
  constructor() {
    this.connection = null
    this.channel = null
  }
  
  async setup() {
    this.connection = await createConnection()
    this.channel = await this.connection.createChannel()
    
    // Declare main exchange
    await this.channel.assertExchange('events', 'topic', { durable: true })
    
    // Declare dead letter exchange
    await this.channel.assertExchange('events_dlx', 'topic', { durable: true })
    
    // Declare dead letter queue
    await this.channel.assertQueue('events_dlq', {
      durable: true,
      arguments: {
        'x-message-ttl': 604800000, // 7 days
        'x-dead-letter-exchange': 'events',
        'x-dead-letter-routing-key': 'retry'
      }
    })
    
    // Bind DLQ to DLX
    await this.channel.bindQueue('events_dlq', 'events_dlx', '#')
    
    // Declare processing queue with DLQ
    await this.channel.assertQueue('events_processing', {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': 'events_dlx',
        'x-dead-letter-routing-key': 'events_processing'
      }
    })
    
    // Bind processing queue to main exchange
    await this.channel.bindQueue('events_processing', 'events', 'important.*')
    
    // Set up consumer for processing queue
    await this.channel.consume('events_processing', async (msg) => {
      try {
        const event = JSON.parse(msg.content.toString())
        
        console.log('Processing important event:', event)
        
        // Simulate processing that might fail
        if (Math.random() < 0.2) { // 20% failure rate
          throw new Error('Simulated processing failure')
        }
        
        await this.processEvent(event)
        this.channel.ack(msg)
        
      } catch (error) {
        console.error('Processing failed:', error)
        this.channel.nack(msg, false, false) // Send to DLQ
      }
    }, {
      noAck: false
    })
    
    // Set up consumer for DLQ
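    await this.channel.consume('events_dlq', async (msg) => {
      if (msg === null) return
      
      const failedEvent = JSON.parse(msg.content.toString())
      
      console.log('Failed event in DLQ:', failedEvent)
      
      // Extract failure information from the dead-letter headers.
      // x-death is an array of death records, most recent first.
      const headers = msg.properties.headers || {}
      const death = (headers['x-death'] || [])[0] || {}
      // The death time is in seconds; amqplib may decode AMQP timestamps
      // as { '!': 'timestamp', value: <seconds> }, so handle both shapes
      const deathSeconds = death.time && death.time.value !== undefined ? death.time.value : death.time
      const failureInfo = {
        originalEvent: failedEvent,
        failureReason: headers['x-first-death-reason'],
        failedAt: deathSeconds !== undefined ? new Date(Number(deathSeconds) * 1000) : new Date(),
        retryCount: Number(death.count || 0)
      }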
      
      await this.handleFailedEvent(failureInfo)
      this.channel.ack(msg)
    }, {
      noAck: false
    })
    
    console.log('DLQ setup complete')
  }
  
  async processEvent(event) {
    // Your business logic here
    console.log('Successfully processed event:', event.id)
  }
  
  async handleFailedEvent(failureInfo) {
    console.log('Handling failed event:', failureInfo)
    
    // Store in database for manual review
    await db.failedEvents.create({
      eventId: failureInfo.originalEvent.id,
      reason: failureInfo.failureReason,
      failedAt: failureInfo.failedAt,
      retryCount: failureInfo.retryCount,
      eventData: failureInfo.originalEvent
    })
    
    // Send alert (sendAlert is a method on this class, so call it via this)
    await this.sendAlert(`Event processing failed: ${failureInfo.originalEvent.id}`)
  }
  
  async sendAlert(message) {
    // Alert implementation (email, Slack, etc.)
    console.log('ALERT:', message)
  }
}

// Start DLQ manager
const dlqManager = new DLQManager()
dlqManager.setup().catch(console.error)
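
Each time a message is dead-lettered, RabbitMQ adds or updates an entry in its `x-death` header array, with fields such as `count`, `reason`, and `queue`. A small helper can turn that array into something easier to act on. This is a sketch under assumptions: `summarizeDeaths` and `shouldParkMessage` are illustrative names, and the sample headers are hand-written rather than captured from a broker.

```javascript
// Condenses the x-death header array into totals that are easy to log or test
function summarizeDeaths(headers = {}) {
  const deaths = Array.isArray(headers['x-death']) ? headers['x-death'] : []

  return {
    // Each entry carries its own counter for one (queue, reason) pair
    totalDeaths: deaths.reduce((sum, death) => sum + Number(death.count || 0), 0),
    reasons: [...new Set(deaths.map(death => death.reason))],
    queues: [...new Set(deaths.map(death => death.queue))]
  }
}

// Decide whether to stop retrying and keep the message for manual review
function shouldParkMessage(headers, maxDeaths) {
  return summarizeDeaths(headers).totalDeaths >= maxDeaths
}

// Hand-written sample in the shape described above
const sampleHeaders = {
  'x-death': [
    { count: 2, reason: 'rejected', queue: 'events_processing' },
    { count: 1, reason: 'expired', queue: 'events_dlq' }
  ]
}

console.log(summarizeDeaths(sampleHeaders).totalDeaths) // 3
console.log(shouldParkMessage(sampleHeaders, 3))        // true
```

In the DLQ consumer above, a check like this could decide between republishing a message and storing it for manual review.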

Pattern 4: Priority Queue Pattern

class PriorityPublisher {
  constructor() {
    this.connection = null
    this.channel = null
  }
  
  async connect() {
    this.connection = await createConnection()
    this.channel = await this.connection.createChannel()
    
    // Declare a direct exchange (priority is a queue feature, enabled with
    // x-max-priority on the queue; exchanges need no special setup)
    await this.channel.assertExchange('priority_events', 'direct', {
      durable: true
    })
  }
  
  async publish(routingKey, message, priority = 5) {
    const messageData = {
      id: generateId(),
      timestamp: new Date().toISOString(),
      data: message,
      priority
    }
    
    return this.channel.publish(
      'priority_events',
      routingKey,
      Buffer.from(JSON.stringify(messageData)),
      {
        priority, // 0 up to the queue's x-max-priority (10 below), higher = more important
        persistent: true,
        messageId: messageData.id,
        headers: {
          'content-type': 'application/json',
          'priority': priority.toString()
        }
      }
    )
  }
}

class PriorityConsumer {
  constructor(queueName, handler) {
    this.queueName = queueName
    this.handler = handler
    this.connection = null
    this.channel = null
  }
  
  async start() {
    this.connection = await createConnection()
    this.channel = await this.connection.createChannel()
    
    // Declare queue with max priority
    await this.channel.assertQueue(this.queueName, {
      durable: true,
      arguments: {
        'x-max-priority': 10 // Maximum priority value
      }
    })
    
    // Bind to priority exchange
    await this.channel.bindQueue(this.queueName, 'priority_events', this.queueName)
    
    await this.channel.consume(this.queueName, async (msg) => {
      try {
        const message = JSON.parse(msg.content.toString())
        // Use ?? so an explicit priority of 0 is kept; the broker treats a
        // missing priority as 0
        const priority = msg.properties.priority ?? 0
        
        console.log(`Processing message with priority ${priority}:`, message)
        
        await this.handler(message, priority)
        this.channel.ack(msg)
        
      } catch (error) {
        console.error('Error processing priority message:', error)
        this.channel.nack(msg, false, false)
      }
    }, {
      noAck: false
    })
    
    console.log(`Priority consumer started: ${this.queueName}`)
  }
}

// Usage example
async function priorityHandler(message, priority) {
  // Handle based on priority
  if (priority >= 8) {
    console.log('HIGH PRIORITY - Immediate processing')
    await processImmediately(message)
  } else if (priority >= 5) {
    console.log('MEDIUM PRIORITY - Normal processing')
    await processNormally(message)
  } else {
    console.log('LOW PRIORITY - Background processing')
    await processInBackground(message)
  }
}

const publisher = new PriorityPublisher()
const consumer = new PriorityConsumer('priority_queue', priorityHandler)

await publisher.connect()
await consumer.start()

// Publish messages with different priorities
await publisher.publish('priority_queue', { task: 'urgent_task' }, 9)
await publisher.publish('priority_queue', { task: 'normal_task' }, 5)
await publisher.publish('priority_queue', { task: 'background_task' }, 1)
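
Priorities only reorder messages that are waiting in the queue: higher values are delivered first, equal priorities keep their publish order, values above `x-max-priority` are capped, and a message with no priority counts as 0. The function below is a local model of that ordering, not broker code; `deliveryOrder` is a hypothetical name used for illustration.

```javascript
// Models the order in which a backlog of messages would be delivered
// from a priority queue with the given x-max-priority
function deliveryOrder(messages, maxPriority = 10) {
  return messages
    .map((message, index) => ({
      task: message.task,
      index,
      // Missing priority counts as 0; anything above the maximum is capped
      effective: Math.min(message.priority ?? 0, maxPriority)
    }))
    // Highest priority first, publish order as the tie-breaker
    .sort((a, b) => b.effective - a.effective || a.index - b.index)
    .map(entry => entry.task)
}

const backlog = [
  { task: 'a', priority: 1 },
  { task: 'b', priority: 9 },
  { task: 'c' },
  { task: 'd', priority: 9 },
  { task: 'e', priority: 50 }
]

console.log(deliveryOrder(backlog)) // [ 'e', 'b', 'd', 'a', 'c' ]
```

If consumers keep up and the queue stays empty, messages are delivered as they arrive and priority has little visible effect.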

Error Handling and Reliability

Connection Resilience

class ResilientConnection {
  constructor(config) {
    this.config = config
    this.connection = null
    this.channels = new Map()
    this.reconnectTimer = null
    this.isReconnecting = false
    this.connectionPromise = null
  }
  
  async getConnection() {
    if (this.connection && this.connection.connection.serverProperties) {
      return this.connection
    }
    
    if (this.connectionPromise) {
      return this.connectionPromise
    }
    
    this.connectionPromise = this.establishConnection()
    return this.connectionPromise
  }
  
  async establishConnection() {
    while (true) {
      try {
        const connection = await amqp.connect(this.config.url, {
          timeout: this.config.timeout || 30000,
          heartbeat: this.config.heartbeat || 60,
          clientProperties: {
            product_name: 'resilient-app',
            connection_name: 'resilient-connection'
          }
        })
        
        this.setupConnectionHandlers(connection)
        this.connection = connection
        this.connectionPromise = null
        this.retryCount = 0 // Reset the backoff once a connection succeeds
        
        console.log('Connection established successfully')
        return connection
        
      } catch (error) {
        console.error('Connection failed:', error)
        
        // Exponential backoff
        const delay = Math.min(1000 * Math.pow(2, this.getRetryCount()), 30000)
        console.log(`Retrying connection in ${delay}ms...`)
        
        await new Promise(resolve => setTimeout(resolve, delay))
      }
    }
  }
  
  setupConnectionHandlers(connection) {
    connection.on('error', (error) => {
      console.error('Connection error:', error)
      this.handleConnectionError(error)
    })
    
    connection.on('close', (reason) => {
      console.log('Connection closed:', reason)
      this.handleConnectionClose(reason)
    })
    
    connection.on('blocked', (reason) => {
      console.warn('Connection blocked:', reason)
    })
    
    connection.on('unblocked', () => {
      console.log('Connection unblocked')
    })
  }
  
  handleConnectionError(error) {
    if (this.isReconnecting) return
    
    console.error('Handling connection error:', error)
    this.scheduleReconnect()
  }
  
  handleConnectionClose(reason) {
    if (this.isReconnecting) return
    
    console.log('Handling connection close:', reason)
    this.connection = null
    this.channels.clear()
    this.scheduleReconnect()
  }
  
  scheduleReconnect() {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
    }
    
    this.isReconnecting = true
    
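    this.reconnectTimer = setTimeout(async () => {
      console.log('Attempting to reconnect...')
      this.reconnectTimer = null
      
      try {
        // Drop the stale connection and go through getConnection() so that
        // concurrent callers share a single reconnection attempt
        this.connection = null
        await this.getConnection()
      } finally {
        // Stay in "reconnecting" state until the attempt has finished, so the
        // 'error' and 'close' events of one failure do not both schedule a retry
        this.isReconnecting = false
      }
    }, 5000) // Wait 5 seconds before reconnecting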
  }
  
  async getChannel() {
    const connection = await this.getConnection()
    
    // Check if we already have a channel for this connection
    const existingChannel = this.channels.get(connection)
    if (existingChannel) {
      return existingChannel
    }
    
    // Create new channel
    const channel = await connection.createChannel()
    this.channels.set(connection, channel)
    
    // Set up channel error handler
    channel.on('error', (error) => {
      console.error('Channel error:', error)
    })
    
    channel.on('close', () => {
      console.log('Channel closed')
      this.channels.delete(connection)
    })
    
    return channel
  }
  
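  getRetryCount() {
    // Return the number of failed attempts so far, then count this one,
    // so the first retry waits 1s (2^0), the next 2s, and so on
    const count = this.retryCount || 0
    this.retryCount = count + 1
    return count
  }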
}
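
When many clients lose their connection at the same moment, plain exponential backoff makes them all retry in lockstep. Adding random jitter spreads the retries out. The helper below is a minimal sketch of the "full jitter" variant; `backoffDelay` and its option names are assumptions for illustration, and `random` is injectable only so the result can be tested.

```javascript
// Exponential backoff with full jitter: pick a random delay between 0 and
// min(capMs, baseMs * 2^attempt)
function backoffDelay(attempt, { baseMs = 1000, capMs = 30000, random = Math.random } = {}) {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt)
  return Math.floor(random() * ceiling)
}

// With a fixed "random" value of 0.5 the delays are easy to follow
const half = () => 0.5
console.log(backoffDelay(0, { random: half }))  // 500
console.log(backoffDelay(3, { random: half }))  // 4000
console.log(backoffDelay(10, { random: half })) // 15000
```

In establishConnection() above, a call like this could replace the fixed `Math.min(1000 * Math.pow(2, ...), 30000)` expression.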

Message Processing Guarantees

class ReliableConsumer {
  constructor(queueName, handler, options = {}) {
    this.queueName = queueName
    this.handler = handler
    this.options = {
      maxRetries: options.maxRetries || 3,
      retryDelay: options.retryDelay || 5000,
      visibilityTimeout: options.visibilityTimeout || 30000,
      deadLetterQueue: `${queueName}_dlq`,
      ...options
    }
    
    this.connection = null
    this.channel = null
    this.processingMessages = new Map()
  }
  
  async start() {
    this.connection = await createConnection()
    this.channel = await this.connection.createChannel()
    
    // Declare dead letter exchange and queue
    await this.channel.assertExchange(`${this.queueName}_dlx`, 'direct', {
      durable: true
    })
    
    await this.channel.assertQueue(this.options.deadLetterQueue, {
      durable: true,
      arguments: {
        'x-message-ttl': 604800000 // 7 days
      }
    })
    
    await this.channel.bindQueue(
      this.options.deadLetterQueue,
      `${this.queueName}_dlx`,
      this.options.deadLetterQueue
    )
    
    // Declare main queue with DLQ
    await this.channel.assertQueue(this.queueName, {
      durable: true,
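      arguments: {
        'x-dead-letter-exchange': `${this.queueName}_dlx`,
        'x-dead-letter-routing-key': this.options.deadLetterQueue
        // No 'x-message-ttl' here: a queue TTL expires messages that are still
        // waiting in the queue, not messages being processed. Slow processing
        // is handled by checkTimeouts() below instead.
      }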
    })
    
    await this.channel.consume(this.queueName, this.handleMessage.bind(this), {
      noAck: false
    })
    
    // Set up visibility timeout checker
    setInterval(() => this.checkTimeouts(), 5000)
  }
  
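  async handleMessage(message) {
    if (message === null) return // Consumer was cancelled by the broker
    
    const messageId = message.properties.messageId || message.fields.deliveryTag
    const startTime = Date.now()
    
    // Track message processing. A requeued message comes back with unchanged
    // headers, so carry over the retry count recorded on earlier attempts.
    // This needs a stable messageId; the deliveryTag fallback changes on redelivery.
    const previous = this.processingMessages.get(messageId)
    this.processingMessages.set(messageId, {
      message,
      startTime,
      retries: previous ? previous.retries : this.getMessageRetries(message)
    })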
    
    try {
      console.log(`Processing message: ${messageId}`)
      
      // Process the message
      await this.handler(JSON.parse(message.content.toString()), message)
      
      // Remove from tracking and acknowledge
      this.processingMessages.delete(messageId)
      this.channel.ack(message)
      
      console.log(`Message processed successfully: ${messageId}`)
      
    } catch (error) {
      console.error(`Error processing message ${messageId}:`, error)
      
      const processingInfo = this.processingMessages.get(messageId)
      
      if (processingInfo.retries < this.options.maxRetries) {
        // Retry the message
        console.log(`Retrying message ${messageId} (attempt ${processingInfo.retries + 1})`)
        
        // Record the retry in the in-memory tracker (a requeue leaves headers unchanged)
        this.processingMessages.set(messageId, {
          ...processingInfo,
          retries: processingInfo.retries + 1
        })
        
        // Requeue with delay
        setTimeout(() => {
          this.channel.nack(message, false, true)
        }, this.options.retryDelay)
        
      } else {
        // Max retries reached - send to DLQ
        console.log(`Max retries reached for message ${messageId}, sending to DLQ`)
        
        this.processingMessages.delete(messageId)
        this.channel.nack(message, false, false) // Don't requeue
        
        // Log to error tracking system
        await this.logProcessingError(messageId, error, processingInfo.retries)
      }
    }
  }
  
  getMessageRetries(message) {
    // headers can be undefined when the publisher set none
    const retryHeader = message.properties.headers?.['x-retry-count']
    return retryHeader ? parseInt(retryHeader, 10) : 0
  }
  
  checkTimeouts() {
    const now = Date.now()
    
    for (const [messageId, processingInfo] of this.processingMessages) {
      if (now - processingInfo.startTime > this.options.visibilityTimeout) {
        console.warn(`Message ${messageId} processing timeout`)
        
        // Treat as failed and send to DLQ
        this.channel.nack(processingInfo.message, false, false)
        this.processingMessages.delete(messageId)
        
        // Log timeout (logTimeout is async, so catch failures here)
        this.logTimeout(messageId, processingInfo).catch(console.error)
      }
    }
  }
  
  async logProcessingError(messageId, error, retries) {
    // Log to your monitoring system
    await errorLogger.log({
      messageId,
      error: error.message,
      stack: error.stack,
      retries,
      timestamp: new Date().toISOString(),
      queue: this.queueName
    })
  }
  
  async logTimeout(messageId, processingInfo) {
    // Log timeout to monitoring system
    await errorLogger.log({
      messageId,
      error: 'Processing timeout',
      processingTime: Date.now() - processingInfo.startTime,
      retries: processingInfo.retries,
      timestamp: new Date().toISOString(),
      queue: this.queueName
    })
  }
}
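
A nack with requeue returns the message with its headers unchanged, so a header such as `x-retry-count` only grows if the consumer republishes the message itself. The function below sketches that alternative: it sends a copy with an incremented counter and acks the original, or dead-letters the message once the limit is reached. The name `retryOrDeadLetter` is hypothetical. It uses `sendToQueue`, `ack`, and `nack` from amqplib's channel API, and it assumes the queue has a dead-letter exchange configured as shown above.

```javascript
// Retry by republishing with an incremented x-retry-count header,
// or reject without requeue so the broker dead-letters the message
function retryOrDeadLetter(channel, queueName, msg, maxRetries = 3) {
  const headers = msg.properties.headers || {}
  const retries = Number(headers['x-retry-count'] || 0)

  if (retries >= maxRetries) {
    channel.nack(msg, false, false) // requeue = false sends it to the DLX
    return 'dead-lettered'
  }

  // Publish a fresh copy carrying the updated counter...
  channel.sendToQueue(queueName, msg.content, {
    persistent: true,
    messageId: msg.properties.messageId,
    contentType: msg.properties.contentType,
    headers: { ...headers, 'x-retry-count': retries + 1 }
  })

  // ...then remove the original from the queue
  channel.ack(msg)
  return 'retried'
}
```

On a regular channel the copy is not confirmed before the original is acked, so a confirm channel is the safer choice if losing a message during a retry is unacceptable.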

Performance Optimization

Connection Pooling

class ConnectionPool {
  constructor(config, poolSize = 5) {
    this.config = config
    this.poolSize = poolSize
    this.pool = []
    this.waitingQueue = []
    this.activeConnections = 0
  }
  
  async initialize() {
    for (let i = 0; i < this.poolSize; i++) {
      try {
        const connection = await this.createConnection()
        this.pool.push(connection)
      } catch (error) {
        console.error(`Failed to create connection ${i}:`, error)
      }
    }
    
    console.log(`Connection pool initialized with ${this.pool.length} connections`)
  }
  
  async getConnection() {
    // Return available connection from pool
    if (this.pool.length > 0) {
      const connection = this.pool.pop()
      this.activeConnections++
      return connection
    }
    
    // Wait for available connection
    return new Promise((resolve) => {
      this.waitingQueue.push(resolve)
    })
  }
  
  releaseConnection(connection) {
    this.activeConnections--
    
    // Hand a connection to the next waiting request, or return it to the pool
    const makeAvailable = (conn) => {
      if (this.waitingQueue.length > 0) {
        const nextResolve = this.waitingQueue.shift()
        this.activeConnections++
        nextResolve(conn)
      } else {
        this.pool.push(conn)
      }
    }
    
    // Check if connection is still valid
    if (connection.connection && connection.connection.serverProperties) {
      makeAvailable(connection)
    } else {
      // Connection is broken, create new one
      this.createConnection()
        .then(makeAvailable)
        .catch(error => console.error('Failed to replace broken connection:', error))
    }
  }
  
  async createConnection() {
    const connection = await amqp.connect(this.config.url, {
      timeout: this.config.timeout || 30000,
      heartbeat: this.config.heartbeat || 60
    })
    
    // The active count is maintained only by getConnection() and
    // releaseConnection(); decrementing it here would double-count
    connection.on('error', (error) => {
      console.error('Pooled connection error:', error)
    })
    
    connection.on('close', () => {
      // Drop a closed connection if it is sitting idle in the pool
      this.pool = this.pool.filter(pooled => pooled !== connection)
    })
    
    return connection
  }
  
  getStats() {
    return {
      poolSize: this.poolSize,
      availableConnections: this.pool.length,
      activeConnections: this.activeConnections,
      waitingRequests: this.waitingQueue.length
    }
  }
}

// Usage with connection pooling
const connectionPool = new ConnectionPool(rabbitConfig, 10)
await connectionPool.initialize()

class PooledProducer {
  constructor(connectionPool) {
    this.connectionPool = connectionPool
    this.channels = new Map()
  }
  
  async publish(exchange, routingKey, message, options = {}) {
    const connection = await this.connectionPool.getConnection()
    
    try {
      let channel = this.channels.get(connection)
      
      if (!channel) {
        // createConfirmChannel() enables publisher confirms;
        // amqplib channels have no confirmChannel() method
        channel = await connection.createConfirmChannel()
        await channel.assertExchange(exchange, 'topic', { durable: true })
        this.channels.set(connection, channel)
      }
      
      return channel.publish(exchange, routingKey, Buffer.from(JSON.stringify(message)), {
        persistent: true,
        messageId: generateId(),
        timestamp: Date.now(),
        ...options
      })
      
    } finally {
      // Always return the connection to the pool, on success or failure
      this.connectionPool.releaseConnection(connection)
    }
  }
}
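
Every caller of the pool has to remember to release what it borrowed, including on the error path. A small wrapper can take care of that in one place. This is a sketch: `withConnection` is a made-up helper name, and it relies only on the `getConnection()` and `releaseConnection()` methods of the ConnectionPool class above.

```javascript
// Borrow a connection, run the callback, and always give the connection back
async function withConnection(pool, work) {
  const connection = await pool.getConnection()

  try {
    return await work(connection)
  } finally {
    // Runs whether work() resolved or threw
    pool.releaseConnection(connection)
  }
}
```

A call such as `await withConnection(connectionPool, conn => conn.createChannel())` would then borrow and return a connection without any manual bookkeeping.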

Batch Publishing Optimization

class BatchPublisher {
  constructor(exchangeName, batchSize = 100, flushInterval = 1000) {
    this.exchangeName = exchangeName
    this.batchSize = batchSize
    this.flushInterval = flushInterval
    this.connection = null
    this.channel = null
    this.messageQueue = []
    this.flushTimer = null
    this.isPublishing = false
  }
  
  async start() {
    this.connection = await createConnection()
    // Publisher confirms are enabled by creating a confirm channel;
    // amqplib channels have no confirmChannel() method
    this.channel = await this.connection.createConfirmChannel()
    
    await this.channel.assertExchange(this.exchangeName, 'topic', {
      durable: true
    })
    
    // Start periodic flush
    this.startPeriodicFlush()
    
    console.log(`Batch publisher started for exchange: ${this.exchangeName}`)
  }
  
  async publish(routingKey, message, options = {}) {
    this.messageQueue.push({
      routingKey,
      message: {
        id: generateId(),
        timestamp: new Date().toISOString(),
        data: message
      },
      options
    })
    
    // Flush immediately if batch is full
    if (this.messageQueue.length >= this.batchSize) {
      await this.flush()
    }
  }
  
  async flush() {
    if (this.messageQueue.length === 0 || this.isPublishing) return
    
    this.isPublishing = true
    
    // Declared outside try so the catch block can re-queue it
    const batch = this.messageQueue.splice(0)
    
    try {
      console.log(`Publishing batch of ${batch.length} messages`)
      
      // Publish messages in their original order
      for (const { routingKey, message, options } of batch) {
        const published = this.channel.publish(
          this.exchangeName,
          routingKey,
          Buffer.from(JSON.stringify(message)),
          {
            persistent: true,
            messageId: message.id,
            timestamp: Date.now(),
            ...options
          }
        )
        
        // false signals backpressure, not failure: the message is buffered,
        // but we should wait for 'drain' before writing more
        if (!published) {
          await new Promise(resolve => this.channel.once('drain', resolve))
        }
      }
      
      // Wait for confirms
      await this.waitForConfirms(batch.length)
      
      console.log(`Batch published successfully`)
      
    } catch (error) {
      console.error('Error publishing batch:', error)
      // Re-queue the whole batch; consumers may see duplicates of messages
      // that were already accepted, so handlers should be idempotent
      this.messageQueue.unshift(...batch)
    } finally {
      this.isPublishing = false
    }
  }
  
  startPeriodicFlush() {
    this.flushTimer = setInterval(async () => {
      if (this.messageQueue.length > 0) {
        await this.flush()
      }
    }, this.flushInterval)
  }
  
  async waitForConfirms() {
    // amqplib channels do not emit 'confirm' events. A confirm channel
    // (from createConfirmChannel()) provides waitForConfirms(), which resolves
    // once every outstanding publish is acked and rejects if any was nacked.
    return this.channel.waitForConfirms()
  }
  
  async stop() {
    if (this.flushTimer) {
      clearInterval(this.flushTimer)
    }
    
    // Flush remaining messages
    await this.flush()
    
    if (this.channel) {
      await this.channel.close()
    }
    
    if (this.connection) {
      await this.connection.close()
    }
  }
}
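
In amqplib, `publish()` returns `false` when the channel's write buffer is full. The message has still been accepted locally, but the caller is expected to pause until the channel emits `'drain'`. The loop below sketches that pattern on its own; `publishAll` is a hypothetical helper, and the `{ routingKey, body }` message shape is an assumption made for the example.

```javascript
// Publishes messages in order, pausing whenever the channel reports backpressure.
// Returns how many times it had to wait for 'drain'.
async function publishAll(channel, exchange, messages) {
  let pauses = 0

  for (const { routingKey, body } of messages) {
    const ok = channel.publish(
      exchange,
      routingKey,
      Buffer.from(JSON.stringify(body)),
      { persistent: true }
    )

    if (!ok) {
      // Buffer is full: wait until the channel is ready for more
      pauses++
      await new Promise(resolve => channel.once('drain', resolve))
    }
  }

  return pauses
}
```

Tracking the number of pauses is optional, but it makes a convenient metric: a publisher that pauses often is producing faster than the broker or network can absorb.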

Monitoring and Operations

RabbitMQ Management Interface

class RabbitMQMonitor {
  constructor(managementUrl, credentials) {
    this.managementUrl = managementUrl
    this.credentials = credentials
    this.baseAuth = Buffer.from(`${credentials.username}:${credentials.password}`).toString('base64')
  }
  
  async makeRequest(path, method = 'GET', data = null) {
    const url = `${this.managementUrl}${path}`
    
    const options = {
      method,
      headers: {
        'Authorization': `Basic ${this.baseAuth}`,
        'Content-Type': 'application/json'
      }
    }
    
    if (data) {
      options.body = JSON.stringify(data)
    }
    
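    const response = await fetch(url, options)
    
    if (!response.ok) {
      throw new Error(`Management API error: ${response.status} ${response.statusText}`)
    }
    
    // PUT and DELETE calls answer with an empty body, so only parse JSON when present
    const text = await response.text()
    return text ? JSON.parse(text) : null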
  }
  
  async getOverview() {
    return await this.makeRequest('/api/overview')
  }
  
  async getQueues() {
    return await this.makeRequest('/api/queues')
  }
  
  async getExchanges() {
    return await this.makeRequest('/api/exchanges')
  }
  
  async getConnections() {
    return await this.makeRequest('/api/connections')
  }
  
  async getChannels() {
    return await this.makeRequest('/api/channels')
  }
  
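  // Queue endpoints are scoped by virtual host: /api/queues/{vhost}/{name}.
  // The default vhost "/" is URL-encoded as %2F.
  async getQueueDetails(queueName, vhost = '/') {
    return await this.makeRequest(`/api/queues/${encodeURIComponent(vhost)}/${encodeURIComponent(queueName)}`)
  }
  
  async purgeQueue(queueName, vhost = '/') {
    return await this.makeRequest(`/api/queues/${encodeURIComponent(vhost)}/${encodeURIComponent(queueName)}/contents`, 'DELETE')
  }
  
  async createQueue(queueName, config, vhost = '/') {
    return await this.makeRequest(`/api/queues/${encodeURIComponent(vhost)}/${encodeURIComponent(queueName)}`, 'PUT', config)
  }
  
  async deleteQueue(queueName, vhost = '/') {
    return await this.makeRequest(`/api/queues/${encodeURIComponent(vhost)}/${encodeURIComponent(queueName)}`, 'DELETE')
  }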
}

// Usage example
// Pass the base URL only: the methods above already add the /api prefix
const monitor = new RabbitMQMonitor('http://localhost:15672', {
  username: 'admin',
  password: 'admin123'
})

// Get system overview
const overview = await monitor.getOverview()
console.log('RabbitMQ Overview:', overview)

// Get queue details
const queues = await monitor.getQueues()
console.log('Queues:', queues)

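// Monitor specific queue
setInterval(async () => {
  try {
    const queueDetails = await monitor.getQueueDetails('email_notifications')
    
    if (queueDetails.messages_ready > 1000) {
      console.warn(`Queue email_notifications has ${queueDetails.messages_ready} messages ready`)
      // sendAlert is assumed to be your own alerting helper
      await sendAlert(`Queue backlog alert: ${queueDetails.messages_ready} messages`)
    }
  } catch (error) {
    // Without this, a failed poll becomes an unhandled promise rejection
    console.error('Queue monitoring failed:', error)
  }
}, 30000) // Check every 30 seconds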
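
Per-queue endpoints of the management API include the virtual host in the path, and the default vhost `/` has to be percent-encoded as `%2F`. The two helpers below sketch the path building and a simple backlog check over the array returned by `/api/queues`. The names `queueApiPath` and `findBacklogged` are illustrative; `name` and `messages_ready` are fields the management API reports for each queue.

```javascript
// Builds the management API path for one queue in a given vhost
function queueApiPath(queueName, vhost = '/') {
  return `/api/queues/${encodeURIComponent(vhost)}/${encodeURIComponent(queueName)}`
}

// Returns the names of queues whose ready-message count exceeds the threshold
function findBacklogged(queues, threshold) {
  return queues
    .filter(queue => (queue.messages_ready || 0) > threshold)
    .map(queue => queue.name)
}

console.log(queueApiPath('email_notifications'))
// /api/queues/%2F/email_notifications

console.log(findBacklogged([
  { name: 'email_notifications', messages_ready: 2500 },
  { name: 'task_queue', messages_ready: 12 }
], 1000))
// [ 'email_notifications' ]
```

Checking the whole `/api/queues` response in one pass avoids polling each queue separately as the number of queues grows.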

Custom Metrics Collection

class RabbitMQMetrics {
  constructor(connection) {
    this.connection = connection
    this.metrics = {
      messagesPublished: 0,
      messagesConsumed: 0,
      messagesAcked: 0,
      messagesNacked: 0,
      publishErrors: 0,
      consumeErrors: 0,
      connectionErrors: 0,
      channelErrors: 0
    }
    this.startTime = Date.now()
  }
  
  setupMetricsCollection(channel) {
    // Track publishes
    const originalPublish = channel.publish.bind(channel)
    channel.publish = (...args) => {
      try {
        // A false return value signals backpressure (wait for 'drain'),
        // not a failed publish, so the message still counts as published.
        // Passing all arguments through keeps the confirm callback intact.
        const result = originalPublish(...args)
        this.metrics.messagesPublished++
        return result
      } catch (error) {
        this.metrics.publishErrors++
        throw error
      }
    }
    
    // Publisher confirms are not channel events in amqplib: on a confirm
    // channel they arrive through the publish() callback or waitForConfirms().
    // messagesAcked and messagesNacked therefore count consumer
    // acknowledgements only (see the ack/nack wrappers below).
    
    // Track consumes
    const originalConsume = channel.consume.bind(channel)
    channel.consume = (queue, callback, options) => {
      const wrappedCallback = async (msg) => {
        try {
          this.metrics.messagesConsumed++
          await callback(msg)
        } catch (error) {
          this.metrics.consumeErrors++
          throw error
        }
      }
      
      return originalConsume(queue, wrappedCallback, options)
    }
    
    // Track ack/nack
    const originalAck = channel.ack.bind(channel)
    channel.ack = (msg) => {
      this.metrics.messagesAcked++
      originalAck(msg)
    }
    
    const originalNack = channel.nack.bind(channel)
    channel.nack = (msg, allUpTo, requeue) => {
      this.metrics.messagesNacked++
      originalNack(msg, allUpTo, requeue)
    }
  }
  
  getMetrics() {
    const uptime = Date.now() - this.startTime
    // Rates are averages over the whole uptime; guard against dividing by zero
    const seconds = Math.max(uptime / 1000, 0.001)
    
    return {
      ...this.metrics,
      uptime,
      publishRate: this.metrics.messagesPublished / seconds,
      consumeRate: this.metrics.messagesConsumed / seconds,
      ackRate: this.metrics.messagesAcked / seconds,
      errorRate: (this.metrics.publishErrors + this.metrics.consumeErrors) / seconds
    }
  }
  
  resetMetrics() {
    this.metrics = {
      messagesPublished: 0,
      messagesConsumed: 0,
      messagesAcked: 0,
      messagesNacked: 0,
      publishErrors: 0,
      consumeErrors: 0,
      connectionErrors: 0,
      channelErrors: 0
    }
    this.startTime = Date.now()
  }
}

// Usage example
const connection = await createConnection()
const channel = await connection.createChannel()

const metrics = new RabbitMQMetrics(connection)
metrics.setupMetricsCollection(channel)

// Report metrics every minute
setInterval(() => {
  const currentMetrics = metrics.getMetrics()
  console.log('RabbitMQ Metrics:', currentMetrics)
  
  // Send to monitoring system
  sendMetricsToMonitoring(currentMetrics)
}, 60000)
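
The rates reported by `getMetrics()` are averaged over the whole uptime, so a recent spike is smoothed away after a long run. One possible alternative, sketched here under the assumption that you keep the previous snapshot around, is to compute rates from the difference between two snapshots. `ratesBetween` and the snapshot shape (`takenAt`, `counters`) are hypothetical names introduced for this example.

```javascript
// Hypothetical helper: per-second rates between two counter snapshots.
// Each snapshot looks like { takenAt: <ms timestamp>, counters: { name: value } }.
function ratesBetween(previous, current) {
  const seconds = (current.takenAt - previous.takenAt) / 1000
  if (seconds <= 0) return {}

  const rates = {}
  for (const [name, value] of Object.entries(current.counters)) {
    const before = previous.counters[name] ?? 0
    // Counters only go up; a drop means they were reset in between
    const delta = value >= before ? value - before : value
    rates[name] = delta / seconds
  }
  return rates
}

const earlier = { takenAt: 0, counters: { messagesPublished: 100, messagesConsumed: 90 } }
const later = { takenAt: 60000, counters: { messagesPublished: 700, messagesConsumed: 390 } }
console.log(ratesBetween(earlier, later)) // { messagesPublished: 10, messagesConsumed: 5 }
```

With a one-minute reporting interval, each report then reflects only the last minute of traffic.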

Health Check Service

class RabbitMQHealthCheck {
  constructor(connectionPool) {
    this.connectionPool = connectionPool
    this.lastHealthCheck = null
    this.isHealthy = true
  }
  
  async performHealthCheck() {
    const startTime = Date.now()
    let connection = null
    
    try {
      // Get connection from pool
      connection = await this.connectionPool.getConnection()
      
      // Create temporary channel for health check
      const channel = await connection.createChannel()
      
      // Try to perform basic operations
      await channel.assertQueue('health_check_queue', { durable: false })
      
      // Publish a test message (publish() is synchronous and returns a boolean)
      const testMessage = {
        timestamp: Date.now(),
        checkId: generateId()
      }
      
      channel.publish('', 'health_check_queue', Buffer.from(JSON.stringify(testMessage)))
      
      // Clean up
      await channel.deleteQueue('health_check_queue')
      await channel.close()
      
      const duration = Date.now() - startTime
      this.lastHealthCheck = {
        status: 'healthy',
        duration,
        timestamp: new Date().toISOString()
      }
      
      this.isHealthy = true
      
    } catch (error) {
      const duration = Date.now() - startTime
      this.lastHealthCheck = {
        status: 'unhealthy',
        error: error.message,
        duration,
        timestamp: new Date().toISOString()
      }
      
      this.isHealthy = false
      console.error('Health check failed:', error)
    } finally {
      // Return the connection to the pool on both success and failure
      if (connection) {
        this.connectionPool.releaseConnection(connection)
      }
    }
    
    return this.getHealthStatus()
  }
  
  async startPeriodicHealthCheck(intervalMs = 30000) {
    setInterval(async () => {
      await this.performHealthCheck()
      
      if (!this.isHealthy) {
        console.warn('RabbitMQ health check failed - triggering alert')
        await this.triggerHealthAlert()
      }
    }, intervalMs)
  }
  
  async triggerHealthAlert() {
    // Send alert to monitoring system
    await sendAlert({
      service: 'rabbitmq',
      status: 'unhealthy',
      check: this.lastHealthCheck,
      timestamp: new Date().toISOString()
    })
  }
  
  getHealthStatus() {
    return {
      isHealthy: this.isHealthy,
      lastCheck: this.lastHealthCheck
    }
  }
}

// Express health check endpoint
const express = require('express')
const app = express()

const healthCheck = new RabbitMQHealthCheck(connectionPool)
// No await needed: the method only schedules the timer and returns
healthCheck.startPeriodicHealthCheck()

app.get('/health/rabbitmq', async (req, res) => {
  // Run a fresh check, then read the recorded result
  await healthCheck.performHealthCheck()
  const health = healthCheck.getHealthStatus()
  const statusCode = health.isHealthy ? 200 : 503
  
  res.status(statusCode).json({
    status: health.isHealthy ? 'healthy' : 'unhealthy',
    timestamp: health.lastCheck.timestamp,
    details: health.lastCheck
  })
})

app.listen(3000, () => {
  console.log('Health check service listening on port 3000')
})
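
The periodic check above triggers an alert on every failed run, which can be noisy when the broker is briefly unreachable. A minimal sketch of one way to dampen that, assuming you only want an alert after several consecutive failures and a single notice on recovery. `FailureStreak` and its default threshold are hypothetical and not part of the classes shown earlier.

```javascript
// Hypothetical helper: decide when a run of health-check results should alert.
class FailureStreak {
  constructor(threshold = 3) {
    this.threshold = threshold
    this.failures = 0
    this.alerting = false
  }

  // Record one check result; returns 'alert', 'recovered', or null
  record(isHealthy) {
    if (isHealthy) {
      const wasAlerting = this.alerting
      this.failures = 0
      this.alerting = false
      return wasAlerting ? 'recovered' : null
    }

    this.failures++
    if (!this.alerting && this.failures >= this.threshold) {
      this.alerting = true
      return 'alert' // fires once per outage, not once per failed check
    }
    return null
  }
}

const streak = new FailureStreak(2)
console.log([false, false, false, true].map((ok) => streak.record(ok)))
// [ null, 'alert', null, 'recovered' ]
```

Inside the interval callback, the result of `record(this.isHealthy)` could then decide whether `triggerHealthAlert()` is called at all.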

Production Deployment

Docker Production Configuration

# docker-compose.prod.yml
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3.12-management-alpine
    container_name: rabbitmq-prod
    restart: unless-stopped
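    ports:
      # AMQP (5672) is published by the haproxy service below; publishing it here
      # as well would make both containers compete for the same host port
      - "15672:15672"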
    environment:
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}
      RABBITMQ_DEFAULT_VHOST: "/"
      RABBITMQ_ERLANG_COOKIE: ${RABBITMQ_ERLANG_COOKIE}
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./definitions.json:/etc/rabbitmq/definitions.json
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s
    deploy:
      resources:
        limits:
          memory: 2G
          cpus: '1.0'
        reservations:
          memory: 1G
          cpus: '0.5'

  # HAProxy for load balancing
  haproxy:
    image: haproxy:2.8-alpine
    container_name: rabbitmq-lb
    restart: unless-stopped
    ports:
      - "5672:5672"
    volumes:
      - ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg
    depends_on:
      - rabbitmq
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: '0.25'

volumes:
  rabbitmq_data:

RabbitMQ Configuration

# rabbitmq.conf
# Production RabbitMQ configuration

# Memory and disk thresholds
vm_memory_high_watermark.relative = 0.6
disk_free_limit.absolute = 2GB

# Connection limits
heartbeat = 60
tcp_listen_options.backlog = 128
tcp_listen_options.nodelay = true

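# Default virtual host, user, and permissions
# Shell-style ${VAR} placeholders are not expanded in this file, so the password
# is supplied through the RABBITMQ_DEFAULT_PASS environment variable instead
default_vhost = /
default_user = admin
default_permissions.configure = .*
default_permissions.read = .*
default_permissions.write = .*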

# Clustering
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq1
cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq2
cluster_formation.classic_config.nodes.3 = rabbit@rabbitmq3

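# Federation (if needed)
# Upstreams are runtime parameters rather than rabbitmq.conf keys. After enabling
# the rabbitmq_federation plugin, define them with rabbitmqctl, for example
# (upstream-host is a placeholder):
#   rabbitmqctl set_parameter federation-upstream upstream-server '{"uri":"amqp://upstream-host"}'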

# Management plugin
management.tcp.port = 15672
management.tcp.ip = 0.0.0.0

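# MQTT plugin (if needed; requires the rabbitmq_mqtt plugin to be enabled)
mqtt.listeners.tcp.default = 1883
mqtt.default_user = mqtt_user
# mqtt.default_pass takes a literal value; ${VAR} placeholders are not expanded
mqtt.default_pass = CHANGE_ME

# STOMP plugin (if needed; requires the rabbitmq_stomp plugin to be enabled)
stomp.listeners.tcp.1 = 61613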
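
To make the memory setting in this file concrete: a relative watermark is a fraction of the memory the node detects, and once usage crosses it the broker raises a memory alarm and blocks publishers. The arithmetic below is a small sketch that assumes the node sees the 2G container limit from the compose file; whether it detects the container limit or the host's memory depends on the runtime, so treat the figure as an illustration. `watermarkBytes` is a hypothetical helper name.

```javascript
const GiB = 1024 ** 3

// Hypothetical helper: bytes at which the memory alarm would trigger
function watermarkBytes(totalMemoryBytes, relative) {
  return Math.floor(totalMemoryBytes * relative)
}

// 2 GiB container limit, vm_memory_high_watermark.relative = 0.6
const limit = watermarkBytes(2 * GiB, 0.6)
console.log(`${(limit / GiB).toFixed(2)} GiB`) // 1.20 GiB
```

With these numbers the node starts blocking publishers well before the container itself runs out of memory, which leaves headroom for the runtime.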

HAProxy Configuration

# haproxy.cfg
global
    daemon
    maxconn 4096
    log stdout format raw local0

defaults
    mode tcp
    timeout connect 5000ms
    timeout client 50000ms
    timeout server 50000ms
    option tcplog

listen rabbitmq_cluster
    bind *:5672
    mode tcp
    balance roundrobin
    option tcpka
    option tcplog
    server rabbitmq1 rabbitmq1:5672 check inter 2000 rise 2 fall 3
    server rabbitmq2 rabbitmq2:5672 check inter 2000 rise 2 fall 3
    server rabbitmq3 rabbitmq3:5672 check inter 2000 rise 2 fall 3

listen rabbitmq_management
    bind *:15672
    mode tcp
    balance roundrobin
    option tcplog
    server rabbitmq1 rabbitmq1:15672 check
    server rabbitmq2 rabbitmq2:15672 check
    server rabbitmq3 rabbitmq3:15672 check

Kubernetes Deployment

# rabbitmq-k8s.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
spec:
  serviceName: rabbitmq
  replicas: 3
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
      - name: rabbitmq
        image: rabbitmq:3.12-management-alpine
        ports:
        - containerPort: 5672
        - containerPort: 15672
        env:
        - name: RABBITMQ_DEFAULT_USER
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: username
        - name: RABBITMQ_DEFAULT_PASS
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: password
        - name: RABBITMQ_ERLANG_COOKIE
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: erlang-cookie
        volumeMounts:
        - name: rabbitmq-config
          mountPath: /etc/rabbitmq
        - name: rabbitmq-data
          mountPath: /var/lib/rabbitmq
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          exec:
            command:
            - rabbitmq-diagnostics
            - ping
          initialDelaySeconds: 60
          periodSeconds: 30
        readinessProbe:
          exec:
            command:
            - rabbitmq-diagnostics
            - ping
          initialDelaySeconds: 10
          periodSeconds: 5
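      volumes:
      # Assumption: a ConfigMap named rabbitmq-config holds rabbitmq.conf and enabled_plugins.
      # An empty per-pod PersistentVolumeClaim mounted at /etc/rabbitmq would hide the image's own config files.
      - name: rabbitmq-config
        configMap:
          name: rabbitmq-config
  volumeClaimTemplates: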
  - metadata:
      name: rabbitmq-data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi

---
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq  # must match the StatefulSet's serviceName so pods get stable DNS names
spec:
  selector:
    app: rabbitmq
  ports:
  - name: amqp
    port: 5672
    targetPort: 5672
  - name: management
    port: 15672
    targetPort: 15672
  type: ClusterIP
  clusterIP: None

---
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-lb
spec:
  selector:
    app: rabbitmq
  ports:
  - name: amqp
    port: 5672
    targetPort: 5672
  type: LoadBalancer

Resources and Further Learning

Official Documentation

Client Libraries

Books and Courses

  • “RabbitMQ in Action” by Alvaro Videla and Jason J. W. Williams
  • “RabbitMQ Essentials” by Lovisa Johansson and David Dossot
  • “Distributed Systems with RabbitMQ” - Online course on Udemy
  • “Message Queue Patterns” - Enterprise messaging patterns

Tools and Utilities

  • RabbitMQ Management UI: Built-in web interface
  • rabbitmqadmin: Command-line management tool
  • rabbitmqctl: Command-line broker control
  • PerfTest: Performance testing tool
  • RabbitMQ Cluster Kubernetes Operator: github.com/rabbitmq/cluster-operator

Monitoring Solutions

Advanced Topics

Community and Support

Example Projects

Performance Tuning Guides

Quick Reference Cheat Sheet

Basic Operations

# Start RabbitMQ
sudo systemctl start rabbitmq-server

# Stop RabbitMQ
sudo systemctl stop rabbitmq-server

# Check status
sudo systemctl status rabbitmq-server

# List exchanges
rabbitmqadmin list exchanges

# List queues
rabbitmqadmin list queues

# Purge queue
rabbitmqadmin purge queue name=queue_name

# Delete queue
rabbitmqadmin delete queue name=queue_name

Node.js Client Basics

// Connect
const amqp = require('amqplib')
const connection = await amqp.connect('amqp://localhost:5672')
const channel = await connection.createChannel()

// Declare exchange
await channel.assertExchange('events', 'topic', { durable: true })

// Declare queue
await channel.assertQueue('my_queue', { durable: true })

// Bind queue
await channel.bindQueue('my_queue', 'events', 'routing.key')

// Publish
channel.publish('events', 'routing.key', Buffer.from('message'))

// Consume
await channel.consume('my_queue', (msg) => {
  if (msg === null) return // consumer was cancelled by the broker
  console.log(msg.content.toString())
  channel.ack(msg)
})
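
The binding above uses a literal routing key, but topic exchanges also accept patterns: `*` stands for exactly one dot-separated word and `#` for zero or more words. The function below is an illustrative re-implementation of that matching rule for experimenting with patterns; it is not how the broker is implemented, and `topicMatches` is a hypothetical name.

```javascript
// Illustrative matcher for topic-exchange binding patterns.
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.')
  const k = routingKey.split('.')

  function match(pi, ki) {
    if (pi === p.length) return ki === k.length
    if (p[pi] === '#') {
      // '#' may absorb zero words, or one word and then try again
      return match(pi + 1, ki) || (ki < k.length && match(pi, ki + 1))
    }
    if (ki === k.length) return false
    if (p[pi] === '*' || p[pi] === k[ki]) return match(pi + 1, ki + 1)
    return false
  }

  return match(0, 0)
}

console.log(topicMatches('routing.*', 'routing.key'))       // true
console.log(topicMatches('routing.*', 'routing.key.extra')) // false
console.log(topicMatches('routing.#', 'routing.key.extra')) // true
```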

Conclusion

Congratulations! You’ve journeyed from RabbitMQ basics to building production-ready, scalable messaging systems.

Key Takeaways:

  1. Message Queues are Essential: Modern distributed systems need reliable asynchronous communication
  2. RabbitMQ is Powerful: Rich routing, clustering, and management capabilities
  3. System Design Matters: Choose the right pattern for your use case
  4. Reliability is Critical: Implement proper error handling, retries, and monitoring
  5. Performance Requires Thought: Connection pooling, batching, and optimization

Your Next Steps:

  1. Set up a local RabbitMQ instance using Docker
  2. Build a simple producer/consumer application
  3. Implement advanced patterns like work queues and pub/sub
  4. Add monitoring and error handling
  5. Deploy to production with proper configuration

RabbitMQ provides the foundation for building robust, scalable distributed systems. The patterns and practices covered here will help you design systems that can handle millions of messages reliably. Now go build amazing messaging architectures! 🚀


Last Updated: December 4, 2025
RabbitMQ Version: 3.12
amqplib Version: 0.10.3

This tutorial covers both basic concepts and production-ready implementations.