Distributed transaction using SAGA pattern, RabbitMQ and asp.net core
A distributed transaction is one that spans multiple databases across the network while preserving ACID properties. It is very important in microservices because of their distributed nature. To manage data consistency, we can use the SAGA design pattern. In this article, I will show you how to implement a distributed transaction using the SAGA pattern, RabbitMQ and ASP.NET Core.
Saga Pattern
A saga is a sequence of local transactions. Each local transaction updates the local database and publishes a message or event to the message broker to trigger the update of the next corresponding database. If the next database transaction fails, a series of compensating transactions is executed to undo the changes already made.
Saga is implemented in two ways -
- Choreography
- Orchestration
Choreography In choreography, participants exchange events without a centralized control.
Fig - Choreography saga (Collected)
Orchestration In orchestration, participants exchange events with a centralized control.
Fig - Orchestration saga (Collected)
Implementation of Choreography Pattern
Tools and technology used
- Visual studio 2019
- SQLite
- ASP.NET Core
- RabbitMQ
Step 1: Run docker container for RabbitMQ
- Run the following command to run rabbitmq in a container
docker run -d --hostname host-rabbit --name ecommerce-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
Step 2: Create two web api application
- Create a solution name Ecommerece.sln
- Create a web api application named Catalog.API and add it to the solution.
- Create another web api application named Ordering.API and add it to the solution.
- Create a class library name - Shared and add it to the solution.
Step 3: Create Shared model classes
- Create following two model classes in Shared->Models folder
OrderRequest.cs
namespace Shared.Models
{
    /// <summary>
    /// Message payload describing one ordered item. The ordering service
    /// serializes it to JSON and publishes it when an order item is created;
    /// the catalog service deserializes it to adjust stock.
    /// </summary>
    public class OrderRequest
    {
        /// <summary>Identifier of the order the item belongs to.</summary>
        public int OrderId { get; set; }

        /// <summary>Identifier of the catalog item (product) being ordered.</summary>
        public int CatalogId { get; set; }

        /// <summary>Number of units requested.</summary>
        public int Units { get; set; }

        /// <summary>Display name of the ordered product.</summary>
        public string Name { get; set; }
    }
}
CatalogResponse.cs
namespace Shared.Models
{
    /// <summary>
    /// Message payload the catalog service publishes after processing an
    /// <c>OrderRequest</c>, telling the ordering service whether the stock
    /// update succeeded.
    /// </summary>
    public class CatalogResponse
    {
        /// <summary>Identifier of the order that was processed.</summary>
        public int OrderId { get; set; }

        /// <summary>Identifier of the catalog item that was processed.</summary>
        public int CatalogId { get; set; }

