Implementing Distributed Transactions in Microservices: SAGA Pattern with RabbitMQ and ASP.NET Core

25 minute read

Modern microservices architectures bring tremendous benefits in scalability and maintainability, but they also introduce a critical challenge: how do you maintain data consistency across multiple services and databases? Traditional ACID transactions work perfectly within a single database, but they break down when operations span multiple distributed services. Consider an e-commerce scenario where placing an order involves updating inventory, processing payment, and creating order records across different services—if any step fails after others succeed, you’re left with inconsistent data. The SAGA pattern solves this problem by coordinating distributed transactions through a sequence of local transactions, each with compensation logic to handle failures gracefully. In this comprehensive guide, you’ll learn how to implement the SAGA choreography pattern using RabbitMQ for event-driven messaging and ASP.NET Core for building resilient microservices that maintain data consistency even when things go wrong.

Understanding the SAGA Pattern

The SAGA pattern is a distributed transaction management approach that breaks down a complex business transaction into a sequence of smaller, local transactions. Unlike traditional ACID transactions that lock resources across multiple databases, SAGA ensures data consistency through a choreographed series of events and compensating actions.

How SAGA Works:

  1. Local Transactions: Each service performs its own local database transaction
  2. Event Publishing: After successful completion, the service publishes an event or message
  3. Chain Reaction: Other services listen for these events and execute their own transactions
  4. Compensation Logic: If any transaction fails, previously completed transactions are undone through compensating actions

Key Benefits:

  • No Distributed Locks: Avoids the complexity and performance issues of distributed locking
  • Fault Tolerance: System can recover gracefully from failures
  • Scalability: Each service manages its own data independently
  • Eventual Consistency: Data becomes consistent over time, not immediately

SAGA Implementation Approaches SAGA can be implemented using two distinct patterns, each with its own trade-offs:

Choreography Pattern Services communicate directly with each other through events, without central coordination. Each service knows what events to listen for and what events to publish next.

Fig - Choreography saga (Collected)

Characteristics:

  • Decentralized control
  • Services are loosely coupled
  • No single point of failure
  • More complex to track transaction state

Orchestration Pattern A central orchestrator service coordinates the entire transaction flow, telling each service what to do and when.

Fig - Orchestration saga (Collected)

Characteristics:

  • Centralized control and monitoring
  • Easier to track transaction progress
  • Single point of failure (the orchestrator)
  • Orchestrator can become a bottleneck

In this tutorial, we’ll implement the choreography pattern as it promotes better service autonomy and scalability in microservices architectures.

Implementation of Choreography Pattern

In this section, we’ll build a comprehensive e-commerce application demonstrating the SAGA choreography pattern with three microservices communicating through RabbitMQ to handle a complete order-to-payment flow.

Project Overview

We’ll implement a realistic e-commerce scenario with:

  • Order Service: Manages customer orders and orchestrates the overall flow
  • Catalog Service: Manages product inventory and stock reservations
  • Payment Service: Handles payment processing and transaction management

Enhanced Transaction Flow:

  1. Customer places order → Order Service creates order record
  2. Order Service publishes “OrderCreated” event → RabbitMQ
  3. Catalog Service receives event → Validates and reserves inventory
  4. Catalog Service publishes “InventoryReserved” event → RabbitMQ
  5. Payment Service receives event → Processes payment transaction
  6. Payment Service publishes “PaymentProcessed” event → RabbitMQ
  7. Order & Catalog Services receive payment result → Finalize or compensate

Compensation Scenarios:

  • Inventory Failure: Order cancelled immediately
  • Payment Failure: Inventory released, order cancelled
  • Service Timeout: Automatic compensation after timeout period

Prerequisites

  • Visual Studio 2026 or Visual Studio Code with C# extensions
  • .NET 10.0 (Latest LTS) or .NET 11.0 (Preview)
  • Docker Desktop (for RabbitMQ container)
  • Entity Framework Core 10.0+ with SQLite provider
  • SQLite database for simplicity

Step 1: Setup RabbitMQ with Docker

Start RabbitMQ in a Docker container:

docker run -d --hostname ecommerce-rabbit --name saga-rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:4-management

Verify RabbitMQ:

  • Open browser to http://localhost:15672
  • Login with username: guest, password: guest

Step 2: Create Solution Structure

Create the solution with proper project structure:

# Create solution
dotnet new sln -n EcommerceSaga

# Create projects with .NET 10 framework and traditional controllers
dotnet new webapi -n Order.API --use-controllers --use-program-main
dotnet new webapi -n Catalog.API --use-controllers --use-program-main
dotnet new webapi -n Payment.API --use-controllers --use-program-main
dotnet new classlib -n Shared

# Add projects to solution
dotnet sln add Order.API/Order.API.csproj
dotnet sln add Catalog.API/Catalog.API.csproj
dotnet sln add Payment.API/Payment.API.csproj
dotnet sln add Shared/Shared.csproj

Final structure:

EcommerceSaga/
├── EcommerceSaga.sln
├── Order.API/
├── Catalog.API/
├── Payment.API/
└── Shared/

This structure allows us to share common models and utilities in the Shared project while keeping each microservice isolated.

  • Run the following command to build the entire solution (if you have a .sln file):
    dotnet build EcommerceSaga.sln
    

Step 3: Create Shared Models

Add the following models in the Shared project:

Models/OrderCreatedEvent.cs

namespace Shared.Models
{
    public class OrderCreatedEvent
    {
        public int OrderId { get; set; }
        public int ProductId { get; set; }
        public string ProductName { get; set; }
        public int Quantity { get; set; }
        public decimal UnitPrice { get; set; }
        public decimal TotalAmount { get; set; }
        public string CustomerEmail { get; set; }
    }
}

Models/InventoryReservedEvent.cs

namespace Shared.Models
{
    public class InventoryReservedEvent
    {
        public int OrderId { get; set; }
        public int ProductId { get; set; }
        public bool IsSuccess { get; set; }
        public string Message { get; set; }
        public decimal TotalAmount { get; set; }
        public string CustomerEmail { get; set; }
        public int ReservedQuantity { get; set; }
    }
}

Models/PaymentProcessedEvent.cs

namespace Shared.Models
{
    public class PaymentProcessedEvent
    {
        public int OrderId { get; set; }
        public int ProductId { get; set; }
        public bool IsSuccess { get; set; }
        public string TransactionId { get; set; }
        public decimal Amount { get; set; }
        public string Message { get; set; }
        public string CustomerEmail { get; set; }
    }
}

Models/InventoryReleaseEvent.cs

