DEV Community

Cover image for Building a Microservices Order System with Gin, RabbitMQ, and PostgreSQL
CodeFalconX
CodeFalconX

Posted on

Building a Microservices Order System with Gin, RabbitMQ, and PostgreSQL

Table of Contents

Architecture Overview

We'll build 4 independent services:

  • Order Service: Creates orders and publishes events
  • Inventory Service: Updates stock levels
  • Billing Service: Generates invoices and processes payments
  • Notification Service: Sends email/SMS alerts

Each service has its own PostgreSQL database and communicates via RabbitMQ using a fanout exchange pattern.

Project Structure

order-system/
├── order-service/
│   └── main.go
├── inventory-service/
│   └── main.go
├── billing-service/
│   └── main.go
├── notification-service/
│   └── main.go
├── shared/
│   ├── events/
│   │   └── events.go
│   └── rabbitmq/
│       └── rabbitmq.go
└── docker-compose.yml
Enter fullscreen mode Exit fullscreen mode

Step 1: Docker Compose Setup

Create docker-compose.yml:

# version: '3.8'

services:
  # RabbitMQ
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 30s
      timeout: 10s
      retries: 5

  # PostgreSQL for Order Service
  order-db:
    image: postgres:15
    container_name: order-db
    environment:
      POSTGRES_DB: order_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    ports:
      - "5436:5432"
    volumes:
      - order-data:/var/lib/postgresql/data

  # PostgreSQL for Inventory Service
  inventory-db:
    image: postgres:15
    container_name: inventory-db
    environment:
      POSTGRES_DB: inventory_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    ports:
      - "5437:5432"
    volumes:
      - inventory-data:/var/lib/postgresql/data

  # PostgreSQL for Billing Service
  billing-db:
    image: postgres:15
    container_name: billing-db
    environment:
      POSTGRES_DB: billing_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    ports:
      - "5438:5432"
    volumes:
      - billing-data:/var/lib/postgresql/data

  # PostgreSQL for Notification Service
  notification-db:
    image: postgres:15
    container_name: notification-db
    environment:
      POSTGRES_DB: notification_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    ports:
      - "5439:5432"
    volumes:
      - notification-data:/var/lib/postgresql/data

volumes:
  order-data:
  inventory-data:
  billing-data:
  notification-data:
Enter fullscreen mode Exit fullscreen mode

Step 2: Shared Code

shared/events/events.go

package events

import "time"

// EventType represents the type of event
type EventType string

const (
    OrderCreated EventType = "order.created"
    OrderUpdated EventType = "order.updated"
    OrderCancelled EventType = "order.cancelled"
)

// OrderCreatedEvent represents an order creation event
type OrderCreatedEvent struct {
    EventID     string    `json:"event_id"`
    EventType   EventType `json:"event_type"`
    Timestamp   time.Time `json:"timestamp"`
    OrderID     string    `json:"order_id"`
    CustomerID  string    `json:"customer_id"`
    Email       string    `json:"email"`
    Phone       string    `json:"phone"`
    TotalAmount float64   `json:"total_amount"`
    Items       []OrderItem `json:"items"`
}

// OrderItem represents an item in the order
type OrderItem struct {
    ProductID string  `json:"product_id"`
    Name      string  `json:"name"`
    Quantity  int     `json:"quantity"`
    Price     float64 `json:"price"`
}
Enter fullscreen mode Exit fullscreen mode

shared/rabbitmq/rabbitmq.go

package rabbitmq

import (
    "encoding/json"
    "fmt"
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
)

type RabbitMQ struct {
    conn    *amqp.Connection
    channel *amqp.Channel
}

func NewRabbitMQ(url string) (*RabbitMQ, error) {
    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
    }

    ch, err := conn.Channel()
    if err != nil {
        conn.Close()
        return nil, fmt.Errorf("failed to open channel: %w", err)
    }

    return &RabbitMQ{
        conn:    conn,
        channel: ch,
    }, nil
}