        /// <summary>
        /// <c>true</c> when the catalog update succeeded; <c>false</c> when it
        /// failed and the order item has to be removed.
        /// </summary>
        public bool IsSuccess { get; set; }
    }
}
Organize Ordering.API
Step 4: Install nuget packages in Ordering.API Project
- Install the following nuget packages in the Ordering.API project
Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Sqlite
Microsoft.EntityFrameworkCore.Tools
Plain.RabbitMQ
Step 5: Create CatalogResponseListener class in Ordering.API project
- Create CatalogResponseListener class in Ordering.API as follows
CatalogResponseListener.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using Ordering.API.Db;
using Plain.RabbitMQ;
using Shared.Models;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Ordering.API
{
    /// <summary>
    /// Hosted service that subscribes to catalog responses and performs the
    /// compensating action of the saga: when the catalog service reports a
    /// failure, the corresponding order item is removed again.
    /// </summary>
    public class CatalogResponseListener : IHostedService
    {
        private readonly ISubscriber _subscriber;
        private readonly IServiceScopeFactory _scopeFactory;

        /// <summary>
        /// Creates the listener.
        /// </summary>
        /// <param name="subscripber">Subscriber bound to the catalog response queue.</param>
        /// <param name="scopeFactory">
        /// Factory used to create a DI scope per message, because the scoped
        /// <see cref="OrderingContext"/> cannot be injected into a hosted service directly.
        /// </param>
        public CatalogResponseListener(ISubscriber subscripber, IServiceScopeFactory scopeFactory)
        {
            _subscriber = subscripber;
            _scopeFactory = scopeFactory;
        }

        /// <summary>Registers the message callback with the subscriber.</summary>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _subscriber.Subscribe(Subscribe);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Handles one catalog response message.
        /// </summary>
        /// <param name="message">JSON-serialized <see cref="CatalogResponse"/>.</param>
        /// <param name="header">Message headers (unused).</param>
        /// <returns>
        /// Always <c>true</c>.
        /// NOTE(review): presumably tells Plain.RabbitMQ to acknowledge the message - confirm.
        /// </returns>
        private bool Subscribe(string message, IDictionary<string, object> header)
        {
            var response = JsonConvert.DeserializeObject<CatalogResponse>(message);

            // Nothing to compensate for an empty payload or a successful catalog update.
            if (response == null || response.IsSuccess)
            {
                return true;
            }

            using (var scope = _scopeFactory.CreateScope())
            {
                var orderingContext = scope.ServiceProvider.GetRequiredService<OrderingContext>();

                // If the transaction was not successful, remove the ordering item.
                var orderItem = orderingContext.OrderItems
                    .FirstOrDefault(o => o.ProductId == response.CatalogId && o.OrderId == response.OrderId);

                // The item may already be gone (e.g. duplicate delivery);
                // passing null to Remove would throw.
                if (orderItem != null)
                {
                    orderingContext.OrderItems.Remove(orderItem);
                    orderingContext.SaveChanges();
                }
            }

            return true;
        }

        /// <summary>No shutdown work is performed.</summary>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }
    }
}
Step 6: Change connection string
- Add connection string for SQLite in appsettings.json
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"ConnectionStrings": {
"DefaultConnection": "Data Source=db/ordering.db"
},
"AllowedHosts": "*"
}
Step 7: Configure Database, listener and RabbitMQ
- Configure SQLite, listener and RabbitMQ in Startup class
Startup.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.OpenApi.Models;
using Ordering.API.Db;
using Plain.RabbitMQ;
using RabbitMQ.Client;
namespace Ordering.API
{
    /// <summary>
    /// Application startup for Ordering.API: registers the database context,
    /// the RabbitMQ publisher/subscriber and the catalog response listener,
    /// and builds the HTTP request pipeline.
    /// </summary>
    public class Startup
    {
        // Name of the optional connection string that holds the RabbitMQ URI.
        private const string RabbitMqConnectionName = "RabbitMq";

        // Broker URI used when no "RabbitMq" connection string is configured.
        private const string DefaultRabbitMqUri = "amqp://guest:guest@localhost:5672";

        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "Ordering.API", Version = "v1" });
            });

            // Configure database
            services.AddDbContext<OrderingContext>(options =>
                options.UseSqlite(Configuration.GetConnectionString("DefaultConnection")));

            // Configure rabbitmq. The broker URI can be overridden through the
            // "RabbitMq" connection string so credentials need not live in source;
            // the local default is kept as a fallback.
            var rabbitMqUri = Configuration.GetConnectionString(RabbitMqConnectionName);
            if (string.IsNullOrWhiteSpace(rabbitMqUri))
            {
                rabbitMqUri = DefaultRabbitMqUri;
            }

            services.AddSingleton<IConnectionProvider>(new ConnectionProvider(rabbitMqUri));

            // GetRequiredService fails with a clear error if the provider is
            // missing, instead of handing null to the constructor.
            services.AddSingleton<IPublisher>(p => new Publisher(
                p.GetRequiredService<IConnectionProvider>(),
                "order_exchange", // exchange name
                ExchangeType.Topic));

            services.AddSingleton<ISubscriber>(s => new Subscriber(
                s.GetRequiredService<IConnectionProvider>(),
                "catalog_exchange", // exchange name
                "catalog_response_queue", // queue name
                "catalog_response_routingkey", // routing key
                ExchangeType.Topic));

            services.AddHostedService<CatalogResponseListener>();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
                app.UseSwagger();
                app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "Ordering.API v1"));
            }

            app.UseHttpsRedirection();
            app.UseRouting();
            app.UseAuthorization();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }
    }
}
Step 8: Create an OrderItem model class
- Create OrderItem model class in Models folder
OrderItem.cs
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
namespace Ordering.API.Models
{
    /// <summary>
    /// Entity representing a single ordered product line stored by Ordering.API.
    /// </summary>
    public class OrderItem
    {
        /// <summary>Primary key; generated by the database on insert.</summary>
        [Key, DatabaseGenerated(DatabaseGeneratedOption.Identity)]
        public int Id { get; set; }