namespace Shared.Models
{
    public class InventoryReleaseEvent
    {
        public int OrderId { get; set; }
        public int ProductId { get; set; }
        public int Quantity { get; set; }
        public string Reason { get; set; }
    }
}

Step 4: Build the Order Service

Install Required Packages

cd Order.API
dotnet add package Microsoft.EntityFrameworkCore.Sqlite
dotnet add package Microsoft.EntityFrameworkCore.Tools
dotnet add package Plain.RabbitMQ
dotnet add package Swashbuckle.AspNetCore
# Note: Using System.Text.Json (built-in) instead of Newtonsoft.Json
dotnet add reference ../Shared/Shared.csproj

Models/Order.cs

using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;

namespace Order.API.Models
{
    public class Order
    {
        [Key]
        [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
        public int Id { get; set; }
        
        public int ProductId { get; set; }
        public required string ProductName { get; set; }
        public int Quantity { get; set; }
        
        [Column(TypeName = "decimal(18,2)")]
        public decimal UnitPrice { get; set; }
        
        [Column(TypeName = "decimal(18,2)")]
        public decimal TotalAmount { get; set; }
        
        public string Status { get; set; } = "Pending"; // Pending, InventoryReserved, PaymentProcessing, Confirmed, Cancelled
        public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
        public DateTime? UpdatedAt { get; set; }
        public required string CustomerEmail { get; set; }
    }
}

Data/OrderDbContext.cs

using Microsoft.EntityFrameworkCore;
using Order.API.Models;

namespace Order.API.Data
{
    public class OrderDbContext : DbContext
    {
        public OrderDbContext(DbContextOptions<OrderDbContext> options) : base(options)
        {
        }

        public DbSet<Models.Order> Orders { get; set; }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            modelBuilder.Entity<Models.Order>(entity =>
            {
                entity.HasIndex(e => e.ProductId);
                entity.Property(e => e.Status).HasMaxLength(20);
                entity.Property(e => e.ProductName).HasMaxLength(200);
            });
        }
    }
}

Services/InventoryResponseListener.cs

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Text.Json;
using Order.API.Data;
using Plain.RabbitMQ;
using Shared.Models;

namespace Order.API.Services
{
    public class InventoryResponseListener : IHostedService
    {
        private readonly ISubscriber _subscriber;
        private readonly IServiceScopeFactory _scopeFactory;
        private readonly ILogger<InventoryResponseListener> _logger;

        public InventoryResponseListener(
            ISubscriber subscriber,
            IServiceScopeFactory scopeFactory,
            ILogger<InventoryResponseListener> logger)
        {
            _subscriber = subscriber;
            _scopeFactory = scopeFactory;
            _logger = logger;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _subscriber.Subscribe(ProcessInventoryResponse);
            _logger.LogInformation("Inventory Response Listener started");
            return Task.CompletedTask;
        }

        private bool ProcessInventoryResponse(string message, IDictionary<string, object> headers)
        {
            try
            {
                _logger.LogInformation($"Received inventory response: {message}");
                var response = JsonSerializer.Deserialize<InventoryReservedEvent>(message);

                if (response == null)
                {
                    _logger.LogError("Failed to deserialize inventory response");
                    return false;
                }

                using var scope = _scopeFactory.CreateScope();
                var context = scope.ServiceProvider.GetRequiredService<OrderDbContext>();

                var order = context.Orders.FirstOrDefault(o => o.Id == response.OrderId);
                if (order == null)
                {
                    _logger.LogWarning($"Order {response.OrderId} not found");
                    return true;
                }

                if (response.IsSuccess)
                {
                    // Move to next step - inventory reserved, waiting for payment
                    order.Status = "InventoryReserved";
                    order.UpdatedAt = DateTime.UtcNow;
                    _logger.LogInformation($"Order {order.Id} inventory reserved, awaiting payment processing");
                }
                else
                {
                    // Compensation: Cancel order due to inventory failure
                    order.Status = "Cancelled";
                    order.UpdatedAt = DateTime.UtcNow;
                    _logger.LogWarning($"Order {order.Id} cancelled due to inventory failure: {response.Message}");
                }

                context.SaveChanges();
                return true;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing inventory response");
                return false;
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Inventory Response Listener stopped");
            return Task.CompletedTask;
        }
    }
}


Services/PaymentResponseListener.cs

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Text.Json;
using Order.API.Data;
using Plain.RabbitMQ;
using Shared.Models;

namespace Order.API.Services
{
    public class PaymentResponseListener : IHostedService
    {
        private readonly ISubscriber _paymentSubscriber;
        private readonly IPublisher _publisher;
        private readonly IServiceScopeFactory _scopeFactory;
        private readonly ILogger<PaymentResponseListener> _logger;

        public PaymentResponseListener(
            ISubscriber paymentSubscriber,
            IPublisher publisher,
            IServiceScopeFactory scopeFactory,
            ILogger<PaymentResponseListener> logger)
        {
            _paymentSubscriber = paymentSubscriber;
            _publisher = publisher;
            _scopeFactory = scopeFactory;
            _logger = logger;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _paymentSubscriber.Subscribe(ProcessPaymentResponse);
            _logger.LogInformation("Payment Response Listener started");
            return Task.CompletedTask;
        }

        private bool ProcessPaymentResponse(string message, IDictionary<string, object> headers)
        {
            try
            {
                _logger.LogInformation($"Received payment response: {message}");
                var response = JsonSerializer.Deserialize<PaymentProcessedEvent>(message);

                if (response == null)
                {
                    _logger.LogError("Failed to deserialize payment response");
                    return false;
                }

                using var scope = _scopeFactory.CreateScope();
                var context = scope.ServiceProvider.GetRequiredService<OrderDbContext>();

                var order = context.Orders.FirstOrDefault(o => o.Id == response.OrderId);
                if (order == null)
                {
                    _logger.LogWarning($"Order {response.OrderId} not found");
                    return true;
                }

                if (response.IsSuccess)
                {
                    // Payment successful - confirm order
                    order.Status = "Confirmed";
                    order.UpdatedAt = DateTime.UtcNow;
                    _logger.LogInformation($"Order {order.Id} confirmed after successful payment: {response.TransactionId}");
                }
                else
                {
                    // Payment failed - compensate by cancelling order and releasing inventory
                    order.Status = "Cancelled";
                    order.UpdatedAt = DateTime.UtcNow;
                    
                    _logger.LogWarning($"Order {order.Id} cancelled due to payment failure: {response.Message}");

                    // Publish inventory release event for compensation
                    var releaseEvent = new InventoryReleaseEvent
                    {
                        OrderId = response.OrderId,
                        ProductId = response.ProductId,
                        Quantity = order.Quantity,
                        Reason = "Payment failed"
                    };

                    _publisher.Publish(
                        JsonSerializer.Serialize(releaseEvent),
                        "inventory.release",
                        null);

                    _logger.LogInformation($"Published inventory release event for order {response.OrderId}");
                }

                context.SaveChanges();
                return true;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing payment response");
                return false;
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Payment Response Listener stopped");
            return Task.CompletedTask;
        }
    }
}


Controllers/OrdersController.cs


using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;
using Order.API.Data;
using Plain.RabbitMQ;
using Shared.Models;

namespace Order.API.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class OrdersController : ControllerBase
    {
        private readonly OrderDbContext _context;
        private readonly IPublisher _publisher;
        private readonly ILogger<OrdersController> _logger;

