Distributed transaction using SAGA pattern, RabbitMQ and asp.net core

7 minute read

Distributed transaction is one that spans multiple databases across the network while preserving ACID properties. It is very important in Microservices because of its distributed nature. To manage data consistency we may use SAGA design pattern. In this article, I will show you distributed transaction using SAGA pattern, RabbitMQ and asp.net core.

Saga Pattern

A saga is a sequence of local transactions. Each local transaction updates the local database and publishes a messages or event to message broker for updating next corresponding database. If next database transaction fails, a series of transactions will occur to undo the changes.

Saga is implemented in two ways -

  • Choreography
  • Orchestration

Choreography In choreography, participants exchange events without a centralized control.

Fig - Choreography saga (Collected)

Orchestration In orchestration, participants exchange events with a centralized control.

Fig - Orchestration saga (Collected)

Implementation of Choregraphy Pattern

Tools and technology used

  • Visual studio 2019
  • SQLite
  • ASP.NET Core
  • RabbitMQ

Step 1: Run docker container for RabbitMQ

  • Run the following command to run rabbitmq in a container
docker run -d --hostname host-rabbit --name ecommerce-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

Step 2: Create two web api application

  • Create a solution name Ecommerece.sln
  • Create a web api application name - Catalog.API and add add it to the solution.
  • Create another web api application name - Order.API and add it to the solution.
  • Create a class library name - Shared and add it to the solution.

Step 3: Create Shared model classes

  • Create following two model classes in Shared->Models folder

OrderRequest.cs

namespace Shared.Models
{
    public class OrderRequest
    {
        public int OrderId { get; set; }
        public int CatalogId { get; set; }
        public int Units { get; set; }
        public string Name { get; set; }
    }
}

CatalogResponse.cs

namespace Shared.Models
{
    public class CatalogResponse
    {
        public int OrderId { get; set; }
        public int CatalogId { get; set; }
        public bool IsSuccess { get; set; }
    }
}

Organize Ordering.API

Step 4: Install nuget packages in Ordering.API Project

  • Install following nuget packages in Catalog.API Project
Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Sqlite
Microsoft.EntityFrameworkCore.Tools
Plain.RabbitMQ

Step 5: Create CatalogResponseListener class in Ordering.API project

  • Create CatalogResponseListener class in Ordering.API as follows

CatalogResponseListener.cs

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using Ordering.API.Db;
using Plain.RabbitMQ;
using Shared.Models;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Ordering.API
{
    public class CatalogResponseListener : IHostedService
    {
        private ISubscriber _subscriber;
        private readonly IServiceScopeFactory _scopeFactory;
        public CatalogResponseListener(ISubscriber subscripber, IServiceScopeFactory scopeFactory)
        {
            this._subscriber = subscripber;
            this._scopeFactory = scopeFactory;
        }
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _subscriber.Subscribe(Subscribe);
            return Task.CompletedTask;
        }

        private bool Subscribe(string message, IDictionary<string, object> header)
        {
            var response = JsonConvert.DeserializeObject<CatalogResponse>(message);

            if(!response.IsSuccess)
            {
                using (var scope = _scopeFactory.CreateScope())
                {
                    var _orderingContext = scope.ServiceProvider.GetRequiredService<OrderingContext>();
                    
                    // If transaction is not successful, Remove ordering item
                    var orderItem = _orderingContext.OrderItems.Where(o => o.ProductId == response.CatalogId && o.OrderId == response.OrderId).FirstOrDefault();
                    _orderingContext.OrderItems.Remove(orderItem);
                    _orderingContext.SaveChanges();
                }
            }
            return true;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }
    }
}

Step 6: Change connection string

  • Add connection string for SQLite in appsettings.json
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "ConnectionStrings": {
    "DefaultConnection": "Data Source=db/ordering.db"
  },

  "AllowedHosts": "*"
}

Step 7: Configure Database, listener and RabbitMQ

  • Configure SQLite, listener and RabbitMQ in Startup class

Startup.cs

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.OpenApi.Models;
using Ordering.API.Db;
using Plain.RabbitMQ;
using RabbitMQ.Client;