        /// <summary>Identifier of the order this line belongs to.</summary>
        public int OrderId { get; set; }

        /// <summary>Identifier of the ordered product (the catalog item id).</summary>
        public int ProductId { get; set; }

        /// <summary>Name of the ordered product.</summary>
        public string ProductName { get; set; }

        /// <summary>Price of a single unit.</summary>
        public decimal UnitPrice { get; set; }

        /// <summary>Number of units ordered.</summary>
        public int Units { get; set; }
    }
}
Step 9: Create a database context class
- Create OrderingContext in Db folder
OrderingContext.cs
using Microsoft.EntityFrameworkCore;
using Ordering.API.Models;
namespace Ordering.API.Db
{
    /// <summary>
    /// Entity Framework Core context for the ordering database.
    /// </summary>
    public class OrderingContext : DbContext
    {
        /// <summary>
        /// Creates the context with the options supplied by dependency injection.
        /// </summary>
        /// <param name="options">Context options (provider, connection string).</param>
        public OrderingContext(DbContextOptions<OrderingContext> options)
            : base(options)
        {
        }

        /// <summary>Order items stored in the ordering database.</summary>
        public DbSet<OrderItem> OrderItems { get; set; }
    }
}
Step 10: Create controller class
- Create a controller class and api endpoint in controllers folder
OrderItemsController.cs
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Newtonsoft.Json;
using Ordering.API.Db;
using Ordering.API.Models;
using Plain.RabbitMQ;
using Shared.Models;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Ordering.API.Controllers
{
    /// <summary>
    /// REST endpoints for order items. Creating an item starts the saga by
    /// publishing an <see cref="OrderRequest"/> message.
    /// </summary>
    [Route("api/[controller]")]
    [ApiController]
    public class OrderItemsController : ControllerBase
    {
        private readonly OrderingContext _context;
        private readonly IPublisher _publisher;

        public OrderItemsController(OrderingContext context, IPublisher publisher)
        {
            _context = context;
            _publisher = publisher;
        }

        // GET: api/OrderItems
        /// <summary>Returns all stored order items.</summary>
        [HttpGet]
        public async Task<ActionResult<IEnumerable<OrderItem>>> GetOrderItems()
        {
            return await _context.OrderItems.ToListAsync();
        }

        // GET: api/OrderItems/5
        /// <summary>Returns the order item with the given key, or 404 when it does not exist.</summary>
        /// <param name="id">Primary key of the order item.</param>
        [HttpGet("{id}")]
        public async Task<ActionResult<OrderItem>> GetOrderItem(int id)
        {
            var orderItem = await _context.OrderItems.FindAsync(id);
            if (orderItem == null)
            {
                return NotFound();
            }

            return orderItem;
        }

        // POST: api/OrderItems
        // To protect from overposting attacks, see https://go.microsoft.com/fwlink/?linkid=2123754
        /// <summary>
        /// Stores the order item locally and publishes an order-created message
        /// so the catalog service can update its stock.
        /// </summary>
        /// <param name="orderItem">Order item to create.</param>
        [HttpPost]
        public async Task PostOrderItem(OrderItem orderItem)
        {
            // Local transaction of the saga: persist the order item first.
            _context.OrderItems.Add(orderItem);
            await _context.SaveChangesAsync();

            var payload = JsonConvert.SerializeObject(new OrderRequest
            {
                OrderId = orderItem.OrderId,
                CatalogId = orderItem.ProductId,
                Units = orderItem.Units,
                Name = orderItem.ProductName
            });

            try
            {
                _publisher.Publish(
                    payload,
                    "order_created_routingkey", // Routing key
                    null);
            }
            catch
            {
                // The message never reached the broker, so no catalog response
                // will arrive to confirm or compensate this item. Undo the local
                // insert to avoid an orphaned order, then surface the failure.
                _context.OrderItems.Remove(orderItem);
                await _context.SaveChangesAsync();
                throw;
            }
        }
    }
}
Step 11: Add migration in the application
- Run the following command in the package manager console
PM> Add-Migration initmig
PM> Update-Database -Verbose
Organize Catalog.API
Step 12: Install nuget packages in Catalog.API
- Install following nuget packages in Catalog.API project
Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.Sqlite
Microsoft.EntityFrameworkCore.Tools
Plain.RabbitMQ
Step 13: Create CatalogItem model class
- Create CatalogItem model class in Models folder
CatalogItem.cs
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
namespace Catalog.API.Models
{
    /// <summary>
    /// Entity representing a product stored by Catalog.API.
    /// </summary>
    public class CatalogItem
    {
        /// <summary>Primary key; generated by the database on insert.</summary>
        [Key, DatabaseGenerated(DatabaseGeneratedOption.Identity)]
        public int Id { get; set; }