        public OrdersController(
            OrderDbContext context, 
            IPublisher publisher, 
            ILogger<OrdersController> logger)
        {
            _context = context;
            _publisher = publisher;
            _logger = logger;
        }

        [HttpGet]
        public async Task<ActionResult<IEnumerable<Models.Order>>> GetOrders()
        {
            return await _context.Orders.OrderByDescending(o => o.CreatedAt).ToListAsync();
        }

        [HttpGet("{id}")]
        public async Task<ActionResult<Models.Order>> GetOrder(int id)
        {
            var order = await _context.Orders.FindAsync(id);
            return order == null ? NotFound() : order;
        }

        [HttpPost]
        public async Task<ActionResult<Models.Order>> CreateOrder(CreateOrderRequest request)
        {
            try
            {
                var order = new Models.Order
                {
                    ProductId = request.ProductId,
                    ProductName = request.ProductName,
                    Quantity = request.Quantity,
                    UnitPrice = request.UnitPrice,
                    TotalAmount = request.Quantity * request.UnitPrice,
                    CustomerEmail = request.CustomerEmail
                };

                _context.Orders.Add(order);
                await _context.SaveChangesAsync();

                _logger.LogInformation($"Order {order.Id} created, publishing OrderCreated event");

                // Publish order created event
                var orderCreatedEvent = new OrderCreatedEvent
                {
                    OrderId = order.Id,
                    ProductId = order.ProductId,
                    ProductName = order.ProductName,
                    Quantity = order.Quantity,
                    UnitPrice = order.UnitPrice,
                    TotalAmount = order.TotalAmount,
                    CustomerEmail = order.CustomerEmail
                };

                _publisher.Publish(
                    JsonSerializer.Serialize(orderCreatedEvent),
                    "order.created",
                    null);

                return CreatedAtAction(nameof(GetOrder), new { id = order.Id }, order);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error creating order");
                return StatusCode(500, "Internal server error");
            }
        }
    }

    public class CreateOrderRequest
    {
        public int ProductId { get; set; }
        public string ProductName { get; set; }
        public int Quantity { get; set; }
        public decimal UnitPrice { get; set; }
        public string CustomerEmail { get; set; }
    }
}

appsettings.json

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning",
      "Microsoft.EntityFrameworkCore": "Warning"
    }
  },
  "ConnectionStrings": {
    "DefaultConnection": "Data Source=orders.db"
  },
  "AllowedHosts": "*"
}

Program.cs

using Microsoft.EntityFrameworkCore;
using Order.API.Data;
using Order.API.Services;
using Plain.RabbitMQ;
using RabbitMQ.Client;

namespace Order.API
{
    public class Program
    {
        public static void Main(string[] args)
        {
            var builder = WebApplication.CreateBuilder(args);

            // Add services
            builder.Services.AddControllers();
            builder.Services.AddEndpointsApiExplorer();
            builder.Services.AddSwaggerGen();

            // Database
            builder.Services.AddDbContext<OrderDbContext>(options =>
                options.UseSqlite(builder.Configuration.GetConnectionString("DefaultConnection")));

            // RabbitMQ Configuration
            builder.Services.AddSingleton<IConnectionProvider>(
                new ConnectionProvider("amqp://guest:guest@localhost:5672"));

            // Publisher for Order events
            builder.Services.AddSingleton<IPublisher>(provider =>
                new Publisher(
                    provider.GetService<IConnectionProvider>(),
                    "order.exchange",
                    ExchangeType.Topic));

            // Subscriber for Inventory events
            builder.Services.AddSingleton<ISubscriber>(provider =>
                new Subscriber(
                    provider.GetService<IConnectionProvider>(),
                    "inventory.exchange",
                    "inventory.response.queue",
                    "inventory.reserved",
                    ExchangeType.Topic));

            // Subscriber for Payment events
            builder.Services.AddKeyedSingleton<ISubscriber>("PaymentSubscriber", (provider, key) =>
                new Subscriber(
                    provider.GetService<IConnectionProvider>(),
                    "payment.exchange",
                    "payment.response.queue",
                    "payment.processed",
                    ExchangeType.Topic));

            // Background Services
            builder.Services.AddHostedService<InventoryResponseListener>();
            builder.Services.AddHostedService<PaymentResponseListener>();

            var app = builder.Build();

            // Configure the HTTP request pipeline
            if (app.Environment.IsDevelopment())
            {
                app.UseSwagger();
                app.UseSwaggerUI();
            }

            app.UseHttpsRedirection();
            app.UseAuthorization();
            app.MapControllers();

            // Ensure database is created
            using (var scope = app.Services.CreateScope())
            {
                var context = scope.ServiceProvider.GetRequiredService<OrderDbContext>();
                context.Database.EnsureCreated();
            }

            app.Run();
        }
    }
}

  • Run the following command to build the Order.API project:
    dotnet build
    

Step 5: Build the Catalog Service

Install Required Packages

cd ../Catalog.API
dotnet add package Microsoft.EntityFrameworkCore.Sqlite
dotnet add package Microsoft.EntityFrameworkCore.Tools
dotnet add package Plain.RabbitMQ
dotnet add package Swashbuckle.AspNetCore
# Note: Using System.Text.Json (built-in with .NET 10) instead of Newtonsoft.Json
dotnet add reference ../Shared/Shared.csproj

Models/Product.cs


using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;

namespace Catalog.API.Models
{
    public class Product
    {
        [Key]
        [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
        public int Id { get; set; }
        
        [Required]
        [MaxLength(200)]
        public required string Name { get; set; }
        
        [MaxLength(1000)]
        public required string Description { get; set; }
        
        [Column(TypeName = "decimal(18,2)")]
        public decimal Price { get; set; }
        
        public int AvailableStock { get; set; }
        public int ReservedStock { get; set; }
        public int MaxStockThreshold { get; set; } = 100;
        
        public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
        public DateTime? UpdatedAt { get; set; }
        
        // Computed property for total stock
        [NotMapped]
        public int TotalStock => AvailableStock + ReservedStock;
    }
}

Data/CatalogDbContext.cs


using Microsoft.EntityFrameworkCore;
using Catalog.API.Models;

namespace Catalog.API.Data
{
    public class CatalogDbContext : DbContext
    {
        public CatalogDbContext(DbContextOptions<CatalogDbContext> options) : base(options)
        {
        }