func (r *RabbitMQ) DeclareExchange(name, kind string) error {
    return r.channel.ExchangeDeclare(
        name,
        kind,
        true,  // durable
        false, // auto-deleted
        false, // internal
        false, // no-wait
        nil,   // arguments
    )
}

func (r *RabbitMQ) DeclareQueue(name string) (amqp.Queue, error) {
    return r.channel.QueueDeclare(
        name,
        true,  // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil,   // arguments
    )
}

func (r *RabbitMQ) BindQueue(queueName, exchangeName, routingKey string) error {
    return r.channel.QueueBind(
        queueName,
        routingKey,
        exchangeName,
        false, // no-wait
        nil,   // arguments
    )
}

func (r *RabbitMQ) Publish(exchange, routingKey string, body interface{}) error {
    jsonBody, err := json.Marshal(body)
    if err != nil {
        return fmt.Errorf("failed to marshal message: %w", err)
    }

    err = r.channel.Publish(
        exchange,
        routingKey,
        false, // mandatory
        false, // immediate
        amqp.Publishing{
            ContentType: "application/json",
            Body:        jsonBody,
        },
    )
    if err != nil {
        return fmt.Errorf("failed to publish message: %w", err)
    }

    log.Printf("Published message to exchange: %s, routing key: %s", exchange, routingKey)
    return nil
}

func (r *RabbitMQ) Consume(queueName string) (<-chan amqp.Delivery, error) {
    msgs, err := r.channel.Consume(
        queueName,
        "",    // consumer
        false, // auto-ack (manual ack for reliability)
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil,   // args
    )
    if err != nil {
        return nil, fmt.Errorf("failed to register consumer: %w", err)
    }

    return msgs, nil
}

func (r *RabbitMQ) Close() {
    if r.channel != nil {
        r.channel.Close()
    }
    if r.conn != nil {
        r.conn.Close()
    }
}
Enter fullscreen mode Exit fullscreen mode

Step 3: Order Service

order-service/main.go

package main

import (
    "log"
    "net/http"
    "time"

    "github.com/gin-gonic/gin"
    "github.com/google/uuid"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"

    "your-module/shared/events"
    "your-module/shared/rabbitmq"
)

type Order struct {
    ID          string    `gorm:"primaryKey" json:"id"`
    CustomerID  string    `json:"customer_id"`
    Email       string    `json:"email"`
    Phone       string    `json:"phone"`
    TotalAmount float64   `json:"total_amount"`
    Status      string    `json:"status"`
    CreatedAt   time.Time `json:"created_at"`
}

type OrderItem struct {
    ID        uint    `gorm:"primaryKey" json:"id"`
    OrderID   string  `json:"order_id"`
    ProductID string  `json:"product_id"`
    Name      string  `json:"name"`
    Quantity  int     `json:"quantity"`
    Price     float64 `json:"price"`
}

type CreateOrderRequest struct {
    CustomerID string                `json:"customer_id" binding:"required"`
    Email      string                `json:"email" binding:"required,email"`
    Phone      string                `json:"phone" binding:"required"`
    Items      []events.OrderItem    `json:"items" binding:"required,min=1"`
}

var (
    db  *gorm.DB
    mq  *rabbitmq.RabbitMQ
)

func main() {
    // Initialize database
    var err error
    dsn := "host=localhost user=postgres password=postgres dbname=order_db port=5432 sslmode=disable"
    db, err = gorm.Open(postgres.Open(dsn), &gorm.Config{})
    if err != nil {
        log.Fatal("Failed to connect to database:", err)
    }

    // Auto migrate
    db.AutoMigrate(&Order{}, &OrderItem{})

    // Initialize RabbitMQ
    mq, err = rabbitmq.NewRabbitMQ("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal("Failed to connect to RabbitMQ:", err)
    }
    defer mq.Close()

    // Declare exchange
    err = mq.DeclareExchange("order_events", "fanout")
    if err != nil {
        log.Fatal("Failed to declare exchange:", err)
    }

    // Initialize Gin
    r := gin.Default()

    // Routes
    r.POST("/orders", createOrder)
    r.GET("/orders/:id", getOrder)
    r.GET("/orders", listOrders)

    log.Println("Order Service running on :8080")
    r.Run(":8080")
}

