How RabbitMQ is Used in This Project


Overall Flow

POST /shopping-carts/{id}/checkout
        │
        ├── 1. Validate credit card format
        ├── 2. POST /credit-card-authorizer/authorize  (via ALB)
        │         ├── 402 → return 402, stop
        │         └── 200 → continue
        ├── 3. basicPublish → order_queue  (with Publisher Confirms)
        └── 4. return 200 { order_id }
                                │
                                │ (async)
                                ▼
                    Warehouse Consumer (10 threads)
                        └── basicConsume → process order → basicAck

Shopping Cart: Publishing a Message

File: services/shopping-cart-service/.../RabbitMQPublisher.java

public void publishOrder(int orderId, int shoppingCartId, List<CartItem> items) throws Exception {
    // 1. Build the message body (JSON)
    Map<String, Object> message = new HashMap<>();
    message.put("orderId", orderId);
    message.put("shoppingCartId", shoppingCartId);
    message.put("items", items.stream()
            .map(item -> Map.of("productId", item.getProductId(), "quantity", item.getQuantity()))
            .toList());
 
    byte[] body = objectMapper.writeValueAsBytes(message);
 
    // 2. Borrow a Channel from the pool
    Channel channel = channelPool.borrowObject();
    try {
        // 3. Publish (PERSISTENT = message survives a RabbitMQ restart)
        channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, body);
 
        // 4. Wait for RabbitMQ to confirm the message was written (max 5s)
        channel.waitForConfirmsOrDie(5000);
    } finally {
        // 5. Return Channel to the pool
        channelPool.returnObject(channel);
    }
}

Message format (JSON):

{
  "orderId": 42,
  "shoppingCartId": 7,
  "items": [
    { "productId": 1, "quantity": 2 }
  ]
}

Warehouse Consumer: Consuming Messages

File: services/warehouse-consumer/.../WarehouseConsumer.java

// Start 10 threads, each independently consuming from the queue
for (int i = 0; i < 10; i++) {
    pool.submit(() -> {
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.basicQos(10);  // prefetch: receive at most 10 unacked messages at a time
 
        DeliverCallback deliverCallback = (tag, delivery) -> {
            try {
                JsonNode root = objectMapper.readTree(delivery.getBody());
                for (JsonNode item : root.get("items")) {
                    int productId = item.get("productId").asInt();
                    int quantity  = item.get("quantity").asInt();
                    // Update in-memory inventory
                    productQuantities
                        .computeIfAbsent(productId, k -> new AtomicLong(0))
                        .addAndGet(quantity);
                }
                orderCount.incrementAndGet();
 
                // Success → tell RabbitMQ this message is done, safe to delete
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
 
            } catch (Exception e) {
                // Failure → put the message back in the queue for retry
                channel.basicNack(deliveryTag, false, true);  // requeue=true
            }
        };
 
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, ...);
        //                              ↑
        //                         autoAck=false  (manual ACK)
    });
}

Queue Configuration

channel.queueDeclare(
    "order_queue",  // queue name
    true,           // durable  — queue survives a RabbitMQ restart
    false,          // exclusive — multiple consumers allowed
    false,          // autoDelete — do not delete when no consumers
    null
);
ParameterValueMeaning
durabletrueQueue persists after RabbitMQ restart
exclusivefalseMultiple consumers can connect
autoDeletefalseQueue is not deleted when empty