        public DbSet<Product> Products { get; set; }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            // Seed some sample data
            modelBuilder.Entity<Product>().HasData(
                new Product 
                { 
                    Id = 1, 
                    Name = "Gaming Laptop", 
                    Description = "High-performance gaming laptop with RTX graphics", 
                    Price = 1299.99m, 
                    AvailableStock = 10, 
                    ReservedStock = 0,
                    MaxStockThreshold = 20 
                },
                new Product 
                { 
                    Id = 2, 
                    Name = "Wireless Mouse", 
                    Description = "Ergonomic wireless gaming mouse", 
                    Price = 79.99m, 
                    AvailableStock = 25, 
                    ReservedStock = 0,
                    MaxStockThreshold = 50 
                },
                new Product 
                { 
                    Id = 3, 
                    Name = "Mechanical Keyboard", 
                    Description = "RGB mechanical keyboard with Cherry MX switches", 
                    Price = 159.99m, 
                    AvailableStock = 15, 
                    ReservedStock = 0,
                    MaxStockThreshold = 30 
                }
            );

            // Add index for better performance
            modelBuilder.Entity<Product>()
                .HasIndex(p => p.Name);
        }
    }
}

Services/OrderCreatedListener.cs


using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Text.Json;
using Catalog.API.Data;
using Plain.RabbitMQ;
using Shared.Models;

namespace Catalog.API.Services
{
    public class OrderCreatedListener : IHostedService
    {
        private readonly ISubscriber _subscriber;
        private readonly IPublisher _publisher;
        private readonly IServiceScopeFactory _scopeFactory;
        private readonly ILogger<OrderCreatedListener> _logger;

        public OrderCreatedListener(
            ISubscriber subscriber,
            IPublisher publisher,
            IServiceScopeFactory scopeFactory,
            ILogger<OrderCreatedListener> logger)
        {
            _subscriber = subscriber;
            _publisher = publisher;
            _scopeFactory = scopeFactory;
            _logger = logger;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _subscriber.Subscribe(ProcessOrderCreated);
            _logger.LogInformation("Order Created Listener started");
            return Task.CompletedTask;
        }

        private bool ProcessOrderCreated(string message, IDictionary<string, object> headers)
        {
            InventoryReservedEvent? response = null;
            
            try
            {
                _logger.LogInformation($"Received order created event: {message}");
                var orderCreated = JsonSerializer.Deserialize<OrderCreatedEvent>(message);

                if (orderCreated == null)
                {
                    _logger.LogError("Failed to deserialize order created event");
                    return false;
                }

                using var scope = _scopeFactory.CreateScope();
                var context = scope.ServiceProvider.GetRequiredService<CatalogDbContext>();

                response = new InventoryReservedEvent
                {
                    OrderId = orderCreated.OrderId,
                    ProductId = orderCreated.ProductId,
                    TotalAmount = orderCreated.TotalAmount,
                    CustomerEmail = orderCreated.CustomerEmail
                };

                var product = context.Products.Find(orderCreated.ProductId);

                if (product == null)
                {
                    response.IsSuccess = false;
                    response.Message = $"Product with ID {orderCreated.ProductId} not found";
                    _logger.LogWarning($"Product {orderCreated.ProductId} not found for order {orderCreated.OrderId}");
                }
                else if (product.AvailableStock < orderCreated.Quantity)
                {
                    response.IsSuccess = false;
                    response.Message = $"Insufficient stock. Available: {product.AvailableStock}, Requested: {orderCreated.Quantity}";
                    _logger.LogWarning($"Insufficient stock for product {orderCreated.ProductId}. Available: {product.AvailableStock}, Requested: {orderCreated.Quantity}");
                }
                else
                {
                    // Reserve inventory (don't commit yet, wait for payment confirmation)
                    var originalStock = product.AvailableStock;
                    product.AvailableStock -= orderCreated.Quantity;
                    product.ReservedStock += orderCreated.Quantity;
                    product.UpdatedAt = DateTime.UtcNow;
                    
                    context.SaveChanges();

                    response.IsSuccess = true;
                    response.Message = "Inventory reserved successfully, awaiting payment";
                    response.ReservedQuantity = orderCreated.Quantity;
                    
                    _logger.LogInformation($"Inventory reserved for product {orderCreated.ProductId}. " +
                                         $"Available stock changed from {originalStock} to {product.AvailableStock}");
                }

                // Publish inventory response
                _publisher.Publish(
                    JsonSerializer.Serialize(response),
                    "inventory.reserved",
                    null);

                return true;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing order created event");
                
                // Publish failure response if we have basic info
                if (response != null)
                {
                    response.IsSuccess = false;
                    response.Message = "Internal error processing inventory reservation";
                    
                    _publisher.Publish(
                        JsonSerializer.Serialize(response),
                        "inventory.reserved",
                        null);
                }

                return false;
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Order Created Listener stopped");
            return Task.CompletedTask;
        }
    }
}

Services/InventoryReleaseListener.cs


using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Text.Json;
using Catalog.API.Data;
using Plain.RabbitMQ;
using Shared.Models;

namespace Catalog.API.Services
{
    public class InventoryReleaseListener : IHostedService
    {
        private readonly ISubscriber _subscriber;
        private readonly IServiceScopeFactory _scopeFactory;
        private readonly ILogger<InventoryReleaseListener> _logger;

        public InventoryReleaseListener(
            ISubscriber subscriber,
            IServiceScopeFactory scopeFactory,
            ILogger<InventoryReleaseListener> logger)
        {
            _subscriber = subscriber;
            _scopeFactory = scopeFactory;
            _logger = logger;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _subscriber.Subscribe(ProcessInventoryRelease);
            _logger.LogInformation("Inventory Release Listener started");
            return Task.CompletedTask;
        }