func createOrder(c *gin.Context) {
    var req CreateOrderRequest
    if err := c.ShouldBindJSON(&req); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    // Calculate total amount
    var totalAmount float64
    for _, item := range req.Items {
        totalAmount += item.Price * float64(item.Quantity)
    }

    // Create order
    order := Order{
        ID:          uuid.New().String(),
        CustomerID:  req.CustomerID,
        Email:       req.Email,
        Phone:       req.Phone,
        TotalAmount: totalAmount,
        Status:      "pending",
        CreatedAt:   time.Now(),
    }

    if err := db.Create(&order).Error; err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create order"})
        return
    }

    // Create order items
    for _, item := range req.Items {
        orderItem := OrderItem{
            OrderID:   order.ID,
            ProductID: item.ProductID,
            Name:      item.Name,
            Quantity:  item.Quantity,
            Price:     item.Price,
        }
        db.Create(&orderItem)
    }

    // Publish event
    event := events.OrderCreatedEvent{
        EventID:     uuid.New().String(),
        EventType:   events.OrderCreated,
        Timestamp:   time.Now(),
        OrderID:     order.ID,
        CustomerID:  order.CustomerID,
        Email:       order.Email,
        Phone:       order.Phone,
        TotalAmount: order.TotalAmount,
        Items:       req.Items,
    }

    if err := mq.Publish("order_events", "", event); err != nil {
        log.Printf("Failed to publish event: %v", err)
    }

    c.JSON(http.StatusCreated, order)
}

func getOrder(c *gin.Context) {
    id := c.Param("id")
    var order Order
    if err := db.First(&order, "id = ?", id).Error; err != nil {
        c.JSON(http.StatusNotFound, gin.H{"error": "Order not found"})
        return
    }

    var items []OrderItem
    db.Where("order_id = ?", id).Find(&items)

    c.JSON(http.StatusOK, gin.H{
        "order": order,
        "items": items,
    })
}

func listOrders(c *gin.Context) {
    var orders []Order
    db.Order("created_at desc").Find(&orders)
    c.JSON(http.StatusOK, orders)
}
Enter fullscreen mode Exit fullscreen mode

Step 4: Inventory Service

inventory-service/main.go

package main

import (
    "encoding/json"
    "log"
    "net/http"

    "github.com/gin-gonic/gin"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"

    "your-module/shared/events"
    "your-module/shared/rabbitmq"
)

type Product struct {
    ID       string `gorm:"primaryKey" json:"id"`
    Name     string `json:"name"`
    Stock    int    `json:"stock"`
    Reserved int    `json:"reserved"`
}

type StockHistory struct {
    ID        uint   `gorm:"primaryKey" json:"id"`
    OrderID   string `json:"order_id"`
    ProductID string `json:"product_id"`
    Quantity  int    `json:"quantity"`
    Action    string `json:"action"`
}

var db *gorm.DB

func main() {
    // Initialize database
    var err error
    dsn := "host=localhost user=postgres password=postgres dbname=inventory_db port=5433 sslmode=disable"
    db, err = gorm.Open(postgres.Open(dsn), &gorm.Config{})
    if err != nil {
        log.Fatal("Failed to connect to database:", err)
    }

    // Auto migrate
    db.AutoMigrate(&Product{}, &StockHistory{})

    // Seed initial products
    seedProducts()

    // Initialize RabbitMQ
    mq, err := rabbitmq.NewRabbitMQ("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal("Failed to connect to RabbitMQ:", err)
    }
    defer mq.Close()

    // Declare and bind queue
    queue, err := mq.DeclareQueue("inventory_queue")
    if err != nil {
        log.Fatal("Failed to declare queue:", err)
    }

    err = mq.BindQueue(queue.Name, "order_events", "")
    if err != nil {
        log.Fatal("Failed to bind queue:", err)
    }

    // Start consuming messages
    go consumeOrderEvents(mq)

    // Initialize Gin
    r := gin.Default()

    // Routes
    r.GET("/products", listProducts)
    r.GET("/products/:id", getProduct)
    r.POST("/products", createProduct)
    r.PUT("/products/:id/stock", updateStock)

    log.Println("Inventory Service running on :8081")
    r.Run(":8081")
}

