How RabbitMQ is Used in This Project
Overall Flow
POST /shopping-carts/{id}/checkout
│
├── 1. Validate credit card format
├── 2. POST /credit-card-authorizer/authorize (via ALB)
│ ├── 402 → return 402, stop
│ └── 200 → continue
├── 3. basicPublish → order_queue (with Publisher Confirms)
└── 4. return 200 { order_id }
│
│ (async)
▼
Warehouse Consumer (10 threads)
└── basicConsume → process order → basicAck
Shopping Cart: Publishing a Message
File: services/shopping-cart-service/.../RabbitMQPublisher.java
public void publishOrder(int orderId, int shoppingCartId, List<CartItem> items) throws Exception {
// 1. Build the message body (JSON)
Map<String, Object> message = new HashMap<>();
message.put("orderId", orderId);
message.put("shoppingCartId", shoppingCartId);
message.put("items", items.stream()
.map(item -> Map.of("productId", item.getProductId(), "quantity", item.getQuantity()))
.toList());
byte[] body = objectMapper.writeValueAsBytes(message);
// 2. Borrow a Channel from the pool
Channel channel = channelPool.borrowObject();
try {
// 3. Publish (PERSISTENT = message survives a RabbitMQ restart)
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, body);
// 4. Wait for RabbitMQ to confirm the message was written (max 5s)
channel.waitForConfirmsOrDie(5000);
} finally {
// 5. Return Channel to the pool
channelPool.returnObject(channel);
}
}Message format (JSON):
{
"orderId": 42,
"shoppingCartId": 7,
"items": [
{ "productId": 1, "quantity": 2 }
]
}Warehouse Consumer: Consuming Messages
File: services/warehouse-consumer/.../WarehouseConsumer.java
// Start 10 threads, each independently consuming from the queue
for (int i = 0; i < 10; i++) {
pool.submit(() -> {
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicQos(10); // prefetch: receive at most 10 unacked messages at a time
DeliverCallback deliverCallback = (tag, delivery) -> {
try {
JsonNode root = objectMapper.readTree(delivery.getBody());
for (JsonNode item : root.get("items")) {
int productId = item.get("productId").asInt();
int quantity = item.get("quantity").asInt();
// Update in-memory inventory
productQuantities
.computeIfAbsent(productId, k -> new AtomicLong(0))
.addAndGet(quantity);
}
orderCount.incrementAndGet();
// Success → tell RabbitMQ this message is done, safe to delete
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
// Failure → put the message back in the queue for retry
channel.basicNack(deliveryTag, false, true); // requeue=true
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, ...);
// ↑
// autoAck=false (manual ACK)
});
}Queue Configuration
channel.queueDeclare(
"order_queue", // queue name
true, // durable — queue survives a RabbitMQ restart
false, // exclusive — multiple consumers allowed
false, // autoDelete — do not delete when no consumers
null
);| Parameter | Value | Meaning |
|---|---|---|
| durable | true | Queue persists after RabbitMQ restart |
| exclusive | false | Multiple consumers can connect |
| autoDelete | false | Queue is not deleted when empty |