        private bool ProcessInventoryRelease(string message, IDictionary<string, object> headers)
        {
            try
            {
                _logger.LogInformation($"Received inventory release event: {message}");
                var releaseEvent = JsonSerializer.Deserialize<InventoryReleaseEvent>(message);

                if (releaseEvent == null)
                {
                    _logger.LogError("Failed to deserialize inventory release event");
                    return false;
                }

                using var scope = _scopeFactory.CreateScope();
                var context = scope.ServiceProvider.GetRequiredService<CatalogDbContext>();

                var product = context.Products.Find(releaseEvent.ProductId);
                if (product == null)
                {
                    _logger.LogWarning($"Product {releaseEvent.ProductId} not found for inventory release");
                    return true; // Consider it processed
                }

                // Release reserved inventory back to available stock
                var beforeAvailable = product.AvailableStock;
                var beforeReserved = product.ReservedStock;
                
                product.AvailableStock += releaseEvent.Quantity;
                product.ReservedStock = Math.Max(0, product.ReservedStock - releaseEvent.Quantity);
                product.UpdatedAt = DateTime.UtcNow;
                
                context.SaveChanges();

                _logger.LogInformation($"Released inventory for product {releaseEvent.ProductId} due to: {releaseEvent.Reason}. " +
                                     $"Available: {beforeAvailable}{product.AvailableStock}, " +
                                     $"Reserved: {beforeReserved}{product.ReservedStock}");

                return true;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing inventory release event");
                return false;
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Inventory Release Listener stopped");
            return Task.CompletedTask;
        }
    }
}

Services/PaymentConfirmationListener.cs


using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Text.Json;
using Catalog.API.Data;
using Plain.RabbitMQ;
using Shared.Models;

namespace Catalog.API.Services
{
    public class PaymentConfirmationListener : IHostedService
    {
        private readonly ISubscriber _subscriber;
        private readonly IServiceScopeFactory _scopeFactory;
        private readonly ILogger<PaymentConfirmationListener> _logger;

        public PaymentConfirmationListener(
            ISubscriber subscriber,
            IServiceScopeFactory scopeFactory,
            ILogger<PaymentConfirmationListener> logger)
        {
            _subscriber = subscriber;
            _scopeFactory = scopeFactory;
            _logger = logger;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _subscriber.Subscribe(ProcessPaymentConfirmation);
            _logger.LogInformation("Payment Confirmation Listener started");
            return Task.CompletedTask;
        }

        private bool ProcessPaymentConfirmation(string message, IDictionary<string, object> headers)
        {
            try
            {
                _logger.LogInformation($"Received payment confirmation: {message}");
                var paymentEvent = JsonSerializer.Deserialize<PaymentProcessedEvent>(message);

                if (paymentEvent == null)
                {
                    _logger.LogError("Failed to deserialize payment event");
                    return false;
                }

                if (paymentEvent.IsSuccess)
                {
                    // Payment successful - inventory reservation is now committed
                    // No action needed, inventory was already reserved
                    _logger.LogInformation($"Payment confirmed for order {paymentEvent.OrderId}, inventory reservation committed");
                }
                // Note: If payment fails, the InventoryReleaseListener will handle releasing the inventory

                return true;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing payment confirmation");
                return false;
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Payment Confirmation Listener stopped");
            return Task.CompletedTask;
        }
    }
}

Controllers/ProductsController.cs


using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Catalog.API.Data;
using Catalog.API.Models;

namespace Catalog.API.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class ProductsController : ControllerBase
    {
        private readonly CatalogDbContext _context;
        private readonly ILogger<ProductsController> _logger;

        public ProductsController(CatalogDbContext context, ILogger<ProductsController> logger)
        {
            _context = context;
            _logger = logger;
        }

        [HttpGet]
        public async Task<ActionResult<IEnumerable<Product>>> GetProducts()
        {
            return await _context.Products.OrderBy(p => p.Name).ToListAsync();
        }

        [HttpGet("{id}")]
        public async Task<ActionResult<Product>> GetProduct(int id)
        {
            var product = await _context.Products.FindAsync(id);
            return product == null ? NotFound() : product;
        }

        [HttpPut("{id}/stock")]
        public async Task<IActionResult> UpdateStock(int id, UpdateStockRequest request)
        {
            var product = await _context.Products.FindAsync(id);
            if (product == null)
            {
                return NotFound();
            }

            var oldAvailableStock = product.AvailableStock;
            product.AvailableStock = request.AvailableStock;
            product.UpdatedAt = DateTime.UtcNow;

            try
            {
                await _context.SaveChangesAsync();
                _logger.LogInformation($"Stock updated for product {id}: {oldAvailableStock}{request.AvailableStock}");
                return NoContent();
            }
            catch (DbUpdateConcurrencyException)
            {
                if (!ProductExists(id))
                    return NotFound();
                throw;
            }
        }

        private bool ProductExists(int id)
        {
            return _context.Products.Any(e => e.Id == id);
        }
    }

    public class UpdateStockRequest
    {
        public int AvailableStock { get; set; }
    }
}

appsettings.json

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning",
      "Microsoft.EntityFrameworkCore": "Warning"
    }
  },
  "ConnectionStrings": {
    "DefaultConnection": "Data Source=catalog.db"
  },
  "AllowedHosts": "*"
}

Program.cs


using Microsoft.EntityFrameworkCore;
using Catalog.API.Data;
using Catalog.API.Services;
using Plain.RabbitMQ;
using RabbitMQ.Client;

namespace Catalog.API
{
    public class Program
    {
        public static void Main(string[] args)
        {
            var builder = WebApplication.CreateBuilder(args);

            // Add services
            builder.Services.AddControllers();
            builder.Services.AddEndpointsApiExplorer();
            builder.Services.AddSwaggerGen();

            // Database
            builder.Services.AddDbContext<CatalogDbContext>(options =>
                options.UseSqlite(builder.Configuration.GetConnectionString("DefaultConnection")));

            // RabbitMQ Configuration
            builder.Services.AddSingleton<IConnectionProvider>(
                new ConnectionProvider("amqp://guest:guest@localhost:5672"));

            // Publisher for Inventory events
            builder.Services.AddSingleton<IPublisher>(provider =>
                new Publisher(
                    provider.GetService<IConnectionProvider>(),
                    "inventory.exchange",
                    ExchangeType.Topic));

            // Subscriber for Order events
            builder.Services.AddSingleton<ISubscriber>(provider =>
                new Subscriber(
                    provider.GetService<IConnectionProvider>(),
                    "order.exchange",
                    "order.created.queue",
                    "order.created",
                    ExchangeType.Topic));

            // Subscriber for Inventory Release events
            builder.Services.AddKeyedSingleton<ISubscriber>("ReleaseSubscriber", (provider, key) =>
                new Subscriber(
                    provider.GetService<IConnectionProvider>(),
                    "order.exchange",
                    "inventory.release.queue",
                    "inventory.release",
                    ExchangeType.Topic));

            // Subscriber for Payment events
            builder.Services.AddKeyedSingleton<ISubscriber>("PaymentSubscriber", (provider, key) =>
                new Subscriber(
                    provider.GetService<IConnectionProvider>(),
                    "payment.exchange",
                    "payment.confirmation.queue",
                    "payment.processed",
                    ExchangeType.Topic));

            // Background Services
            builder.Services.AddHostedService<OrderCreatedListener>();
            builder.Services.AddHostedService<InventoryReleaseListener>();
            builder.Services.AddHostedService<PaymentConfirmationListener>();

            var app = builder.Build();

            // Configure the HTTP request pipeline
            if (app.Environment.IsDevelopment())
            {
                app.UseSwagger();
                app.UseSwaggerUI();
            }

            app.UseHttpsRedirection();
            app.UseAuthorization();
            app.MapControllers();

            // Ensure database is created and seeded
            using (var scope = app.Services.CreateScope())
            {
                var context = scope.ServiceProvider.GetRequiredService<CatalogDbContext>();
                context.Database.EnsureCreated();
            }

            app.Run();
        }
    }
}