func seedProducts() {
    products := []Product{
        {ID: "prod-1", Name: "Laptop", Stock: 50, Reserved: 0},
        {ID: "prod-2", Name: "Mouse", Stock: 200, Reserved: 0},
        {ID: "prod-3", Name: "Keyboard", Stock: 150, Reserved: 0},
    }

    for _, p := range products {
        db.FirstOrCreate(&p, Product{ID: p.ID})
    }
}

func consumeOrderEvents(mq *rabbitmq.RabbitMQ) {
    msgs, err := mq.Consume("inventory_queue")
    if err != nil {
        log.Fatal("Failed to consume messages:", err)
    }

    log.Println("Waiting for order events...")

    for msg := range msgs {
        var event events.OrderCreatedEvent
        if err := json.Unmarshal(msg.Body, &event); err != nil {
            log.Printf("Error unmarshaling message: %v", err)
            msg.Nack(false, false)
            continue
        }

        log.Printf("Processing order event: %s", event.OrderID)

        // Reserve stock for each item
        tx := db.Begin()
        success := true

        for _, item := range event.Items {
            var product Product
            if err := tx.First(&product, "id = ?", item.ProductID).Error; err != nil {
                log.Printf("Product not found: %s", item.ProductID)
                success = false
                break
            }

            if product.Stock-product.Reserved < item.Quantity {
                log.Printf("Insufficient stock for product: %s", item.ProductID)
                success = false
                break
            }

            // Update reserved stock
            product.Reserved += item.Quantity
            tx.Save(&product)

            // Record history
            history := StockHistory{
                OrderID:   event.OrderID,
                ProductID: item.ProductID,
                Quantity:  item.Quantity,
                Action:    "reserved",
            }
            tx.Create(&history)
        }

        if success {
            tx.Commit()
            log.Printf("Successfully reserved stock for order: %s", event.OrderID)
            msg.Ack(false)
        } else {
            tx.Rollback()
            log.Printf("Failed to reserve stock for order: %s", event.OrderID)
            msg.Nack(false, true)
        }
    }
}

func listProducts(c *gin.Context) {
    var products []Product
    db.Find(&products)
    c.JSON(http.StatusOK, products)
}

func getProduct(c *gin.Context) {
    id := c.Param("id")
    var product Product
    if err := db.First(&product, "id = ?", id).Error; err != nil {
        c.JSON(http.StatusNotFound, gin.H{"error": "Product not found"})
        return
    }
    c.JSON(http.StatusOK, product)
}

func createProduct(c *gin.Context) {
    var product Product
    if err := c.ShouldBindJSON(&product); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    if err := db.Create(&product).Error; err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create product"})
        return
    }

    c.JSON(http.StatusCreated, product)
}

func updateStock(c *gin.Context) {
    id := c.Param("id")
    var req struct {
        Stock int `json:"stock" binding:"required"`
    }

    if err := c.ShouldBindJSON(&req); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    var product Product
    if err := db.First(&product, "id = ?", id).Error; err != nil {
        c.JSON(http.StatusNotFound, gin.H{"error": "Product not found"})
        return
    }

    product.Stock = req.Stock
    db.Save(&product)

    c.JSON(http.StatusOK, product)
}
Enter fullscreen mode Exit fullscreen mode