        /// <summary>Name of the product.</summary>
        public string Name { get; set; }

        /// <summary>Free-text description of the product.</summary>
        public string Description { get; set; }

        /// <summary>Price of the product.</summary>
        public decimal Price { get; set; }

        /// <summary>Units currently in stock; reduced when an order is accepted.</summary>
        public int AvailableStock { get; set; }

        /// <summary>
        /// Maximum stock threshold for the product.
        /// NOTE(review): not read by any code in this article - confirm intended use.
        /// </summary>
        public int MaxStockThreshold { get; set; }
    }
}
Step 14: Configure connectionstring
- Configure the connection string in appsettings.json as follows
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"ConnectionStrings": {
"DefaultConnection": "Data Source=db/catalog.db"
},
"AllowedHosts": "*"
}
Step 15: Add context class in Db folder
- Add DbContext class in Db folder
CatalogContext.cs
using Catalog.API.Models;
using Microsoft.EntityFrameworkCore;
namespace Catalog.API.Db
{
    /// <summary>
    /// Entity Framework Core context for the catalog database.
    /// </summary>
    public class CatalogContext : DbContext
    {
        /// <summary>
        /// Creates the context with the options supplied by dependency injection.
        /// </summary>
        /// <param name="options">Context options (provider, connection string).</param>
        public CatalogContext(DbContextOptions<CatalogContext> options)
            : base(options)
        {
        }