Step 6: Build the Payment Service

Install Required Packages

cd ../Payment.API
dotnet add package Microsoft.EntityFrameworkCore.Sqlite
dotnet add package Microsoft.EntityFrameworkCore.Tools
dotnet add package Plain.RabbitMQ
dotnet add package Swashbuckle.AspNetCore
# Note: Using System.Text.Json (built-in with .NET 10) instead of Newtonsoft.Json
dotnet add reference ../Shared/Shared.csproj

Models/PaymentTransaction.cs


using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;

namespace Payment.API.Models
{
    public class PaymentTransaction
    {
        [Key]
        [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
        public int Id { get; set; }
        
        public int OrderId { get; set; }
        public int ProductId { get; set; }
        
        [Column(TypeName = "decimal(18,2)")]
        public decimal Amount { get; set; }
        
        [Required]
        [MaxLength(200)]
        public string CustomerEmail { get; set; }
        
        [MaxLength(50)]
        public string Status { get; set; } = "Pending"; // Pending, Completed, Failed, Refunded
        
        [MaxLength(100)]
        public string TransactionId { get; set; }
        
        [MaxLength(50)]
        public string PaymentMethod { get; set; } = "CreditCard";
        
        public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
        public DateTime? ProcessedAt { get; set; }
        
        [MaxLength(500)]
        public string FailureReason { get; set; }
        
        [MaxLength(100)]
        public string ExternalTransactionId { get; set; }
    }
}

Data/PaymentDbContext.cs


using Microsoft.EntityFrameworkCore;
using Payment.API.Models;

namespace Payment.API.Data
{
    public class PaymentDbContext : DbContext
    {
        public PaymentDbContext(DbContextOptions<PaymentDbContext> options) : base(options)
        {
        }

        public DbSet<PaymentTransaction> PaymentTransactions { get; set; }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            modelBuilder.Entity<PaymentTransaction>(entity =>
            {
                entity.HasIndex(e => e.OrderId);
                entity.HasIndex(e => e.TransactionId);
                entity.HasIndex(e => e.CustomerEmail);
                entity.Property(e => e.TransactionId).IsRequired();
            });
        }
    }
}

Services/PaymentProcessor.cs

using Microsoft.Extensions.Logging;

namespace Payment.API.Services
{
    public interface IPaymentProcessor
    {
        Task<PaymentResult> ProcessPaymentAsync(decimal amount, string customerEmail);
    }

    public class PaymentProcessor : IPaymentProcessor
    {
        private readonly ILogger<PaymentProcessor> _logger;
        private readonly Random _random = new Random();

        public PaymentProcessor(ILogger<PaymentProcessor> logger)
        {
            _logger = logger;
        }

        public async Task<PaymentResult> ProcessPaymentAsync(decimal amount, string customerEmail)
        {
            _logger.LogInformation($"Processing payment of ${amount} for {customerEmail}");
            
            // Simulate payment processing time
            await Task.Delay(TimeSpan.FromSeconds(2));

            // Simulate payment success/failure (90% success rate for demo)
            var isSuccess = _random.Next(1, 101) <= 90;
            
            var result = new PaymentResult
            {
                IsSuccess = isSuccess,
                TransactionId = Guid.NewGuid().ToString("N")[..16].ToUpper(),
                Amount = amount,
                ProcessedAt = DateTime.UtcNow
            };

            if (!isSuccess)
            {
                var failureReasons = new[]
                {
                    "Insufficient funds",
                    "Card expired",
                    "Card declined",
                    "Invalid card number",
                    "Processing error"
                };
                result.FailureReason = failureReasons[_random.Next(failureReasons.Length)];
            }

            _logger.LogInformation($"Payment result: {(isSuccess ? "Success" : "Failed")} - {result.TransactionId}");
            return result;
        }
    }

    public class PaymentResult
    {
        public bool IsSuccess { get; set; }
        public string TransactionId { get; set; }
        public decimal Amount { get; set; }
        public DateTime ProcessedAt { get; set; }
        public string FailureReason { get; set; }
    }
}

Services/InventoryReservedListener.cs


using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Text.Json;
using Payment.API.Data;
using Payment.API.Models;
using Payment.API.Services;
using Plain.RabbitMQ;
using Shared.Models;

namespace Payment.API.Services
{
    public class InventoryReservedListener : IHostedService
    {
        private readonly ISubscriber _subscriber;
        private readonly IPublisher _publisher;
        private readonly IServiceScopeFactory _scopeFactory;
        private readonly ILogger<InventoryReservedListener> _logger;

        public InventoryReservedListener(
            ISubscriber subscriber,
            IPublisher publisher,
            IServiceScopeFactory scopeFactory,
            ILogger<InventoryReservedListener> logger)
        {
            _subscriber = subscriber;
            _publisher = publisher;
            _scopeFactory = scopeFactory;
            _logger = logger;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _subscriber.Subscribe(ProcessInventoryReserved);
            _logger.LogInformation("Inventory Reserved Listener started");
            return Task.CompletedTask;
        }