Step 5: Billing Service

billing-service/main.go

package main

import (
    "encoding/json"
    "log"
    "net/http"
    "time"

    "github.com/gin-gonic/gin"
    "github.com/google/uuid"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"

    "your-module/shared/events"
    "your-module/shared/rabbitmq"
)

type Invoice struct {
    ID          string    `gorm:"primaryKey" json:"id"`
    OrderID     string    `json:"order_id"`
    CustomerID  string    `json:"customer_id"`
    Amount      float64   `json:"amount"`
    Status      string    `json:"status"` // pending, paid, failed
    CreatedAt   time.Time `json:"created_at"`
    PaidAt      *time.Time `json:"paid_at,omitempty"`
}

type Payment struct {
    ID            string    `gorm:"primaryKey" json:"id"`
    InvoiceID     string    `json:"invoice_id"`
    Amount        float64   `json:"amount"`
    PaymentMethod string    `json:"payment_method"`
    Status        string    `json:"status"`
    CreatedAt     time.Time `json:"created_at"`
}

var db *gorm.DB

func main() {
    // Initialize database
    var err error
    dsn := "host=localhost user=postgres password=postgres dbname=billing_db port=5434 sslmode=disable"
    db, err = gorm.Open(postgres.Open(dsn), &gorm.Config{})
    if err != nil {
        log.Fatal("Failed to connect to database:", err)
    }

    // Auto migrate
    db.AutoMigrate(&Invoice{}, &Payment{})

    // Initialize RabbitMQ
    mq, err := rabbitmq.NewRabbitMQ("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal("Failed to connect to RabbitMQ:", err)
    }
    defer mq.Close()

    // Declare and bind queue
    queue, err := mq.DeclareQueue("billing_queue")
    if err != nil {
        log.Fatal("Failed to declare queue:", err)
    }

    err = mq.BindQueue(queue.Name, "order_events", "")
    if err != nil {
        log.Fatal("Failed to bind queue:", err)
    }

    // Start consuming messages
    go consumeOrderEvents(mq)

    // Initialize Gin
    r := gin.Default()

    // Routes
    r.GET("/invoices", listInvoices)
    r.GET("/invoices/:id", getInvoice)
    r.POST("/invoices/:id/pay", processPayment)

    log.Println("Billing Service running on :8082")
    r.Run(":8082")
}

func consumeOrderEvents(mq *rabbitmq.RabbitMQ) {
    msgs, err := mq.Consume("billing_queue")
    if err != nil {
        log.Fatal("Failed to consume messages:", err)
    }

    log.Println("Waiting for order events...")

    for msg := range msgs {
        var event events.OrderCreatedEvent
        if err := json.Unmarshal(msg.Body, &event); err != nil {
            log.Printf("Error unmarshaling message: %v", err)
            msg.Nack(false, false)
            continue
        }

        log.Printf("Processing order event: %s", event.OrderID)

        // Create invoice
        invoice := Invoice{
            ID:         uuid.New().String(),
            OrderID:    event.OrderID,
            CustomerID: event.CustomerID,
            Amount:     event.TotalAmount,
            Status:     "pending",
            CreatedAt:  time.Now(),
        }

        if err := db.Create(&invoice).Error; err != nil {
            log.Printf("Failed to create invoice: %v", err)
            msg.Nack(false, true)
            continue
        }

        log.Printf("Invoice created: %s for order: %s", invoice.ID, event.OrderID)
        msg.Ack(false)
    }
}

func listInvoices(c *gin.Context) {
    var invoices []Invoice
    db.Order("created_at desc").Find(&invoices)
    c.JSON(http.StatusOK, invoices)
}

func getInvoice(c *gin.Context) {
    id := c.Param("id")
    var invoice Invoice
    if err := db.First(&invoice, "id = ?", id).Error; err != nil {
        c.JSON(http.StatusNotFound, gin.H{"error": "Invoice not found"})
        return
    }

    var payments []Payment
    db.Where("invoice_id = ?", id).Find(&payments)

    c.JSON(http.StatusOK, gin.H{
        "invoice":  invoice,
        "payments": payments,
    })
}