        /// <summary>Catalog items stored in the catalog database.</summary>
        public DbSet<CatalogItem> CatalogItems { get; set; }
    }
}
Step 16: Add OrderCreatedListener class
- Add the OrderCreatedListener class in the Catalog.API project
OrderCreatedListener.cs
using Catalog.API.Db;
using Catalog.API.Models;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using Plain.RabbitMQ;
using Shared.Models;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Catalog.API
{
    /// <summary>
    /// Hosted service that subscribes to order-created messages, reserves stock
    /// for the ordered item and publishes a <see cref="CatalogResponse"/> with
    /// the outcome.
    /// </summary>
    public class OrderCreatedListener : IHostedService
    {
        private readonly ISubscriber _subscribe;
        private readonly IPublisher _publisher;
        private readonly IServiceScopeFactory _scopeFactory;

        /// <summary>
        /// Creates the listener.
        /// </summary>
        /// <param name="subscriber">Subscriber bound to the order-created queue.</param>
        /// <param name="publisher">Publisher used to send the catalog response.</param>
        /// <param name="scopeFactory">
        /// Factory used to create a DI scope per message so the scoped
        /// <see cref="CatalogContext"/> can be resolved.
        /// </param>
        public OrderCreatedListener(ISubscriber subscriber, IPublisher publisher, IServiceScopeFactory scopeFactory)
        {
            _subscribe = subscriber;
            _publisher = publisher;
            _scopeFactory = scopeFactory;
        }

        /// <summary>Registers the message callback with the subscriber.</summary>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _subscribe.Subscribe(Subscribe);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Handles one order-created message: tries to reserve stock and
        /// publishes exactly one response describing the outcome.
        /// </summary>
        /// <param name="message">JSON-serialized <see cref="OrderRequest"/>.</param>
        /// <param name="header">Message headers (unused).</param>
        /// <returns>
        /// Always <c>true</c>.
        /// NOTE(review): presumably tells Plain.RabbitMQ to acknowledge the message - confirm.
        /// </returns>
        private bool Subscribe(string message, IDictionary<string, object> header)
        {
            var request = JsonConvert.DeserializeObject<OrderRequest>(message);

            // An empty payload carries no order to answer.
            if (request == null)
            {
                return true;
            }

            // Publish only after the database outcome is final. A publish
            // failure can then no longer be reported as "IsSuccess = false"
            // for stock that was already decremented.
            bool reserved = TryReserveStock(request);

            _publisher.Publish(
                JsonConvert.SerializeObject(new CatalogResponse
                {
                    OrderId = request.OrderId,
                    CatalogId = request.CatalogId,
                    IsSuccess = reserved
                }),
                "catalog_response_routingkey",
                null);

            return true;
        }

        // Decrements the available stock for the requested item; returns false
        // when the item is unknown, stock is insufficient or the update fails.
        private bool TryReserveStock(OrderRequest request)
        {
            using (var scope = _scopeFactory.CreateScope())
            {
                var context = scope.ServiceProvider.GetRequiredService<CatalogContext>();
                try
                {
                    CatalogItem catalogItem = context.CatalogItems.Find(request.CatalogId);

                    // Negative units are rejected as well: subtracting them
                    // would increase the stock.
                    if (catalogItem == null || request.Units < 0 || catalogItem.AvailableStock < request.Units)
                    {
                        return false;
                    }

                    catalogItem.AvailableStock -= request.Units;
                    context.Entry(catalogItem).State = EntityState.Modified;
                    context.SaveChanges();
                    return true;
                }
                catch (Exception)
                {
                    // Any database failure is reported to the ordering service
                    // as an unsuccessful reservation so it can compensate.
                    return false;
                }
            }
        }

        /// <summary>No shutdown work is performed.</summary>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }
    }
}
Step 17: Register database, rabbitmq and listener in Startup class
Startup.cs
using Catalog.API.Db;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.OpenApi.Models;
using Plain.RabbitMQ;
using RabbitMQ.Client;
namespace Catalog.API
{
    /// <summary>
    /// Application startup for Catalog.API: registers the database context,
    /// the RabbitMQ publisher/subscriber and the order-created listener,
    /// and builds the HTTP request pipeline.
    /// </summary>
    public class Startup
    {
        // Name of the optional connection string that holds the RabbitMQ URI.
        private const string RabbitMqConnectionName = "RabbitMq";

        // Broker URI used when no "RabbitMq" connection string is configured.
        private const string DefaultRabbitMqUri = "amqp://guest:guest@localhost:5672";

        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "Catalog.API", Version = "v1" });
            });

            // Configure Sqlite
            services.AddDbContext<CatalogContext>(options =>
                options.UseSqlite(Configuration.GetConnectionString("DefaultConnection")));

            // Configure rabbitmq. The broker URI can be overridden through the
            // "RabbitMq" connection string so credentials need not live in source;
            // the local default is kept as a fallback.
            var rabbitMqUri = Configuration.GetConnectionString(RabbitMqConnectionName);
            if (string.IsNullOrWhiteSpace(rabbitMqUri))
            {
                rabbitMqUri = DefaultRabbitMqUri;
            }

            services.AddSingleton<IConnectionProvider>(new ConnectionProvider(rabbitMqUri));

            // GetRequiredService fails with a clear error if the provider is
            // missing, instead of handing null to the constructor.
            services.AddSingleton<IPublisher>(p => new Publisher(
                p.GetRequiredService<IConnectionProvider>(),
                "catalog_exchange", // exchange name
                ExchangeType.Topic));

            services.AddSingleton<ISubscriber>(s => new Subscriber(
                s.GetRequiredService<IConnectionProvider>(),
                "order_exchange", // exchange name
                "order_response_queue", // queue name
                "order_created_routingkey", // routing key
                ExchangeType.Topic));

            services.AddHostedService<OrderCreatedListener>();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
                app.UseSwagger();
                app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "Catalog.API v1"));
            }

            app.UseHttpsRedirection();
            app.UseRouting();
            app.UseAuthorization();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }
    }
}
Step 18: Add migration in the application
- Run the following command in the package manager console
PM> Add-Migration initmig
PM> Update-Database -Verbose
- Run both projects and place an order using swagger. Observe what happens when the transaction fails. Thanks for your patience.