Application of Event Sourcing using ASP.NET Core, DDD, EF, EventStoreDB and SQL Server
Introduction
Event sourcing is a useful pattern for designing microservice-based applications, and it fits naturally with event-driven communication between services. In this article I will discuss event sourcing and apply it using ASP.NET Core, DDD, and EventStoreDB.
Domain Driven Design (DDD)
Domain-driven design (DDD) is a major software design approach, focusing on modeling software to match a domain according to input from that domain’s experts. Under domain-driven design, the structure and language of software code (class names, class methods, class variables) should match the business domain. (Wikipedia)
Event Sourcing
Event sourcing is a technique that stores every change to an object as an event, so any earlier version of the object can be reconstructed by replaying its events. The pattern is often used in microservice-based applications because it lets us track every change an object goes through during its lifecycle.
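To make the idea concrete, here is a minimal sketch of rebuilding state by replaying events. The event types (StockAdded, StockRemoved) and the StockProjection helper are hypothetical names chosen for this illustration only; they are not part of the project built later in this article.

```csharp
using System;
using System.Collections.Generic;

// The full history gives the current state; a prefix gives an earlier version.
var history = new List<IStockEvent>
{
    new StockAdded(10),
    new StockRemoved(3),
    new StockAdded(5),
};

Console.WriteLine(StockProjection.Replay(history));                // 12
Console.WriteLine(StockProjection.Replay(history.GetRange(0, 2))); // 7

// Hypothetical event types, used only for this illustration.
public interface IStockEvent { }
public record StockAdded(int Quantity) : IStockEvent;
public record StockRemoved(int Quantity) : IStockEvent;

public static class StockProjection
{
    // Folds the events, oldest first, into a single stock level.
    public static int Replay(IEnumerable<IStockEvent> events)
    {
        var stock = 0;
        foreach (var @event in events)
        {
            stock = @event switch
            {
                StockAdded added => stock + added.Quantity,
                StockRemoved removed => stock - removed.Quantity,
                _ => stock // unknown events leave the state unchanged
            };
        }
        return stock;
    }
}
```

Nothing is ever overwritten here: the "current" value is just the result of applying every event in order, which is why any earlier version can be recovered by stopping the replay sooner.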
EventStoreDB
EventStoreDB is a database built specifically for event sourcing. It is a NoSQL database that stores events in append-only streams: new events are added to the end of a stream, and stored events are never updated in place.
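The append-only behaviour can be pictured with a small in-memory stand-in. This is only a sketch of the idea, not the EventStoreDB API: the AppendOnlyStream class and its members are hypothetical.

```csharp
using System;
using System.Collections.Generic;

var stream = new AppendOnlyStream();
stream.Append("CatalogItemCreated");
stream.Append("CatalogItemUpdated");
stream.Append("CatalogItemDeleted"); // a delete is recorded as one more event

Console.WriteLine(stream.Count);                              // 3
Console.WriteLine(string.Join(", ", stream.ReadForwards()));

// Hypothetical in-memory stream: it exposes Append and Read, but no update or remove.
public sealed class AppendOnlyStream
{
    private readonly List<string> _events = new();

    public int Count => _events.Count;

    // Adds the event at the end and returns its zero-based position in the stream.
    public int Append(string eventType)
    {
        _events.Add(eventType);
        return _events.Count - 1;
    }

    // Returns the events in the order they were appended.
    public IReadOnlyList<string> ReadForwards() => _events.AsReadOnly();
}
```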
Implementation
Let’s implement Event Sourcing using DDD and EventStoreDB.
Tools and Technologies Used
- Visual Studio 2022
- .NET 6.0
- ASP.NET Core Web API
- Visual C#
- DDD
- EventStoreDB
- SQL Server
Step 1: Install EventStoreDB
- You can install EventStoreDB by following the EventStoreDB documentation: https://developers.eventstore.com/server/v20.10/installation.html#quick-start
- Or, you can run the EventStoreDB Docker image as below.
docker run --name esdb-node -it -p 2113:2113 -p 1113:1113 eventstore/eventstore:latest --insecure --run-projections=All --enable-external-tcp --enable-atom-pub-over-http
- Here I used the Docker image for EventStoreDB. After running the above command, open the EventStoreDB dashboard using the following link.
http://localhost:2113/web/index.html#/dashboard
Step 2: Create solution and projects
- Create a solution named EventSourcing.
- Add a new Web API project named Catalog.API.
- Add three class library projects named Application, Domain, and Infrastructure.
Step 3: Install nuget packages
- Install the following NuGet packages in Catalog.API
PM> Install-Package Microsoft.EntityFrameworkCore.Tools
- Install the following NuGet packages in Application
PM> Install-Package MediatR
PM> Install-Package MediatR.Extensions.Microsoft.DependencyInjection
PM> Install-Package Microsoft.Extensions.DependencyInjection.Abstractions
PM> Install-Package Newtonsoft.Json
- Install the following NuGet packages in Infrastructure
PM> Install-Package EventStore.Client.Grpc.Streams
PM> Install-Package Microsoft.EntityFrameworkCore
PM> Install-Package Microsoft.Extensions.Configuration.Abstractions
PM> Install-Package Microsoft.Extensions.DependencyInjection.Abstractions
PM> Install-Package Newtonsoft.Json
Step 4: Organize Domain
- Create IBaseEntity interface for Base entity in Entities->Common folder.
IBaseEntity.cs
namespace Domain.Entities.Common
{
// Ref: covariance in C# (the 'out' modifier on TKey)
/// <summary>
/// Interface for Base Entity
/// </summary>
/// <typeparam name="TKey"></typeparam>
public interface IBaseEntity<out TKey>
{
TKey Id { get; }
}
}
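The out modifier on TKey makes the interface covariant. The sketch below shows what that allows, using a hypothetical string-keyed entity (SkuEntity) and a stand-alone copy of the interface so that it compiles on its own.

```csharp
using System;

// A string-keyed entity can be viewed through an object-keyed interface
// because TKey is declared covariant ('out').
IBaseEntity<string> byString = new SkuEntity("SKU-1");
IBaseEntity<object> byObject = byString; // allowed by the 'out' modifier
Console.WriteLine(byObject.Id);          // SKU-1

// Stand-alone copy of the interface from this step.
public interface IBaseEntity<out TKey>
{
    TKey Id { get; }
}

// Hypothetical entity with a string key, used only for this illustration.
public sealed class SkuEntity : IBaseEntity<string>
{
    public SkuEntity(string id) => Id = id;
    public string Id { get; }
}
```

Note that C# variance applies to reference types only. The aggregates in this article use Guid keys, and an IBaseEntity<Guid> cannot be assigned to an IBaseEntity<object>, so the modifier matters mainly for reference-type keys.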
- Create IDomainEvent interface for domain event in Entities->Common folder
IDomainEvent.cs
namespace Domain.Entities.Common
{
/// <summary>
/// Interface for Domain Event
/// </summary>
/// <typeparam name="TKey"></typeparam>
public interface IDomainEvent<out TKey>
{
public long AggregateVersion { get; }
TKey AggregateId { get; }
DateTime TimeStamp { get; }
}
}
- Create IAggregateRoot interface for the aggregate root in Entities->Common folder. IAggregateRoot extends IBaseEntity and adds the aggregate version and its collection of domain events.
IAggregateRoot.cs
namespace Domain.Entities.Common
{
/// <summary>
/// Interface for AggregateRoot
/// IAggregateRoot extends IBaseEntity with a version and a collection of domain events
/// </summary>
/// <typeparam name="TKey"></typeparam>
public interface IAggregateRoot<out TKey> : IBaseEntity<TKey>
{
long Version { get; }
IReadOnlyCollection<IDomainEvent<TKey>> Events { get; }
void ClearEvents();
}
}
- Create BaseEntity class as the base of the aggregate root in Entities->Common folder. BaseEntity’s properties are shared by all entity classes.
BaseEntity.cs
namespace Domain.Entities.Common
{
/// <summary>
/// Base class for BaseAggregateRoot class
/// Shared for all entities
/// </summary>
/// <typeparam name="TKey"></typeparam>
public abstract class BaseEntity<TKey> : IBaseEntity<TKey>
{
protected BaseEntity() { }
protected BaseEntity(TKey id) => Id = id;
//Implementation of interface
public TKey Id { get; protected set; }
}
}
- Create BaseDomainEvent class as the base for all domain event classes in Entities->Common folder.
BaseDomainEvent.cs
namespace Domain.Entities.Common
{
/// <summary>
/// Base domain event for all domain event
/// </summary>
/// <typeparam name="TA"></typeparam>
/// <typeparam name="TKey"></typeparam>
public class BaseDomainEvent<TA, TKey> : IDomainEvent<TKey> where TA : IAggregateRoot<TKey>
{
protected BaseDomainEvent() { }
public BaseDomainEvent(TA aggregateRoot)
{
if(aggregateRoot is null)
{
throw new ArgumentNullException(nameof(aggregateRoot));
}
AggregateId = aggregateRoot.Id;
AggregateVersion = aggregateRoot.Version;
TimeStamp = DateTime.UtcNow; // UTC keeps stored timestamps unambiguous across time zones
}
//Implementation of IDomainEvent
public long AggregateVersion { get; private set; }
//Implementation of IDomainEvent
public TKey AggregateId { get; private set; }
//Implementation of IDomainEvent
public DateTime TimeStamp { get; private set; }
}
}
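- Create BaseAggregateRoot class as the base for all aggregate roots in Entities->Common folder. Here the _events queue holds the events raised by the aggregate root that have not been persisted yet. The AddEvent method enqueues a new event, applies it to the aggregate, and increments the version.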
BaseAggregateRoot.cs
using System.Collections.Immutable;
using System.Reflection;
namespace Domain.Entities.Common
{
public abstract class BaseAggregateRoot<TA, TKey> : BaseEntity<TKey>, IAggregateRoot<TKey> where TA : IAggregateRoot<TKey>
{
// Queuing all events
private readonly Queue<IDomainEvent<TKey>> _events = new Queue<IDomainEvent<TKey>>();
protected BaseAggregateRoot() { }
protected BaseAggregateRoot(TKey id) : base(id)
{
}
protected void AddEvent(IDomainEvent<TKey> @event)
{
if(@event == null)
{
throw new ArgumentNullException(nameof(@event));
}
_events.Enqueue(@event);
Apply(@event);
Version++;
}
/// <summary>
/// This method is implemented in the derived class
/// Apply this method to implement different events
/// </summary>
/// <param name="event"></param>
protected abstract void Apply(IDomainEvent<TKey> @event);
// Implementation of IAggregateRoot
// Aggregate version
public long Version{ get; private set; }
// Implementation of IAggregateRoot
public IReadOnlyCollection<IDomainEvent<TKey>> Events => _events.ToImmutableArray();
// Implementation of IAggregateRoot
public void ClearEvents()
{
_events.Clear();
}
#region Factory
private static readonly ConstructorInfo CTor;
static BaseAggregateRoot()
{
var aggregateType = typeof(TA);
CTor = aggregateType.GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public,
null, new Type[0], new ParameterModifier[0]);
if (null == CTor)
throw new InvalidOperationException($"Unable to find required parameterless constructor for Aggregate of type '{aggregateType.Name}'");
}
/// <summary>
/// Create Base Aggregate root when Rehydrate all Events
/// </summary>
/// <param name="events"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static TA Create(IEnumerable<IDomainEvent<TKey>> events)
{
if (null == events || !events.Any())
throw new ArgumentNullException(nameof(events));
var result = (TA)CTor.Invoke(new object[0]);
// TA derives from BaseAggregateRoot<TA, TKey>, so cast to reach the protected AddEvent method
var baseAggregate = result as BaseAggregateRoot<TA, TKey>;
if (baseAggregate != null)
foreach (var @event in events)
baseAggregate.AddEvent(@event);
result.ClearEvents();
return result;
}
#endregion
}
}
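The lifecycle of the pending-event queue can be easier to follow in a reduced form. The sketch below is a simplified, hypothetical aggregate (Counter with a single Incremented event), not the BaseAggregateRoot class itself; it only mirrors the same three ideas: AddEvent enqueues, applies, and bumps the version; the queue is cleared once events are persisted; and rehydration replays history and leaves nothing pending.

```csharp
using System;
using System.Collections.Generic;

var counter = new Counter();
counter.Increment();
counter.Increment();
Console.WriteLine($"{counter.Value} {counter.Version} {counter.PendingCount}"); // 2 2 2

// Once the pending events are handed over for persistence, the queue is emptied.
var pending = counter.DequeueAll();
Console.WriteLine($"{pending.Count} {counter.PendingCount}"); // 2 0

// Replaying the stored events rebuilds the same state with nothing pending.
var restored = Counter.FromHistory(pending);
Console.WriteLine($"{restored.Value} {restored.Version} {restored.PendingCount}"); // 2 2 0

// Hypothetical event, used only for this illustration.
public record Incremented(long AggregateVersion);

public sealed class Counter
{
    private readonly Queue<Incremented> _events = new();

    public int Value { get; private set; }
    public long Version { get; private set; }
    public int PendingCount => _events.Count;

    public void Increment() => AddEvent(new Incremented(Version));

    // Same order as in the article: enqueue, apply, then increment the version.
    private void AddEvent(Incremented @event)
    {
        _events.Enqueue(@event);
        Apply(@event);
        Version++;
    }

    private void Apply(Incremented @event) => Value++;

    // Returns a copy of the pending events and clears the queue.
    public IReadOnlyList<Incremented> DequeueAll()
    {
        var copy = _events.ToArray();
        _events.Clear();
        return copy;
    }

    // Rebuilds an instance from stored events; replayed events are not pending.
    public static Counter FromHistory(IEnumerable<Incremented> history)
    {
        var counter = new Counter();
        foreach (var @event in history)
            counter.AddEvent(@event);
        counter._events.Clear();
        return counter;
    }
}
```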
- Create CatalogItemCreated Event class in Events->CatalogItem folder
CatalogItemCreated.cs
using Domain.Entities.Common;
namespace Domain.Events.CatalogItem
{
/// <summary>
/// Catalog item created event
/// </summary>
public class CatalogItemCreated: BaseDomainEvent<Entities.CatalogItem, Guid>
{
private CatalogItemCreated()
{
}
public CatalogItemCreated(Entities.CatalogItem catalogItem) : base(catalogItem)
{
//Id = catalogItem.Id;
Name = catalogItem.Name;
Description = catalogItem.Description;
Price = catalogItem.Price;
AvailableStock = catalogItem.AvailableStock;
RestockThreshold = catalogItem.RestockThreshold;
MaxStockThreshold = catalogItem.MaxStockThreshold;
OnReorder = catalogItem.OnReorder;
IsDeleted = catalogItem.IsDeleted;
}
//public Guid Id { get; set; }
public string Name { get; private set; }
public string Description { get; private set; }
public double Price { get; private set; }
// Quantity in stock
public int AvailableStock { get; private set; }
// Available stock at which we should reorder
public int RestockThreshold { get; private set; }
// Maximum number of units that can be in-stock at any time (due to physical/logistical constraints in warehouses)
public int MaxStockThreshold { get; private set; }
/// <summary>
/// True if item is on reorder
/// </summary>
public bool OnReorder { get; private set; }
public bool IsDeleted { get; private set; }
}
}
- Create CatalogItemDeleted Event class in Events->CatalogItem folder
CatalogItemDeleted.cs
using Domain.Entities.Common;
namespace Domain.Events.CatalogItem
{
/// <summary>
/// Catalog item deleted event
/// </summary>
public class CatalogItemDeleted: BaseDomainEvent<Entities.CatalogItem, Guid>
{
private CatalogItemDeleted()
{
}
public CatalogItemDeleted(Entities.CatalogItem catalogItem) : base(catalogItem)
{
IsDeleted = catalogItem.IsDeleted;
}
public bool IsDeleted { get; private set; }
}
}
- Create CatalogItemUpdated Event class in Events->CatalogItem folder
CatalogItemUpdated.cs
using Domain.Entities.Common;
namespace Domain.Events.CatalogItem
{
/// <summary>
/// Catalog item updated event
/// </summary>
public class CatalogItemUpdated: BaseDomainEvent<Entities.CatalogItem, Guid>
{
private CatalogItemUpdated()
{
}
public CatalogItemUpdated(Entities.CatalogItem catalogItem) : base(catalogItem)
{
//Id = catalogItem.Id;
Name = catalogItem.Name;
Description = catalogItem.Description;
Price = catalogItem.Price;
AvailableStock = catalogItem.AvailableStock;
RestockThreshold = catalogItem.RestockThreshold;
MaxStockThreshold = catalogItem.MaxStockThreshold;
OnReorder = catalogItem.OnReorder;
}
//public Guid Id { get; set; }
public string Name { get; private set; }
public string Description { get; private set; }
public double Price { get; private set; }
// Quantity in stock
public int AvailableStock { get; private set; }
// Available stock at which we should reorder
public int RestockThreshold { get; private set; }
// Maximum number of units that can be in-stock at any time (due to physical/logistical constraints in warehouses)
public int MaxStockThreshold { get; private set; }
/// <summary>
/// True if item is on reorder
/// </summary>
public bool OnReorder { get; private set; }
}
}
- Create the CatalogItem aggregate root class in the Entities folder. Because we follow DDD, CatalogItem is the main domain class: create, update, and delete are all performed through it.
CatalogItem.cs
using Domain.Entities.Common;
using Domain.Events.CatalogItem;
namespace Domain.Entities
{
public class CatalogItem : BaseAggregateRoot<CatalogItem, Guid>
{
private CatalogItem()
{
}
public CatalogItem(Guid id, string name, string description, double price, int availableStock,
int restockThreshold, int maxStockThreshold, bool onReorder) : base(id)
{
Id = id;
Name = name;
Description = description;
Price = price;
AvailableStock = availableStock;
RestockThreshold = restockThreshold;
MaxStockThreshold = maxStockThreshold;
OnReorder = onReorder;
if (Version > 0)
{
throw new Exception("Catalog item already created");
}
if (string.IsNullOrEmpty(name))
{
//Validation Exception will be placed here
throw new Exception("Name Can not be Empty");
}
if (price <= 0)
{
//Validation Exception will be placed here
throw new Exception("Price must be positive value");
}
// Add CatalogItem Event Here to create
AddEvent(new CatalogItemCreated(this));
}
public string Name { get; private set; }
public string Description { get; private set; }
public double Price { get; private set; }
// Quantity in stock
public int AvailableStock { get; private set; }
// Available stock at which we should reorder
public int RestockThreshold { get; private set; }
// Maximum number of units that can be in-stock at any time (due to physical/logistical constraints in warehouses)
public int MaxStockThreshold { get; private set; }
/// <summary>
/// True if item is on reorder
/// </summary>
public bool OnReorder { get; private set; }
public bool IsDeleted { get; private set; } = false;
public static CatalogItem Create(string name, string description, double price, int availableStock,
int restockThreshold, int maxStockThreshold, bool onReorder)
{
return new CatalogItem(Guid.NewGuid(), name, description, price, availableStock, restockThreshold, maxStockThreshold, onReorder);
}
public void Update(Guid id, string name, string description, double price, int availableStock,
int restockThreshold, int maxStockThreshold, bool onReorder)
{
Id = id;
Name = name;
Description = description;
Price = price;
AvailableStock = availableStock;
RestockThreshold = restockThreshold;
MaxStockThreshold = maxStockThreshold;
OnReorder = onReorder;
AddEvent(new CatalogItemUpdated(this));
}
public void Delete(Guid id)
{
Id = id;
IsDeleted = true;
AddEvent(new CatalogItemDeleted(this));
}
protected override void Apply(IDomainEvent<Guid> @event)
{
switch (@event)
{
case CatalogItemCreated catalogItemCreated: OnCatalogItemCreated(catalogItemCreated); break;
case CatalogItemUpdated catalogItemUpdated: OnCatalogItemUpdated(catalogItemUpdated); break;
case CatalogItemDeleted catalogItemDeleted:
IsDeleted = catalogItemDeleted.IsDeleted;
break;
}
}
// On Catalog Item Created Event
private void OnCatalogItemCreated(CatalogItemCreated catalogItemCreated)
{
Id = catalogItemCreated.AggregateId; // Must have ID
Name = catalogItemCreated.Name;
Description= catalogItemCreated.Description;
Price = catalogItemCreated.Price;
AvailableStock = catalogItemCreated.AvailableStock;
RestockThreshold = catalogItemCreated.RestockThreshold;
MaxStockThreshold = catalogItemCreated.MaxStockThreshold;
OnReorder = catalogItemCreated.OnReorder;
}
// On Catalog Item Updated Event
private void OnCatalogItemUpdated(CatalogItemUpdated catalogItemUpdated)
{
Name = catalogItemUpdated.Name;
Description = catalogItemUpdated.Description;
Price = catalogItemUpdated.Price;
AvailableStock = catalogItemUpdated.AvailableStock;
RestockThreshold = catalogItemUpdated.RestockThreshold;
MaxStockThreshold = catalogItemUpdated.MaxStockThreshold;
OnReorder = catalogItemUpdated.OnReorder;
}
}
}
Step 5: Organize Application
- Add Domain as a project reference in the Application project. You may add it through the project file as follows.
<ItemGroup>
<ProjectReference Include="..\Domain\Domain.csproj" />
</ItemGroup>
- Create CreateCatalogItemDTO class in Common->DTOs folder
CreateCatalogItemDTO.cs
namespace Application.Common.DTOs
{
public class CreateCatalogItemDTO
{
public int Id { get; set; }
public string Name { get; set; }
public string Description { get; set; }
public double Price { get; set; }
// Quantity in stock
public int AvailableStock { get; set; }
// Available stock at which we should reorder
public int RestockThreshold { get; set; }
// Maximum number of units that can be in-stock at any time (due to physical/logistical constraints in warehouses)
public int MaxStockThreshold { get; set; }
/// <summary>
/// True if item is on reorder
/// </summary>
public bool OnReorder { get; set; }
}
}
- Create UpdateCatalogItemDTO class in Common->DTOs folder
UpdateCatalogItemDTO.cs
namespace Application.Common.DTOs
{
public class UpdateCatalogItemDTO
{
public Guid Id { get; private set; }
public string Name { get; set; }
public string Description { get; set; }
public double Price { get; set; }
// Quantity in stock
public int AvailableStock { get; set; }
// Available stock at which we should reorder
public int RestockThreshold { get; set; }
// Maximum number of units that can be in-stock at any time (due to physical/logistical constraints in warehouses)
public int MaxStockThreshold { get; set; }
/// <summary>
/// True if item is on reorder
/// </summary>
public bool OnReorder { get; set; }
}
}
- Create IAggregateRepository interface in Common->Interfaces folder. This interface is used for event sourcing: AppendAsync appends an aggregate's new events to its stream in the event store database, RehydreateAsync reads all events for an aggregate ID and rebuilds the aggregate from them, and ReadEventsAsync reads the events as a log and returns them in a dictionary.
IAggregateRepository.cs
using Domain.Entities.Common;
namespace Application.Common.Interfaces
{
public interface IAggregateRepository<TA, TKey> where TA : class, IAggregateRoot<TKey>
{
// Append events to store in event store database
Task AppendAsync(TA aggregate, CancellationToken cancellationToken = default);
// Read all events using aggregate ID
Task<TA?> RehydreateAsync(TKey aggregateId, CancellationToken cancellationToken = default);
// Read events as a log and return into a dictionary
Task<Dictionary<int, object>> ReadEventsAsync(TKey aggregateId, CancellationToken cancellationToken = default);
}
}
- Create ICatalogItemRepository interface to insert and update rows in SQL Server. Since we use soft delete, deleting an item means updating its IsDeleted field to true.
ICatalogItemRepository.cs
using Domain.Entities;
namespace Application.Common.Interfaces
{
public interface ICatalogItemRepository
{
Task AddAsync(CatalogItem catalogItem);
Task UpdateAsync(CatalogItem catalogItem);
}
}
- Create PrivateSetterContractResolver class in Common->Resolvers folder. This is a custom contract resolver that lets Json.NET set properties with private setters when events are rehydrated.
PrivateSetterContractResolver.cs
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
using System.Reflection;
namespace Application.Common.Resolvers
{
/// <summary>
/// Custom contract resolver that sets private-setter properties when rehydrating events
/// </summary>
public class PrivateSetterContractResolver : DefaultContractResolver
{
protected override JsonProperty CreateProperty(MemberInfo member, MemberSerialization memberSerialization)
{
var jsonProperty = base.CreateProperty(member, memberSerialization);
if(jsonProperty.Writable)
return jsonProperty;
if(member is PropertyInfo propertyInfo)
{
var setter = propertyInfo.GetSetMethod(true);
jsonProperty.Writable = setter != null;
}
return jsonProperty;
}
}
}
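The resolver relies on one reflection call: GetSetMethod(true), which returns a property's setter even when it is private. The sketch below shows that mechanism in isolation, using only System.Reflection and a hypothetical PriceChanged class; it does not use Json.NET, so it illustrates what the resolver detects rather than how Json.NET applies it.

```csharp
using System;
using System.Reflection;

var property = typeof(PriceChanged).GetProperty(nameof(PriceChanged.Price))!;

// The public-only lookup does not see a private setter...
Console.WriteLine(property.GetSetMethod() is null); // True

// ...but nonPublic: true returns it, which is the check the resolver performs.
MethodInfo? setter = property.GetSetMethod(nonPublic: true);
Console.WriteLine(setter is not null); // True

// Create an instance through the private constructor and set the value through the private setter.
var instance = (PriceChanged)Activator.CreateInstance(typeof(PriceChanged), nonPublic: true)!;
setter!.Invoke(instance, new object[] { 9.5 });
Console.WriteLine(instance.Price == 9.5); // True

// Hypothetical event-like class with a private constructor and a private setter.
public class PriceChanged
{
    private PriceChanged() { }
    public double Price { get; private set; }
}
```

This mirrors the shape of the domain events in this article, which also have private parameterless constructors and private setters.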
- Create CreateCatalogItemCommand and CreateCatalogItemCommandHandler class in Commands->CatalogItems folder.
CreateCatalogItemCommand.cs
using Application.Common.Interfaces;
using Domain.Entities;
using MediatR;
namespace Application.Commands.CatalogItems
{
public class CreateCatalogItemCommand : INotification
{
public CreateCatalogItemCommand(string name, string description, double price, int availableStock, int restockThreshold,
int maxStockThreshold, bool onReorder)
{
Name = name;
Description = description;
Price = price;
AvailableStock = availableStock;
RestockThreshold = restockThreshold;
MaxStockThreshold = maxStockThreshold;
OnReorder = onReorder;
}
public string Name { get; private set; }
public string Description { get; private set; }
public double Price { get; private set; }
// Quantity in stock
public int AvailableStock { get; private set; }
// Available stock at which we should reorder
public int RestockThreshold { get; private set; }
// Maximum number of units that can be in-stock at any time (due to physical/logistical constraints in warehouses)
public int MaxStockThreshold { get; private set; }
/// <summary>
/// True if item is on reorder
/// </summary>
public bool OnReorder { get; private set; }
}
public class CreateCatalogItemCommandHandler : INotificationHandler<CreateCatalogItemCommand>
{
private readonly IAggregateRepository<CatalogItem, Guid> _aggregateRepository;
private readonly ICatalogItemRepository _catalogItemRepository;
public CreateCatalogItemCommandHandler(IAggregateRepository<CatalogItem, Guid> aggregateRepository,
ICatalogItemRepository catalogItemRepository)
{
_aggregateRepository = aggregateRepository;
_catalogItemRepository = catalogItemRepository;
}
public async Task Handle(CreateCatalogItemCommand notification, CancellationToken cancellationToken)
{
// Insert event into eventstore db
var catalogItem = CatalogItem.Create(notification.Name, notification.Description, notification.Price, notification.AvailableStock,
notification.RestockThreshold, notification.MaxStockThreshold, notification.OnReorder);
await _aggregateRepository.AppendAsync(catalogItem, cancellationToken);
// Save data into database
await _catalogItemRepository.AddAsync(catalogItem);
// Dispatch events to any event/service bus to do next actions
// We can also register EventStore db Subscription to receive Event Notification
}
}
}
- Create UpdateCatalogItemCommand and UpdateCatalogItemCommandHandler class in Commands->CatalogItems folder.
UpdateCatalogItemCommand.cs
using Application.Common.Interfaces;
using Domain.Entities;
using MediatR;
namespace Application.Commands.CatalogItems
{
public class UpdateCatalogItemCommand : INotification
{
// Update Catalog Item Command
public UpdateCatalogItemCommand(Guid id, string name, string description, double price, int availableStock, int restockThreshold,
int maxStockThreshold, bool onReorder)
{
CatalogItemId = id;
Name = name;
Description = description;
Price = price;
AvailableStock = availableStock;
RestockThreshold = restockThreshold;
MaxStockThreshold = maxStockThreshold;
OnReorder = onReorder;
}
public Guid CatalogItemId { get; private set; }
public string Name { get; private set; }
public string Description { get; private set; }
public double Price { get; private set; }
// Quantity in stock
public int AvailableStock { get; private set; }
// Available stock at which we should reorder
public int RestockThreshold { get; private set; }
// Maximum number of units that can be in-stock at any time (due to physical/logistical constraints in warehouses)
public int MaxStockThreshold { get; private set; }
/// <summary>
/// True if item is on reorder
/// </summary>
public bool OnReorder { get; private set; }
}
public class UpdateCatalogItemCommandHandler: INotificationHandler<UpdateCatalogItemCommand>
{
private readonly IAggregateRepository<CatalogItem, Guid> _aggregateRepository;
private readonly ICatalogItemRepository _catalogItemRepository;
public UpdateCatalogItemCommandHandler(IAggregateRepository<CatalogItem, Guid> aggregateRepository,
ICatalogItemRepository catalogItemRepository)
{
_aggregateRepository = aggregateRepository;
_catalogItemRepository = catalogItemRepository;
}
#region INotificationHandler implementation
public async Task Handle(UpdateCatalogItemCommand notification, CancellationToken cancellationToken)
{
var catalogItem = await _aggregateRepository.RehydreateAsync(notification.CatalogItemId, cancellationToken);
if(catalogItem == null)
{
throw new Exception("Invalid catalog item information");
}
catalogItem.Update(notification.CatalogItemId, notification.Name, notification.Description, notification.Price,
notification.AvailableStock, notification.RestockThreshold, notification.MaxStockThreshold, notification.OnReorder);
await _aggregateRepository.AppendAsync(catalogItem, cancellationToken);
// Save data into database
await _catalogItemRepository.UpdateAsync(catalogItem);
}
#endregion
}
}
- Create DeleteCatalogItemCommand and DeleteCatlogItemCommandHandler class in Commands->CatalogItems folder.
DeleteCatalogItemCommand.cs
using Application.Common.Interfaces;
using Domain.Entities;
using MediatR;
namespace Application.Commands.CatalogItems
{
public class DeleteCatalogItemCommand : INotification
{
public DeleteCatalogItemCommand(Guid id)
{
Id = id;
}
public Guid Id { get; private set; }
}
public class DeleteCatlogItemCommandHandler : INotificationHandler<DeleteCatalogItemCommand>
{
private readonly IAggregateRepository<CatalogItem, Guid> _aggregateRepository;
private readonly ICatalogItemRepository _catalogItemRepository;
public DeleteCatlogItemCommandHandler(IAggregateRepository<CatalogItem, Guid> agrregateRepository
, ICatalogItemRepository catalogItemRepository)
{
_aggregateRepository = agrregateRepository;
_catalogItemRepository = catalogItemRepository;
}
public async Task Handle(DeleteCatalogItemCommand notification, CancellationToken cancellationToken)
{
var catalogItem = await _aggregateRepository.RehydreateAsync(notification.Id, cancellationToken);
if (catalogItem == null)
{
throw new Exception("Invalid Catalog Item information");
}
catalogItem.Delete(catalogItem.Id);
await _aggregateRepository.AppendAsync(catalogItem, cancellationToken);
// Save data into database
await _catalogItemRepository.UpdateAsync(catalogItem);
}
}
}
- Create GetCatalogItemLogByIdQuery and GetCatalogItemLogByIdQueryHandler class in Queries->CatalogItems folder
GetCatalogItemLogByIdQuery.cs
using Application.Common.Interfaces;
using Domain.Entities;
using MediatR;
namespace Application.Queries.CatalogItems
{
public class GetCatalogItemLogByIdQuery : IRequest<List<object>>
{
public GetCatalogItemLogByIdQuery(Guid catalogItemId)
{
CatalogItemId = catalogItemId;
}
public Guid CatalogItemId { get; private set; }
}
public class GetCatalogItemLogByIdQueryHandler : IRequestHandler<GetCatalogItemLogByIdQuery, List<object>>
{
private readonly IAggregateRepository<CatalogItem, Guid> _aggregateRepository;
public GetCatalogItemLogByIdQueryHandler(IAggregateRepository<CatalogItem, Guid> aggregateRepository)
{
_aggregateRepository = aggregateRepository;
}
public async Task<List<object>> Handle(GetCatalogItemLogByIdQuery request, CancellationToken cancellationToken)
{
var data = await _aggregateRepository.ReadEventsAsync(request.CatalogItemId, cancellationToken);
return data.Values.ToList();
}
}
}
- Create a service collection extension method in the root directory of the Application project.
DependencyInjection.cs
using MediatR;
using Microsoft.Extensions.DependencyInjection;
using System.Reflection;
namespace Application
{
public static class DependencyInjection
{
public static IServiceCollection AddApplication(this IServiceCollection services)
{
// Register MediatR and the handlers found in this assembly
services.AddMediatR(Assembly.GetExecutingAssembly());
return services;
}
}
}
Step 6: Organize Infrastructure
- Add the Domain and Application projects as references in the Infrastructure project.
- You may add them in the project file as follows.
<ItemGroup>
<ProjectReference Include="..\Application\Application.csproj" />
<ProjectReference Include="..\Domain\Domain.csproj" />
</ItemGroup>
- Create ApplicationDbContext class in Persistance folder
ApplicationDbContext.cs
using Domain.Entities;
using Microsoft.EntityFrameworkCore;
using System.Reflection;
namespace Infrastructure.Persistance
{
public class ApplicationDbContext : DbContext
{
public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options) : base(options)
{
}
public DbSet<CatalogItem> CatalogItems { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
}
protected override void OnModelCreating(ModelBuilder builder)
{
base.OnModelCreating(builder);
builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
}
}
}
- Create CatalogItemRepository class in the Persistance folder to insert and update rows in SQL Server.
CatalogItemRepository.cs
using Application.Common.Interfaces;
using Domain.Entities;
namespace Infrastructure.Persistance
{
public class CatalogItemRepository : ICatalogItemRepository
{
private readonly ApplicationDbContext _context;
public CatalogItemRepository(ApplicationDbContext applicationDbContext)
{
_context = applicationDbContext;
}
public async Task AddAsync(CatalogItem catalogItem)
{
await _context.AddAsync(catalogItem);
await _context.SaveChangesAsync();
}
public async Task UpdateAsync(CatalogItem catalogItem)
{
_context.Entry(catalogItem).State = Microsoft.EntityFrameworkCore.EntityState.Modified;
await _context.SaveChangesAsync();
}
}
}
- Create AggregateRepository class in the Persistance folder to append events to, and read events from, EventStoreDB.
AggregateRepository.cs
using Application.Common.Interfaces;
using Application.Common.Resolvers;
using Domain.Entities.Common;
using EventStore.Client;
using Newtonsoft.Json;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using JsonSerializer = System.Text.Json.JsonSerializer;
namespace Infrastructure.Persistance
{
public class AggregateRepository<TA, TKey> : IAggregateRepository<TA, TKey> where TA : class, IAggregateRoot<TKey>
{
private readonly EventStoreClient _eventStoreClient;
private readonly string _stramBaseName;
public AggregateRepository(EventStoreClient eventStoreClient)
{
_eventStoreClient = eventStoreClient;
var aggregateType = typeof(TA);
_stramBaseName = aggregateType.Name;
}
public async Task AppendAsync(TA aggregate, CancellationToken cancellationToken = default)
{
if (null == aggregate)
throw new ArgumentNullException(nameof(aggregate));
if (!aggregate.Events.Any())
return;
var streamName = GetStreamName(aggregate.Id);
var eventList = aggregate.Events.Select(Map).ToArray();
// StreamState.Any skips the expected-state check on the stream when appending
await _eventStoreClient.AppendToStreamAsync(streamName, StreamState.Any,
eventList, cancellationToken: cancellationToken);
}
public async Task<Dictionary<int, object>> ReadEventsAsync(TKey aggregateId, CancellationToken cancellationToken = default)
{
var streamName = GetStreamName(aggregateId);
var result = _eventStoreClient.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start);
var events = new Dictionary<int, object>();
var index = 0;
foreach (var data in await result.ToListAsync(cancellationToken: cancellationToken))
{
// Read event metadata to get type information of an event
var eventMetaData = JsonSerializer.Deserialize<EventMeta>(Encoding.UTF8.GetString(data.Event.Metadata.ToArray()));
Type? typeInfo = Type.GetType(eventMetaData.EventType);
if(typeInfo is null)
{
throw new Exception($"Invalid type {eventMetaData.EventType}");
}
var jsonData = Encoding.UTF8.GetString(data.Event.Data.ToArray());
var eventInfo = JsonConvert.DeserializeObject(jsonData, typeInfo, new JsonSerializerSettings()
{
ConstructorHandling = ConstructorHandling.AllowNonPublicDefaultConstructor,
ContractResolver = new PrivateSetterContractResolver()
});
events.Add(index, new
{
Events = eventInfo,
EventType = data.OriginalEvent.EventType
});
index++;
}
return events;
}
/// <summary>
/// Read all events of an aggregate's stream and rebuild the aggregate from them
/// </summary>
/// <param name="key">Aggregate ID</param>
/// <param name="cancellationToken"></param>
/// <returns>The rehydrated aggregate, or null when no events were read</returns>
public async Task<TA?> RehydreateAsync(TKey key, CancellationToken cancellationToken = default)
{
try
{
var streamName = GetStreamName(key);
var events = new List<IDomainEvent<TKey>>();
var result = _eventStoreClient.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start);
foreach (var data in await result.ToListAsync(cancellationToken))
{
//Read Event Metadata to get Type Information of an event
var eventMetaData = JsonSerializer.Deserialize<EventMeta>
(Encoding.UTF8.GetString(data.Event.Metadata.ToArray()));
Type? typeInfo = Type.GetType(eventMetaData.EventType);
if (typeInfo == null)
{
throw new Exception($"Invalid type {eventMetaData.EventType}");
}
var jsonData = Encoding.UTF8.GetString(data.Event.Data.ToArray());
var eventInfo = JsonConvert.DeserializeObject(jsonData, typeInfo, new JsonSerializerSettings()
{
ConstructorHandling = ConstructorHandling.AllowNonPublicDefaultConstructor,
ContractResolver = new PrivateSetterContractResolver()
});
events.Add((IDomainEvent<TKey>)eventInfo);
}
if (!events.Any())
return null;
var aggregateResult = BaseAggregateRoot<TA, TKey>.Create(events.OrderBy(x => x.AggregateVersion));
return aggregateResult;
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
}
// Technical note: expression-bodied member
// Generate stream name format
private string GetStreamName(TKey aggregateKey)
=> $"{_stramBaseName}_{aggregateKey}";
// Map domain event to event data
private static EventData Map(IDomainEvent<TKey> @event)
{
var meta = new EventMeta()
{
EventType = @event.GetType().AssemblyQualifiedName
};
var metaJson = System.Text.Json.JsonSerializer.Serialize(meta);
var metadata = Encoding.UTF8.GetBytes(metaJson);
var eventData = new EventData(
Uuid.NewUuid(),
@event.GetType().Name,
JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType(), new JsonSerializerOptions()
{
ReferenceHandler = ReferenceHandler.IgnoreCycles
}),
metadata
);
return eventData;
}
/// <summary>
/// Metadata for an event, saved alongside each event payload
/// </summary>
internal struct EventMeta
{
public string EventType { get; set; }
}
}
}
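The repository above writes each event's assembly-qualified type name into the event metadata so that, on read, the payload can be deserialized back into its concrete type. The round trip can be sketched in isolation as follows. This is only a minimal illustration: `PriceChanged` and `EventRoundTrip` are hypothetical names that do not exist in the project, and the sketch uses System.Text.Json for both directions, whereas the repository reads payloads with Newtonsoft.Json and a custom contract resolver.

```csharp
using System;
using System.Text.Json;

// Hypothetical event type, used only for this illustration.
public record PriceChanged(Guid AggregateId, long AggregateVersion, decimal NewPrice);

public static class EventRoundTrip
{
    // Same shape as the EventMeta struct in the repository.
    public struct EventMeta
    {
        public string EventType { get; set; }
    }

    // Mirrors the Map step: payload bytes plus the type name as metadata bytes.
    public static (byte[] Data, byte[] Metadata) Serialize(object @event)
    {
        var type = @event.GetType();
        var data = JsonSerializer.SerializeToUtf8Bytes(@event, type);
        var meta = JsonSerializer.SerializeToUtf8Bytes(
            new EventMeta { EventType = type.AssemblyQualifiedName! });
        return (data, meta);
    }

    // Mirrors the read step: resolve the type from metadata, then deserialize the payload.
    public static object Deserialize(byte[] data, byte[] metadata)
    {
        var meta = JsonSerializer.Deserialize<EventMeta>(metadata);
        var type = Type.GetType(meta.EventType)
            ?? throw new InvalidOperationException($"Unknown event type '{meta.EventType}'.");
        return JsonSerializer.Deserialize(data, type)
            ?? throw new InvalidOperationException("Event payload was empty.");
    }
}
```

Because the metadata holds the full assembly-qualified name, renaming or moving an event class makes older events unresolvable unless a mapping is kept, which is worth keeping in mind when events are stored long term.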
- Add a service collection extension method in the root directory. It configures SQL Server, EventStoreDB and dependency injection.
DependencyInjection.cs
using Application.Common.Interfaces;
using Domain.Entities;
using Domain.Entities.Common;
using EventStore.Client;
using Infrastructure.Persistance;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
namespace Infrastructure
{
public static class DependencyInjection
{
public static IServiceCollection AddEventStore(this IServiceCollection services, IConfiguration configuration)
{
// Event store database connection
var settings = EventStoreClientSettings
.Create("esdb://127.0.0.1:2113?tls=false&keepAliveTimeout=10000&keepAliveInterval=10000");
var client = new EventStoreClient(settings);
services.AddSingleton(client);
// Register DbContext for SQL Server
services.AddDbContext<ApplicationDbContext>(options =>
{
options.UseSqlServer(
configuration.GetConnectionString("DefaultConnection"),
sqlServerOptionsAction: sqlOptions =>
{
});
});
services.AddScoped<ICatalogItemRepository, CatalogItemRepository>();
services.AddEventsRepository<CatalogItem, Guid>();
return services;
}
private static IServiceCollection AddEventsRepository<TA, TK>(this IServiceCollection services)
where TA : class, IAggregateRoot<TK>
{
return services.AddSingleton(typeof(IAggregateRepository<TA, TK>), typeof(AggregateRepository<TA, TK>));
}
}
}
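The EventStoreDB connection string above is hard-coded. One possible variation, shown here only as a sketch, reads it from configuration and falls back to the local insecure node. The connection string name `EventStore` and the `AddEventStoreClient` method are assumptions made for this example; they are not part of the project.

```csharp
using EventStore.Client;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public static class EventStoreRegistration
{
    // Hypothetical helper: registers the EventStoreDB client from configuration.
    public static IServiceCollection AddEventStoreClient(
        this IServiceCollection services, IConfiguration configuration)
    {
        // "EventStore" is an assumed key under "ConnectionStrings" in appsettings.json.
        var connectionString = configuration.GetConnectionString("EventStore")
            ?? "esdb://127.0.0.1:2113?tls=false";

        var settings = EventStoreClientSettings.Create(connectionString);

        // A single client instance is shared across the application.
        services.AddSingleton(new EventStoreClient(settings));
        return services;
    }
}
```

With this in place, the node address can differ per environment without recompiling.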
Step 7: Organize Catalog.API
- Add the connection string to the appsettings.json file as follows.
appsettings.json
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*",
"ConnectionStrings": {
"DefaultConnection": "Data Source=localhost;Initial Catalog=CatalogDB;Persist Security Info=False;User ID=sa; Password = yourpassword;Pooling=False;MultipleActiveResultSets=False;Encrypt=False;TrustServerCertificate=False"
}
}
- Add a CatalogItemController class to the Controllers folder as follows.
CatalogItemController.cs
using Application.Commands.CatalogItems;
using Application.Common.DTOs;
using Application.Queries.CatalogItems;
using MediatR;
using Microsoft.AspNetCore.Mvc;
namespace Catalog.API.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class CatalogItemController : ControllerBase
{
private readonly IMediator _mediator;
public CatalogItemController(IMediator mediator)
{
_mediator = mediator;
}
[HttpPost("Create")]
public async Task<IActionResult> CreateCatalogItemAsync([FromBody] CreateCatalogItemDTO? dto)
{
if (dto == null)
return BadRequest();
var command = new CreateCatalogItemCommand(dto.Name, dto.Description, dto.Price, dto.AvailableStock, dto.RestockThreshold,
dto.MaxStockThreshold, dto.OnReorder);
await _mediator.Publish(command);
return Ok(command);
}
[HttpPatch("update/{id:guid}")]
public async Task<IActionResult> UpdateCatalogItem(Guid id, [FromBody] UpdateCatalogItemDTO? dto)
{
if (dto == null)
return BadRequest();
var updateCommand = new UpdateCatalogItemCommand(id, dto.Name, dto.Description, dto.Price, dto.AvailableStock,
dto.RestockThreshold, dto.MaxStockThreshold, dto.OnReorder);
await _mediator.Publish(updateCommand);
return Ok(updateCommand);
}
/// <summary>
/// Delete a catalog item (soft delete, not hard delete)
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
[HttpDelete("delete/{id:guid}")]
public async Task<IActionResult> DeleteCatalogItem(Guid id)
{
var deleteCatalogCommand = new DeleteCatalogItemCommand(id);
await _mediator.Publish(deleteCatalogCommand);
return Ok();
}
[HttpGet("log/{id:guid}")]
public async Task<IActionResult> GetLog(Guid id)
{
return Ok(await _mediator.Send(new GetCatalogItemLogByIdQuery(id)));
}
}
}
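Besides Swagger, the endpoints can be exercised from code. The following is a rough client sketch, not part of the project: the class name, the sample values and the property types are assumptions, and the caller has to supply an `HttpClient` whose `BaseAddress` points at wherever Catalog.API is running. The routes follow from the `[Route]` and `[HttpPost]`/`[HttpGet]` attributes on the controller.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Json;
using System.Threading.Tasks;

// Hypothetical client helper for the CatalogItem endpoints.
public static class CatalogApiClientSketch
{
    // POST api/CatalogItem/Create with a body shaped like CreateCatalogItemDTO.
    public static async Task<string> CreateItemAsync(HttpClient http)
    {
        // Sample values; property types are assumed, not taken from the DTO definition.
        var dto = new
        {
            Name = "Sample item",
            Description = "Created from a client sketch",
            Price = 10.5m,
            AvailableStock = 100,
            RestockThreshold = 10,
            MaxStockThreshold = 500,
            OnReorder = false
        };

        using var response = await http.PostAsJsonAsync("api/CatalogItem/Create", dto);
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }

    // GET api/CatalogItem/log/{id} returns the events recorded for one item.
    public static Task<string> GetLogAsync(HttpClient http, Guid id)
        => http.GetStringAsync($"api/CatalogItem/log/{id}");
}
```

The base address should end with a trailing slash so that the relative routes resolve correctly.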
- Modify Program.cs as follows.
Program.cs
using Application;
using Infrastructure;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
var environment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT");
builder.Configuration.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
.AddJsonFile($"appsettings.{environment}.json", optional: true)
.AddEnvironmentVariables();
//Register Application layer and Event Store layer from Infrastructure here
builder.Services.AddApplication().AddEventStore(builder.Configuration);
builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var app = builder.Build();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
Step 8: Add Migration
- Set Catalog.API as the startup project and open the Package Manager Console.
- Select Infrastructure as the Default project and run the following commands.
PM> Add-Migration init-mig
PM> Update-Database -Verbose
Step 9: Run the application
- Run the application and perform CRUD operations using Swagger UI, then check the effect in SQL Server and EventStoreDB.
Swagger UI: Create, update and delete a CatalogItem using Swagger UI
EventStoreDB UI: Browse EventStoreDB to see the stored events