func processPayment(c *gin.Context) {
    id := c.Param("id")
    var req struct {
        PaymentMethod string `json:"payment_method" binding:"required"`
    }

    if err := c.ShouldBindJSON(&req); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    var invoice Invoice
    if err := db.First(&invoice, "id = ?", id).Error; err != nil {
        c.JSON(http.StatusNotFound, gin.H{"error": "Invoice not found"})
        return
    }

    if invoice.Status == "paid" {
        c.JSON(http.StatusBadRequest, gin.H{"error": "Invoice already paid"})
        return
    }

    // Simulate payment processing
    payment := Payment{
        ID:            uuid.New().String(),
        InvoiceID:     invoice.ID,
        Amount:        invoice.Amount,
        PaymentMethod: req.PaymentMethod,
        Status:        "completed",
        CreatedAt:     time.Now(),
    }

    tx := db.Begin()
    if err := tx.Create(&payment).Error; err != nil {
        tx.Rollback()
        c.JSON(http.StatusInternalServerError, gin.H{"error": "Payment processing failed"})
        return
    }

    now := time.Now()
    invoice.Status = "paid"
    invoice.PaidAt = &now
    if err := tx.Save(&invoice).Error; err != nil {
        tx.Rollback()
        c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update invoice"})
        return
    }

    tx.Commit()

    log.Printf("Payment processed for invoice: %s", invoice.ID)
    c.JSON(http.StatusOK, gin.H{
        "invoice": invoice,
        "payment": payment,
    })
}
Enter fullscreen mode Exit fullscreen mode

Step 6: Notification Service

notification-service/main.go

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/gin-gonic/gin"
    "github.com/google/uuid"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"

    "your-module/shared/events"
    "your-module/shared/rabbitmq"
)

type Notification struct {
    ID         string    `gorm:"primaryKey" json:"id"`
    OrderID    string    `json:"order_id"`
    CustomerID string    `json:"customer_id"`
    Type       string    `json:"type"` // email, sms
    Recipient  string    `json:"recipient"`
    Subject    string    `json:"subject"`
    Body       string    `json:"body"`
    Status     string    `json:"status"` // pending, sent, failed
    CreatedAt  time.Time `json:"created_at"`
    SentAt     *time.Time `json:"sent_at,omitempty"`
}

var db *gorm.DB

func main() {
    // Initialize database
    var err error
    dsn := "host=localhost user=postgres password=postgres dbname=notification_db port=5435 sslmode=disable"
    db, err = gorm.Open(postgres.Open(dsn), &gorm.Config{})
    if err != nil {
        log.Fatal("Failed to connect to database:", err)
    }

    // Auto migrate
    db.AutoMigrate(&Notification{})

    // Initialize RabbitMQ
    mq, err := rabbitmq.NewRabbitMQ("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal("Failed to connect to RabbitMQ:", err)
    }
    defer mq.Close()

    // Declare and bind queue
    queue, err := mq.DeclareQueue("notification_queue")
    if err != nil {
        log.Fatal("Failed to declare queue:", err)
    }

    err = mq.BindQueue(queue.Name, "order_events", "")
    if err != nil {
        log.Fatal("Failed to bind queue:", err)
    }

    // Start consuming messages
    go consumeOrderEvents(mq)

    // Initialize Gin
    r := gin.Default()

    // Routes
    r.GET("/notifications", listNotifications)
    r.GET("/notifications/:id", getNotification)

    log.Println("Notification Service running on :8083")
    r.Run(":8083")
}