namespace Ordering.API
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {

            services.AddControllers();
            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "Ordering.API", Version = "v1" });
            });

            // Configure database
            services.AddDbContext<OrderingContext>(options => options.UseSqlite(Configuration.GetConnectionString("DefaultConnection")));


            //Configure rabbitmq
            services.AddSingleton<IConnectionProvider>(new ConnectionProvider("amqp://guest:guest@localhost:5672"));

            services.AddSingleton<IPublisher>(p => new Publisher(p.GetService<IConnectionProvider>(),
                "order_exchange", // exchange name
                ExchangeType.Topic));

            services.AddSingleton<ISubscriber>(s => new Subscriber(s.GetService<IConnectionProvider>(),
                "catalog_exchange", // Exchange name
                "catalog_response_queue", //queue name
                "catalog_response_routingkey", // routing key
                ExchangeType.Topic));

            services.AddHostedService<CatalogResponseListener>();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
                app.UseSwagger();
                app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "Ordering.API v1"));
            }

            app.UseHttpsRedirection();

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }
    }
}


Step 8: Create a OrderItem model class

  • Create OrderItem model class in Models folder

OrderItem.cs

using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;

namespace Ordering.API.Models
{
    public class OrderItem
    {
        [Key]
        [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
        public int Id { get; set; }
        public int OrderId { get; set; }
        public int ProductId { get; set; }
        public string ProductName { get; set; }
        public decimal UnitPrice { get; set; }
        public int Units { get; set; }
    }
}

Step 9: Create a database context class

  • Create OrderingContext in Db folder

OrderingContext.cs

using Microsoft.EntityFrameworkCore;
using Ordering.API.Models;

namespace Ordering.API.Db
{
    public class OrderingContext : DbContext
    {
        public OrderingContext(DbContextOptions<OrderingContext> options) : base(options)
        {

        }

        public DbSet<OrderItem> OrderItems { get; set; }
    }
}

Step 10: Create controller class

  • Create a controller class and api endpoint in controllers folder

OrderItemsController.cs

using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Newtonsoft.Json;
using Ordering.API.Db;
using Ordering.API.Models;
using Plain.RabbitMQ;
using Shared.Models;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Ordering.API.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class OrderItemsController : ControllerBase
    {
        private readonly OrderingContext _context;
        private readonly IPublisher _publisher;

        public OrderItemsController(OrderingContext context, IPublisher publisher)
        {
            _context = context;
            _publisher = publisher;
        }

        // GET: api/OrderItems
        [HttpGet]
        public async Task<ActionResult<IEnumerable<OrderItem>>> GetOrderItems()
        {
            return await _context.OrderItems.ToListAsync();
        }

        // GET: api/OrderItems/5
        [HttpGet("{id}")]
        public async Task<ActionResult<OrderItem>> GetOrderItem(int id)
        {
            var orderItem = await _context.OrderItems.FindAsync(id);

            if (orderItem == null)
            {
                return NotFound();
            }

            return orderItem;
        }

        
        // POST: api/OrderItems
        // To protect from overposting attacks, see https://go.microsoft.com/fwlink/?linkid=2123754
        [HttpPost]
        public async Task PostOrderItem(OrderItem orderItem)
        {
            _context.OrderItems.Add(orderItem);
            await _context.SaveChangesAsync();

            // New inserted identity value
            int id = orderItem.Id;


            _publisher.Publish(JsonConvert.SerializeObject(new OrderRequest
            {
                OrderId = orderItem.OrderId,
                CatalogId = orderItem.ProductId,
                Units = orderItem.Units,
                Name = orderItem.ProductName
            }),
            "order_created_routingkey", // Routing key
            null);
        }
    }
}

Step 11: Add migration in the application

  • Run the following command in the package manager console
PM> Add-Migration initmig
PM> Update-Database -Verbose

Organize Catalog.API

Step 12: Install nuget packages in Catalog.API

  • Install following nuget packages in Catalog.API project
Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Sqlite
Microsoft.EntityFrameworkCore.Tools
Plain.RabbitMQ

Step 13: Create CatalogItem model class

  • Create CatalogItem model class in Models folder

CatalogItem.cs

using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;

namespace Catalog.API.Models
{
    public class CatalogItem
    {
        [Key]
        [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
        public int Id { get; set; }
        public string Name { get; set; }
        public string Description { get; set; }
        public decimal Price { get; set; }
        public int AvailableStock { get; set; }
        public int MaxStockThreshold { get; set; }
    }
}

Step 14: Configure connectionstring