        private bool ProcessInventoryReserved(string message, IDictionary<string, object> headers)
        {
            PaymentProcessedEvent response = null;
            
            try
            {
                _logger.LogInformation($"Received inventory reserved event: {message}");
                var inventoryEvent = JsonSerializer.Deserialize<InventoryReservedEvent>(message);

                if (inventoryEvent == null)
                {
                    _logger.LogError("Failed to deserialize inventory reserved event");
                    return false;
                }

                response = new PaymentProcessedEvent
                {
                    OrderId = inventoryEvent.OrderId,
                    ProductId = inventoryEvent.ProductId,
                    Amount = inventoryEvent.TotalAmount,
                    CustomerEmail = inventoryEvent.CustomerEmail
                };

                if (!inventoryEvent.IsSuccess)
                {
                    // Inventory reservation failed, no payment needed
                    response.IsSuccess = false;
                    response.Message = "Payment skipped due to inventory failure";
                    _logger.LogInformation($"Skipping payment for order {inventoryEvent.OrderId} due to inventory failure");
                }
                else
                {
                    // Process payment
                    using var scope = _scopeFactory.CreateScope();
                    var context = scope.ServiceProvider.GetRequiredService<PaymentDbContext>();
                    var paymentProcessor = scope.ServiceProvider.GetRequiredService<IPaymentProcessor>();

                    // Create payment record
                    var payment = new PaymentTransaction
                    {
                        OrderId = inventoryEvent.OrderId,
                        ProductId = inventoryEvent.ProductId,
                        Amount = inventoryEvent.TotalAmount,
                        CustomerEmail = inventoryEvent.CustomerEmail,
                        Status = "Processing"
                    };

                    context.PaymentTransactions.Add(payment);
                    context.SaveChanges();

                    // Process payment
                    var paymentResult = paymentProcessor.ProcessPaymentAsync(
                        inventoryEvent.TotalAmount, 
                        inventoryEvent.CustomerEmail).Result;

                    // Update payment record
                    payment.Status = paymentResult.IsSuccess ? "Completed" : "Failed";
                    payment.TransactionId = paymentResult.TransactionId;
                    payment.ProcessedAt = paymentResult.ProcessedAt;
                    payment.FailureReason = paymentResult.FailureReason;
                    payment.ExternalTransactionId = paymentResult.TransactionId;

                    context.SaveChanges();

                    // Prepare response
                    response.IsSuccess = paymentResult.IsSuccess;
                    response.TransactionId = paymentResult.TransactionId;
                    response.Message = paymentResult.IsSuccess 
                        ? "Payment processed successfully" 
                        : $"Payment failed: {paymentResult.FailureReason}";

                    _logger.LogInformation($"Payment processing completed for order {inventoryEvent.OrderId}: " +
                                         $"{(paymentResult.IsSuccess ? "Success" : "Failed")}");
                }

                // Publish payment response
                _publisher.Publish(
                    JsonSerializer.Serialize(response),
                    "payment.processed",
                    null);

                return true;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing inventory reserved event");
                
                // Publish failure response
                if (response != null)
                {
                    response.IsSuccess = false;
                    response.Message = "Internal error processing payment";
                    
                    _publisher.Publish(
                        JsonSerializer.Serialize(response),
                        "payment.processed",
                        null);
                }

                return false;
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Inventory Reserved Listener stopped");
            return Task.CompletedTask;
        }
    }
}

Controllers/PaymentsController.cs


using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Payment.API.Data;
using Payment.API.Models;

namespace Payment.API.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class PaymentsController : ControllerBase
    {
        private readonly PaymentDbContext _context;
        private readonly ILogger<PaymentsController> _logger;

        public PaymentsController(PaymentDbContext context, ILogger<PaymentsController> logger)
        {
            _context = context;
            _logger = logger;
        }

        [HttpGet]
        public async Task<ActionResult<IEnumerable<PaymentTransaction>>> GetPayments()
        {
            return await _context.PaymentTransactions
                .OrderByDescending(p => p.CreatedAt)
                .ToListAsync();
        }

        [HttpGet("{id}")]
        public async Task<ActionResult<PaymentTransaction>> GetPayment(int id)
        {
            var payment = await _context.PaymentTransactions.FindAsync(id);
            return payment == null ? NotFound() : payment;
        }

        [HttpGet("order/{orderId}")]
        public async Task<ActionResult<PaymentTransaction>> GetPaymentByOrder(int orderId)
        {
            var payment = await _context.PaymentTransactions
                .FirstOrDefaultAsync(p => p.OrderId == orderId);
            return payment == null ? NotFound() : payment;
        }

        [HttpGet("transaction/{transactionId}")]
        public async Task<ActionResult<PaymentTransaction>> GetPaymentByTransaction(string transactionId)
        {
            var payment = await _context.PaymentTransactions
                .FirstOrDefaultAsync(p => p.TransactionId == transactionId);
            return payment == null ? NotFound() : payment;
        }
    }
}

appsettings.json

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning",
      "Microsoft.EntityFrameworkCore": "Warning"
    }
  },
  "ConnectionStrings": {
    "DefaultConnection": "Data Source=payments.db"
  },
  "PaymentSettings": {
    "SuccessRate": 90,
    "ProcessingDelayMs": 2000
  },
  "AllowedHosts": "*"
}

Program.cs


using Microsoft.EntityFrameworkCore;
using Payment.API.Data;
using Payment.API.Services;
using Plain.RabbitMQ;
using RabbitMQ.Client;

namespace Payment.API
{
    public class Program
    {
        public static void Main(string[] args)
        {
            var builder = WebApplication.CreateBuilder(args);

            // Add services
            builder.Services.AddControllers();
            builder.Services.AddEndpointsApiExplorer();
            builder.Services.AddSwaggerGen();

            // Database
            builder.Services.AddDbContext<PaymentDbContext>(options =>
                options.UseSqlite(builder.Configuration.GetConnectionString("DefaultConnection")));

            // Payment Service
            builder.Services.AddScoped<IPaymentProcessor, PaymentProcessor>();

            // RabbitMQ Configuration
            builder.Services.AddSingleton<IConnectionProvider>(
                new ConnectionProvider("amqp://guest:guest@localhost:5672"));

            // Publisher for Payment events
            builder.Services.AddSingleton<IPublisher>(provider =>
                new Publisher(
                    provider.GetService<IConnectionProvider>(),
                    "payment.exchange",
                    ExchangeType.Topic));

            // Subscriber for Inventory events
            builder.Services.AddSingleton<ISubscriber>(provider =>
                new Subscriber(
                    provider.GetService<IConnectionProvider>(),
                    "inventory.exchange",
                    "inventory.reserved.queue",
                    "inventory.reserved",
                    ExchangeType.Topic));

            // Background Services
            builder.Services.AddHostedService<InventoryReservedListener>();

            var app = builder.Build();

            // Configure the HTTP request pipeline
            if (app.Environment.IsDevelopment())
            {
                app.UseSwagger();
                app.UseSwaggerUI();
            }

            app.UseHttpsRedirection();
            app.UseAuthorization();
            app.MapControllers();

            // Ensure database is created
            using (var scope = app.Services.CreateScope())
            {
                var context = scope.ServiceProvider.GetRequiredService<PaymentDbContext>();
                context.Database.EnsureCreated();
            }

            app.Run();
        }
    }
}
  • Run dotnet build to ensure everything compiles successfully.