func consumeOrderEvents(mq *rabbitmq.RabbitMQ) {
    msgs, err := mq.Consume("notification_queue")
    if err != nil {
        log.Fatal("Failed to consume messages:", err)
    }

    log.Println("Waiting for order events...")

    for msg := range msgs {
        var event events.OrderCreatedEvent
        if err := json.Unmarshal(msg.Body, &event); err != nil {
            log.Printf("Error unmarshaling message: %v", err)
            msg.Nack(false, false)
            continue
        }

        log.Printf("Processing order event: %s", event.OrderID)

        // Send email notification
        emailNotification := Notification{
            ID:         uuid.New().String(),
            OrderID:    event.OrderID,
            CustomerID: event.CustomerID,
            Type:       "email",
            Recipient:  event.Email,
            Subject:    "Order Confirmation",
            Body:       formatEmailBody(event),
            Status:     "pending",
            CreatedAt:  time.Now(),
        }

        if err := db.Create(&emailNotification).Error; err != nil {
            log.Printf("Failed to create email notification: %v", err)
        } else {
            sendEmail(emailNotification.ID)
        }

        // Send SMS notification
        smsNotification := Notification{
            ID:         uuid.New().String(),
            OrderID:    event.OrderID,
            CustomerID: event.CustomerID,
            Type:       "sms",
            Recipient:  event.Phone,
            Subject:    "",
            Body:       formatSMSBody(event),
            Status:     "pending",
            CreatedAt:  time.Now(),
        }

        if err := db.Create(&smsNotification).Error; err != nil {
            log.Printf("Failed to create SMS notification: %v", err)
        } else {
            sendSMS(smsNotification.ID)
        }

        msg.Ack(false)
    }
}

func formatEmailBody(event events.OrderCreatedEvent) string {
    body := fmt.Sprintf(
        "Dear Customer,\n\n"+
            "Thank you for your order!\n\n"+
            "Order ID: %s\n"+
            "Total Amount: $%.2f\n\n"+
            "Items:\n",
        event.OrderID,
        event.TotalAmount,
    )

    for _, item := range event.Items {
        body += fmt.Sprintf("- %s (x%d) - $%.2f\n", item.Name, item.Quantity, item.Price)
    }

    body += "\nWe'll notify you when your order is shipped.\n\nBest regards,\nYour Store"
    return body
}

func formatSMSBody(event events.OrderCreatedEvent) string {
    return fmt.Sprintf(
        "Order confirmed! Order ID: %s. Total: $%.2f. Thank you for your purchase!",
        event.OrderID,
        event.TotalAmount,
    )
}

func sendEmail(notificationID string) {
    var notification Notification
    if err := db.First(&notification, "id = ?", notificationID).Error; err != nil {
        log.Printf("Notification not found: %v", err)
        return
    }

    // Simulate email sending
    log.Printf("Sending email to: %s", notification.Recipient)
    log.Printf("Subject: %s", notification.Subject)
    log.Printf("Body: %s", notification.Body)

    // Update status
    now := time.Now()
    notification.Status = "sent"
    notification.SentAt = &now
    db.Save(&notification)

    log.Printf("Email sent successfully: %s", notificationID)
}

func sendSMS(notificationID string) {
    var notification Notification
    if err := db.First(&notification, "id = ?", notificationID).Error; err != nil {
        log.Printf("Notification not found: %v", err)
        return
    }

    // Simulate SMS sending
    log.Printf("Sending SMS to: %s", notification.Recipient)
    log.Printf("Message: %s", notification.Body)

    // Update status
    now := time.Now()
    notification.Status = "sent"
    notification.SentAt = &now
    db.Save(&notification)

    log.Printf("SMS sent successfully: %s", notificationID)
}

func listNotifications(c *gin.Context) {
    var notifications []Notification
    db.Order("created_at desc").Find(&notifications)
    c.JSON(http.StatusOK, notifications)
}

func getNotification(c *gin.Context) {
    id := c.Param("id")
    var notification Notification
    if err := db.First(&notification, "id = ?", id).Error; err != nil {
        c.JSON(http.StatusNotFound, gin.H{"error": "Notification not found"})
        return
    }
    c.JSON(http.StatusOK, notification)
}
Enter fullscreen mode Exit fullscreen mode