  • Configure connectionstring in appsettings.json is as follows
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "ConnectionStrings": {
    "DefaultConnection": "Data Source=db/catalog.db"
  },
  "AllowedHosts": "*"
}

Step 15: Add context class in Db folder

  • Add DbContext class in Db folder

CatalogContext.cs

using Catalog.API.Models;
using Microsoft.EntityFrameworkCore;

namespace Catalog.API.Db
{
    public class CatalogContext : DbContext
    {
        public CatalogContext(DbContextOptions<CatalogContext> options) : base(options)
        {

        }

        public DbSet<CatalogItem> CatalogItems { get; set; }
    }
}

Step 16: Add OrderCreatedListener class

  • OrderCreatedListener class in Catalog.API project

OrderCreatedListener.cs

using Catalog.API.Db;
using Catalog.API.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using Plain.RabbitMQ;
using Shared.Models;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Catalog.API
{
    public class OrderCreatedListener : IHostedService
    {
        private readonly ISubscriber _subscribe;
        private readonly IPublisher _publisher;
        private readonly IServiceScopeFactory _scopeFactory;
        public OrderCreatedListener(ISubscriber subscriber, IPublisher publisher, IServiceScopeFactory scopeFactory)
        {
            _subscribe = subscriber;
            _publisher = publisher;
            _scopeFactory = scopeFactory;
        }
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _subscribe.Subscribe(Subscribe);
            return Task.CompletedTask;
        }

        private bool Subscribe(string message, IDictionary<string, object> header)
        {
            var response = JsonConvert.DeserializeObject<OrderRequest>(message);

            using (var scope = _scopeFactory.CreateScope())
            {
                var _context = scope.ServiceProvider.GetRequiredService<CatalogContext>();
                try
                {
                    CatalogItem catalogItem = _context.CatalogItems.Find(response.CatalogId);

                    if (catalogItem == null || catalogItem.AvailableStock < response.Units)
                        throw new Exception();

                    catalogItem.AvailableStock = catalogItem.AvailableStock - response.Units;
                    _context.Entry(catalogItem).State = EntityState.Modified;
                    _context.SaveChanges();

                    _publisher.Publish(JsonConvert.SerializeObject(
                            new CatalogResponse { OrderId = response.OrderId, CatalogId = response.CatalogId, IsSuccess = true }
                        ), "catalog_response_routingkey", null);
                }
                catch (Exception)
                {
                    _publisher.Publish(JsonConvert.SerializeObject(
                    new CatalogResponse { OrderId = response.OrderId, CatalogId = response.CatalogId, IsSuccess = false }
                ), "catalog_response_routingkey", null);
                }
            }

            return true;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }
    }
}

Step 17: Register database, rabbitmq and listener in Startup class

Startup.cs

using Catalog.API.Db;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.OpenApi.Models;
using Plain.RabbitMQ;
using RabbitMQ.Client;

namespace Catalog.API
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {

            services.AddControllers();
            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "Catalog.API", Version = "v1" });
            });

            // Configure Sqlite
            services.AddDbContext<CatalogContext>(options => options.UseSqlite(Configuration.GetConnectionString("DefaultConnection")));

            services.AddSingleton<IConnectionProvider>(new ConnectionProvider("amqp://guest:guest@localhost:5672"));
            services.AddSingleton<IPublisher>(p => new Publisher(p.GetService<IConnectionProvider>(),
                "catalog_exchange",
                ExchangeType.Topic));

            services.AddSingleton<ISubscriber>(s => new Subscriber(s.GetService<IConnectionProvider>(),
                "order_exchange",
                "order_response_queue",
                "order_created_routingkey",
                ExchangeType.Topic
                ));

            services.AddHostedService<OrderCreatedListener>();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
                app.UseSwagger();
                app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "Catalog.API v1"));
            }

            app.UseHttpsRedirection();

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }
    }
}

Step 18: Add migration in the application

  • Run the following command in the package manager console
PM> Add-Migration initmig
PM> Update-Database -Verbose
  • Run both project and place an order using swagger. Observe what happen when transaction fails. Thanks for your patience.

Source code