Step 7: Configure Multiple Startup Projects

  1. In Visual Studio, right-click solution → “Set StartUp Projects”
  2. Select “Multiple startup projects”
  3. Set Order.API, Catalog.API, and Payment.API to “Start”
  4. Configure different ports in launchSettings.json:
    • Order.API: https://localhost:7001
    • Catalog.API: https://localhost:7002
    • Payment.API: https://localhost:7003

Step 8: Testing the Enhanced SAGA Implementation

Test Scenario 1: Complete Success Flow (Happy Path)

  1. Check Products: Open Catalog.API Swagger at https://localhost:7002/swagger
    GET /api/products
    
  2. Create Order: Open Order.API Swagger at https://localhost:7001/swagger
    POST /api/orders
    {
      "productId": 1,
      "productName": "Gaming Laptop",
      "quantity": 2,
      "unitPrice": 1299.99,
      "customerEmail": "customer@example.com"
    }
    
  3. Expected Flow:
    • Order status: Pending → InventoryReserved → PaymentProcessing → Confirmed
    • Inventory: Available stock decreases, reserved stock increases
    • Payment: Transaction created and processed successfully
  4. Verify Results:
    • Order status should be “Confirmed”
    • Product inventory properly updated
    • Payment transaction completed in Payment.API

Test Scenario 2: Inventory Failure (Immediate Compensation)

  1. Create Large Order:
    POST /api/orders
    {
      "productId": 1,
      "productName": "Gaming Laptop",
      "quantity": 50,
      "unitPrice": 1299.99,
      "customerEmail": "customer@example.com"
    }
    
  2. Expected Flow:
    • Order status: Pending → Cancelled
    • No inventory changes
    • No payment attempted

Test Scenario 3: Payment Failure (Multi-step Compensation)

  1. Create Valid Order (payment service has ~10% failure rate for demo):
    POST /api/orders
    {
      "productId": 1,
      "productName": "Gaming Laptop",
      "quantity": 1,
      "unitPrice": 1299.99,
      "customerEmail": "customer@example.com"
    }
    
  2. If Payment Fails, Observe:
    • Order status: Pending → InventoryReserved → Cancelled
    • Reserved inventory released back to available stock
    • Payment transaction marked as failed

Test Scenario 4: Non-existent Product

  1. Create Invalid Order:
    POST /api/orders
    {
      "productId": 999,
      "productName": "Non-existent Product",
      "quantity": 1,
      "unitPrice": 100.00,
      "customerEmail": "customer@example.com"
    }
    
  2. Expected Results:
    • Order cancelled immediately
    • Error message: “Product with ID 999 not found”

Monitor the Complete Flow

Using RabbitMQ Management UI:

  1. Open http://localhost:15672
  2. Navigate to Exchanges:
    • order.exchange - Order events
    • inventory.exchange - Inventory events
    • payment.exchange - Payment events
  3. Check Queues:
    • order.created.queue
    • inventory.response.queue
    • payment.response.queue
    • inventory.release.queue

Service Logs Monitoring: Watch the console logs of all three services to see:

  • Event publishing and receiving
  • Business logic processing
  • Compensation actions
  • Error handling

APIs for Verification:

  • Orders: GET https://localhost:7001/api/orders
  • Products: GET https://localhost:7002/api/products
  • Payments: GET https://localhost:7003/api/payments

Step 9: Understanding the Enhanced SAGA Flow

Complete Success Flow:

1. Customer creates order via Order.API
2. Order.API saves order with "Pending" status
3. Order.API publishes OrderCreatedEvent to RabbitMQ
4. Catalog.API receives OrderCreatedEvent
5. Catalog.API reserves inventory and publishes InventoryReservedEvent
6. Order.API updates order status to "InventoryReserved"
7. Payment.API receives InventoryReservedEvent
8. Payment.API processes payment and publishes PaymentProcessedEvent
9. Order.API receives PaymentProcessedEvent (success) and updates to "Confirmed"
10. Catalog.API receives PaymentProcessedEvent (inventory reservation committed)

Compensation Flow (Inventory Failure):

1. Customer creates order via Order.API
2. Order.API saves order with "Pending" status
3. Order.API publishes OrderCreatedEvent to RabbitMQ
4. Catalog.API receives OrderCreatedEvent
5. Catalog.API detects insufficient stock
6. Catalog.API publishes InventoryReservedEvent (success=false)
7. Order.API receives InventoryReservedEvent and updates to "Cancelled"

Compensation Flow (Payment Failure):

1-6. Same as success flow through inventory reservation
7. Payment.API processes payment and fails
8. Payment.API publishes PaymentProcessedEvent (success=false)
9. Order.API receives PaymentProcessedEvent and updates to "Cancelled"
10. Order.API publishes InventoryReleaseEvent for compensation
11. Catalog.API receives InventoryReleaseEvent and releases reserved inventory

Enhanced Business Logic Features:

Order Status Tracking:

  • Pending - Order created, awaiting inventory check
  • InventoryReserved - Inventory allocated, awaiting payment
  • PaymentProcessing - Payment being processed
  • Confirmed - All steps successful
  • Cancelled - Failed at any step

Inventory Management:

  • AvailableStock - Immediately available for new orders
  • ReservedStock - Allocated to pending orders awaiting payment
  • Automatic release of reservations when payments fail

Payment Processing:

  • Realistic payment simulation with configurable failure rates
  • Transaction tracking with external transaction IDs
  • Detailed failure reason tracking

Production Considerations

Error Handling and Resilience

  • Retry Logic: Implement exponential backoff for failed messages
  • Dead Letter Queues: Handle messages that fail repeatedly
  • Idempotency: Ensure message processing is idempotent
  • Timeouts: Implement timeouts for long-running processes

Monitoring and Observability

  • Correlation IDs: Track requests across services
  • Structured Logging: Use structured logging for better searchability
  • Health Checks: Implement health checks for all services
  • Metrics: Monitor message processing rates and errors

Data Consistency

  • Event Sourcing: Consider event sourcing for audit trails
  • Outbox Pattern: Ensure reliable message publishing
  • Saga State Management: Track saga state for complex workflows

Hopefully, this enhanced implementation provides a robust foundation for distributed transactions using the SAGA pattern with RabbitMQ and ASP.NET Core. If you have any questions or need further assistance, feel free to ask in the comments! If you found this guide helpful, please share it with your network. Happy coding!


Complete Source Code on GitHub

Comments