Step 7: Setup Instructions

Prerequisites

  • Go 1.21+
  • Docker & Docker Compose
  • PostgreSQL client (optional)

1. Initialize Go Modules

For each service directory, run:

# Initialize module
go mod init your-module

# Install dependencies
go get -u github.com/gin-gonic/gin
go get -u gorm.io/gorm
go get -u gorm.io/driver/postgres
go get -u github.com/google/uuid
go get -u github.com/rabbitmq/amqp091-go
Enter fullscreen mode Exit fullscreen mode

2. Start Infrastructure

# Start RabbitMQ and PostgreSQL databases
docker-compose up -d

# Verify services are running
docker ps
Enter fullscreen mode Exit fullscreen mode

3. Access RabbitMQ Management UI

Open browser: http://localhost:15672

  • Username: guest
  • Password: guest

4. Run Services

Open 4 separate terminals:

# Terminal 1 - Order Service
cd order-service
go run main.go

# Terminal 2 - Inventory Service
cd inventory-service
go run main.go

# Terminal 3 - Billing Service
cd billing-service
go run main.go

# Terminal 4 - Notification Service
cd notification-service
go run main.go
Enter fullscreen mode Exit fullscreen mode

Testing the System

Create an Order

curl -X POST http://localhost:8080/orders \
  -H "Content-Type: application/json" \
  -d '{
    "customer_id": "cust-123",
    "email": "customer@example.com",
    "phone": "+1234567890",
    "items": [
      {
        "product_id": "prod-1",
        "name": "Laptop",
        "quantity": 2,
        "price": 999.99
      },
      {
        "product_id": "prod-2",
        "name": "Mouse",
        "quantity": 1,
        "price": 29.99
      }
    ]
  }'
Enter fullscreen mode Exit fullscreen mode

Check Order

curl http://localhost:8080/orders
Enter fullscreen mode Exit fullscreen mode

Check Inventory

curl http://localhost:8081/products
Enter fullscreen mode Exit fullscreen mode

Check Invoices

curl http://localhost:8082/invoices
Enter fullscreen mode Exit fullscreen mode

Check Notifications

curl http://localhost:8083/notifications
Enter fullscreen mode Exit fullscreen mode

Process Payment

# Replace {invoice_id} with actual ID
curl -X POST http://localhost:8082/invoices/{invoice_id}/pay \
  -H "Content-Type: application/json" \
  -d '{
    "payment_method": "credit_card"
  }'
Enter fullscreen mode Exit fullscreen mode

Architecture Flow

  1. Order Service receives POST request
  2. Order saved to order_db PostgreSQL database
  3. Order Created Event published to RabbitMQ order_events fanout exchange
  4. All three queues receive the event:
    • inventory_queue
    • billing_queue
    • notification_queue
  5. Each service processes independently:
    • Inventory Service: Reserves stock
    • Billing Service: Creates invoice
    • Notification Service: Sends email and SMS

Service Ports

  • Order Service: http://localhost:8080
  • Inventory Service: http://localhost:8081
  • Billing Service: http://localhost:8082
  • Notification Service: http://localhost:8083
  • RabbitMQ Management: http://localhost:15672

Database Ports

  • Order DB: localhost:5436
  • Inventory DB: localhost:5437
  • Billing DB: localhost:5438
  • Notification DB: localhost:5439

Key Features

Event-Driven Architecture: Decoupled services via RabbitMQ
Fanout Exchange: Broadcast events to multiple consumers
Database Per Service: Each service has its own PostgreSQL database
Manual Acknowledgment: Reliable message processing
RESTful APIs: Gin framework for HTTP endpoints
Transaction Management: GORM transactions for data consistency
Error Handling: Proper error handling and logging

Github: https://github.com/codefalconx/gin-rabbitmq-order-system

Reference:

Top comments (0)