Application of Event Sourcing using asp.net core, DDD, EF, EventStoreDB and SQL Server

20 minute read

Introduction

Event sourcing is a most important pattern to design a microservice based application. If you are working with multiple services in a microservice based application, you have to use event driven architecture. In this article I will discuss and apply event sourcing using asp.net core, DDD and EventStoreDB.

Domain Driven Design (DDD)_
Domain-driven design (DDD) is a major software design approach, focusing on modeling software to match a domain according to input from that domain’s experts. Under domain-driven design, the structure and language of software code (class names, class methods, class variables) should match the business domain. (Wikipedia)

Event Sourcing
Event sourcing is a technique to store all events of an Object to get all of its versions. Event sourcing pattern is used to implement microservice based application. Using this pattern, we can track the changes of an object in its lifecycle.

EventStoreDB EventStoreDB specially built for Event Sourcing. It is a NoSQL database. This is a one-way database – we can only insert data into database.

Implementation
Let’s implement Event Sourcing using DDD and EventStoreDB.

Tools and Technologies Used

  • Visual Studio 2022
  • .NET 6.0
  • ASP.NET Core Web API
  • Visual C#
  • DDD
  • EventStoreDB
  • SQL Server

Step 1: Install EventStoreDB

  • You can install EventStoreDB using EventStoreDB documentation. Visit the following link : https://developers.eventstore.com/server/v20.10/installation.html#quick-start

  • Or, you can run docker image of EventStoreDB as below.

docker run --name esdb-node -it -p 2113:2113 -p 1113:1113 eventstore/eventstore:latest --insecure --run-projections=All --enable-external-tcp --enable-atom-pub-over-http
  • Here I used docker images for EventStoreDB. After running the above command, browse EventStoreDB using the following link.

http://localhost:2113/web/index.html#/dashboard

Step 2: Create solution and projects

  • Create a solution name EventSourcing.
  • Add a new web api projects name - Catalog.API
  • Add three class library projects name – Application, Domain, Infrastructure

Step 3: Install nuget packages

  • Install following nuget packages in Catalog.API
PM> Install-Package Microsoft.EntityFrameworkCore.Tools
  • Install following nuget packages in Application
PM> Install-Package MediatR
PM> Install-Package MediatR.Extensions.Microsoft.DependencyInjection
PM> Install-Package Microsoft.Extensions.DependencyInjection.Abstractions
PM> Install-Package Newtonsoft.Json
  • Install following nuget packages in Infrastructure
PM> Install-Package EventStore.Client.Grpc.Streams
PM> Install-Package Microsoft.EntityFrameworkCore
PM> Install-Package Microsoft.Extensions.Configuration.Abstractions
PM> Install-Package Microsoft.Extensions.DependencyInjection.Abstractions
PM> Install-Package Newtonsoft.Json

Step 4: Organize Domain

  • Create IBaseEntity interface for Base entity in Entities->Common folder.

IBaseEntity.cs

namespace Domain.Entities.Common
{
    /// Ref: Coverience in C#
    /// <summary>
    /// Interface for Base Entity
    /// </summary>
    /// <typeparam name="TKey"></typeparam>
    public interface IBaseEntity<out TKey>
    {
        TKey Id { get; }
    }
}

  • Create IDomainEvent interface for domain event in Entities->Common folder

IDomainEvent.cs

namespace Domain.Entities.Common
{

    /// <summary>
    /// Interface for Domain Event
    /// </summary>
    /// <typeparam name="TKey"></typeparam>
    public interface IDomainEvent<out TKey>
    {
        public long AggregateVersion { get; }
        TKey AggregateId { get; }
        DateTime TimeStamp { get; }
    }
}

  • Create IAggregateRoot interface for Aggregate Root in Entities->Common folder. IAggregateRoot is the combination of IBaseEntity and IAggregateRoot

IAggregateRoot.cs

namespace Domain.Entities.Common
{
    /// <summary>
    /// Interface for AggregateRoot
    /// IAggregateRoot is the combination of IBaseEntity and IAggregateRoot
    /// </summary>
    /// <typeparam name="TKey"></typeparam>
    public interface IAggregateRoot<out TKey> : IBaseEntity<TKey>
    {
        long Version { get; }
        IReadOnlyCollection<IDomainEvent<TKey>> Events { get;  }
        void ClearEvents();
    }
}

  • Create BaseEntity class for Base Aggregate Root in Entities->Common folder. BaseEntity’s properties is shared in all entity classes.

BaseEntity.cs

namespace Domain.Entities.Common
{
    /// <summary>
    /// Base class for BaseAggregateRoot class
    /// Shared for all entities
    /// </summary>
    /// <typeparam name="TKey"></typeparam>
    public abstract class BaseEntity<TKey> : IBaseEntity<TKey>
    {
        protected BaseEntity() { }
        protected BaseEntity(TKey id) => Id = id;

        //Implementation of interface
        public TKey Id { get; protected set; }
    }
}

  • Create BaseDomainEvent class for all domain event class in Entities->Common folder.

BaseDomainEvent.cs

namespace Domain.Entities.Common
{
    /// <summary>
    /// Base domain event for all domain event
    /// </summary>
    /// <typeparam name="TA"></typeparam>
    /// <typeparam name="TKey"></typeparam>
    public class BaseDomainEvent<TA, TKey> : IDomainEvent<TKey> where TA : IAggregateRoot<TKey>
    {
        protected BaseDomainEvent() { }
        public BaseDomainEvent(TA aggregateRoot)
        {
            if(aggregateRoot is null)
            {
                throw new ArgumentNullException(nameof(aggregateRoot));
            }

            AggregateId = aggregateRoot.Id;
            AggregateVersion = aggregateRoot.Version;
            TimeStamp = DateTime.Now;
        }
        //Implementation of IDomainEvent
        public long AggregateVersion { get; private set; }

        //Implementation of IDomainEvent
        public TKey AggregateId { get; private set; }

        //Implementation of IDomainEvent
        public DateTime TimeStamp { get; private set; }
    }
}

  • Create BaseAggregateRoot class for all aggregate root in Entities->Common folder. Here _event queue is used to queue all events of the aggregate root. AddEvent method used to add new event.

BaseAggregateRoot.cs


using System.Collections.Immutable;
using System.Reflection;

namespace Domain.Entities.Common
{
    public abstract class BaseAggregateRoot<TA, TKey> : BaseEntity<TKey>, IAggregateRoot<TKey> where TA : IAggregateRoot<TKey>
    {

        // Queuing all events
        private readonly Queue<IDomainEvent<TKey>> _events = new Queue<IDomainEvent<TKey>>();

        protected BaseAggregateRoot() { }
        protected BaseAggregateRoot(TKey id) : base(id)
        {

        }

        protected void AddEvent(IDomainEvent<TKey> @event)
        {
            if(@event == null)
            {
                throw new ArgumentNullException(nameof(@event));
            }

            _events.Enqueue(@event);
            Apply(@event);
            Version++;
        }

        /// <summary>
        /// This method is implemented in the derived class
        /// Apply this method to implement different events
        /// </summary>
        /// <param name="event"></param>
        
        protected abstract void Apply(IDomainEvent<TKey> @event);

        // Implementation of IAggregateRoot
        // Aggregate version
        public long Version{ get; private set; }

        // Implementation of IAggregateRoot
        public IReadOnlyCollection<IDomainEvent<TKey>> Events => _events.ToImmutableArray();

        // Implementation of IAggregateRoot
        public void ClearEvents()
        {
            _events.Clear();
        }

        #region Factory

        private static readonly ConstructorInfo CTor;

        static BaseAggregateRoot()
        {
            var aggregateType = typeof(TA);
            CTor = aggregateType.GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public,
                null, new Type[0], new ParameterModifier[0]);
            if (null == CTor)
                throw new InvalidOperationException($"Unable to find required private parameterless constructor for Aggregate of type '{aggregateType.Name}'");
        }


        /// <summary>
        /// Create Base Aggregate root when Rehydrate all Events
        /// </summary>
        /// <param name="events"></param>
        /// <returns></returns>
        /// <exception cref="ArgumentNullException"></exception>
        public static TA Create(IEnumerable<IDomainEvent<TKey>> events)
        {
            if (null == events || !events.Any())
                throw new ArgumentNullException(nameof(events));
            var result = (TA)CTor.Invoke(new object[0]);

            // Problem is in here
            var baseAggregate = result as BaseAggregateRoot<TA, TKey>;

            if (baseAggregate != null)
                foreach (var @event in events)
                    baseAggregate.AddEvent(@event);

            result.ClearEvents();

            return result;
        }

        #endregion  
    }
}

  • Create CatalogItemCreated Event class in Events->CatalogItem folder

CatalogItemCreated.cs


using Domain.Entities.Common;

namespace Domain.Events.CatalogItem
{
    /// <summary>
    /// Catalog item created event
    /// </summary>
    public class CatalogItemCreated: BaseDomainEvent<Entities.CatalogItem, Guid>
    {

        private CatalogItemCreated()
        {

        }
        public CatalogItemCreated(Entities.CatalogItem catalogItem) : base(catalogItem)
        {
            //Id = catalogItem.Id;
            Name = catalogItem.Name;
            Description = catalogItem.Description;
            Price = catalogItem.Price;
            AvailableStock = catalogItem.AvailableStock;
            RestockThreshold = catalogItem.RestockThreshold;
            MaxStockThreshold = catalogItem.MaxStockThreshold;
            OnReorder = catalogItem.OnReorder;
            IsDeleted = catalogItem.IsDeleted;
            
        }

        //public Guid Id { get; set; }

        public string Name { get; private set; }

        public string Description { get; private set; }
        public double Price { get; private set; }
        // Quantity in stock
        public int AvailableStock { get; private set; }
        // Available stock at which we should reorder
        public int RestockThreshold { get; private set; }
        // Maximum number of units that can be in-stock at any time (due to physicial/logistical constraints in warehouses)
        public int MaxStockThreshold { get; private set; }
        /// <summary>
        /// True if item is on reorder
        /// </summary>
        public bool OnReorder { get; private set; }

        public bool IsDeleted { get; private set; }

    }
}
 
  • Create CatalogItemDeleted Event class in Events->CatalogItem folder

CatalogItemDeleted.cs


using Domain.Entities.Common;

namespace Domain.Events.CatalogItem
{
    /// <summary>
    /// Catalog item created event
    /// </summary>
    public class CatalogItemDeleted: BaseDomainEvent<Entities.CatalogItem, Guid>
    {

        private CatalogItemDeleted()
        {

        }
        public CatalogItemDeleted(Entities.CatalogItem catalogItem) : base(catalogItem)
        {
            IsDeleted = catalogItem.IsDeleted;
        }
        public bool IsDeleted { get; private set; }

    }
}


  • Create CatalogItemUpdated Event class in Events->CatalogItem folder

CatalogItemUpdated.cs

using Domain.Entities.Common;

namespace Domain.Events.CatalogItem
{
    /// <summary>
    /// Catalog item created event
    /// </summary>
    public class CatalogItemUpdated: BaseDomainEvent<Entities.CatalogItem, Guid>
    {

        private CatalogItemUpdated()
        {

        }
        public CatalogItemUpdated(Entities.CatalogItem catalogItem) : base(catalogItem)
        {
            //Id = catalogItem.Id;
            Name = catalogItem.Name;
            Description = catalogItem.Description;
            Price = catalogItem.Price;
            AvailableStock = catalogItem.AvailableStock;
            RestockThreshold = catalogItem.RestockThreshold;
            MaxStockThreshold = catalogItem.MaxStockThreshold;
            OnReorder = catalogItem.OnReorder;
        }

        //public Guid Id { get; set; }
        public string Name { get; private set; }

        public string Description { get; private set; }
        public double Price { get; private set; }
        // Quantity in stock
        public int AvailableStock { get; private set; }
        // Available stock at which we should reorder
        public int RestockThreshold { get; private set; }
        // Maximum number of units that can be in-stock at any time (due to physicial/logistical constraints in warehouses)
        public int MaxStockThreshold { get; private set; }
        /// <summary>
        /// True if item is on reorder
        /// </summary>
        public bool OnReorder { get; private set; }

    }
}

  • Create CatalogItem Aggregate Root class in Entities folder. Here we used DDD so CatalogItem is the main domain class here. Create, update, delete is performed using this class.

CatalogItem.cs

using Domain.Entities.Common;
using Domain.Events.CatalogItem;

namespace Domain.Entities
{
    public class CatalogItem : BaseAggregateRoot<CatalogItem, Guid>
    {
        private CatalogItem()
        {

        }

        public CatalogItem(Guid id, string name, string description, double price, int availableStock, 
            int restockThreshold, int maxStockThreshold, bool onReorder) : base(id)
        {
            Id = id;
            Name = name;
            Description = description;
            Price = price;
            AvailableStock = availableStock;
            RestockThreshold = restockThreshold;
            MaxStockThreshold = maxStockThreshold;
            OnReorder = onReorder;

            if (Version > 0)
            {
                throw new Exception("Catalog item already created");
            }

            if (string.IsNullOrEmpty(name))
            {
                //Validation Exception will be placed here
                throw new Exception("Name Can not be Empty");
            }

            if (price <= 0)
            {
                //Validation Exception will be placed here
                throw new Exception("Price must be positive value");
            }

            // Add CatalogItem Event Here to create
            AddEvent(new CatalogItemCreated(this));
        }

        public string Name { get; private set; }

        public string Description { get; private set; }

        public double Price { get; private set; }

        // Quantity in stock
        public int AvailableStock { get; private set; }

        // Available stock at which we should reorder
        public int RestockThreshold { get; private set; }

        // Maximum number of units that can be in-stock at any time (due to physicial/logistical constraints in warehouses)
        public int MaxStockThreshold { get; private set; }

        /// <summary>
        /// True if item is on reorder
        /// </summary>
        public bool OnReorder { get; private set; }
        public bool IsDeleted { get; private set; } = false;

        public static CatalogItem Create(string name, string description, double price, int availableStock, 
            int restockThreshold, int maxStockThreshold, bool onReorder)
        {
            return new CatalogItem(Guid.NewGuid(), name, description, price, availableStock, restockThreshold, maxStockThreshold, onReorder); ;
        }

        public void Update(Guid id, string name, string description, double price, int availableStock,
         int restockThreshold, int maxStockThreshold, bool onReorder)
        {
            Id = id;
            Name = name;
            Description = description;
            Price = price;
            AvailableStock = availableStock;
            RestockThreshold = restockThreshold;
            MaxStockThreshold = maxStockThreshold;
            OnReorder = onReorder;

            AddEvent(new CatalogItemUpdated(this));
        }

        public void Delete(Guid id)
        {
            Id = id;
            IsDeleted = true;
            AddEvent(new CatalogItemDeleted(this));
        }



        protected override void Apply(IDomainEvent<Guid> @event)
        {
            switch (@event)
            {
                case CatalogItemCreated catalogItemCreated: OnCatalogItemCreated(catalogItemCreated); break;
                case CatalogItemUpdated catalogItemUpdated: OnCatalogItemUpdated(catalogItemUpdated); break;
                case CatalogItemDeleted catalogItemDeleted: 
                    IsDeleted = catalogItemDeleted.IsDeleted;
                    break;

            }
        }

        // On Catalog Item Created Event
        private void OnCatalogItemCreated(CatalogItemCreated catalogItemCreated)
        {

            Id = catalogItemCreated.AggregateId; // Must have ID
            Name = catalogItemCreated.Name;
            Description= catalogItemCreated.Description;
            Price = catalogItemCreated.Price;
            AvailableStock = catalogItemCreated.AvailableStock;
            RestockThreshold = catalogItemCreated.RestockThreshold;
            MaxStockThreshold = catalogItemCreated.MaxStockThreshold;
            OnReorder = catalogItemCreated.OnReorder;
        }

        // On Catalog Item Updated Event
        private void OnCatalogItemUpdated(CatalogItemUpdated catalogItemUpdated)
        {
            Name = catalogItemUpdated.Name;
            Description = catalogItemUpdated.Description;
            Price = catalogItemUpdated.Price;
            AvailableStock = catalogItemUpdated.AvailableStock;
            RestockThreshold = catalogItemUpdated.RestockThreshold;
            MaxStockThreshold = catalogItemUpdated.MaxStockThreshold;
            OnReorder = catalogItemUpdated.OnReorder;
        }
    }
}

Step 5: Organize Application

  • Add domain as a reference project in Application project. You may add using project file as follows.
  <ItemGroup>
    <ProjectReference Include="..\Domain\Domain.csproj" />
  </ItemGroup>
  • Create CreateCatalogItemDTO class in Common->DTOs folder

CreateCatalogItemDTO.cs


namespace Application.Common.DTOs
{
    public class CreateCatalogItemDTO
    {
        public int Id { get; set; }

        public string Name { get; set; }

        public string Description { get; set; }

        public double Price { get; set; }

        // Quantity in stock
        public int AvailableStock { get; set; }

        // Available stock at which we should reorder
        public int RestockThreshold { get; set; }

        // Maximum number of units that can be in-stock at any time (due to physicial/logistical constraints in warehouses)
        public int MaxStockThreshold { get; set; }

        /// <summary>
        /// True if item is on reorder
        /// </summary>
        public bool OnReorder { get; set; }
    }
}

  • Create UpdateCatalogItemDTO class in Common->DTOs folder

UpdateCatalogItemDTO.cs


namespace Application.Common.DTOs
{
    public class UpdateCatalogItemDTO
    {
        public Guid Id { get; private set; }

        public string Name { get; set; }

        public string Description { get; set; }

        public double Price { get; set; }

        // Quantity in stock
        public int AvailableStock { get; set; }

        // Available stock at which we should reorder
        public int RestockThreshold { get; set; }

        // Maximum number of units that can be in-stock at any time (due to physicial/logistical constraints in warehouses)
        public int MaxStockThreshold { get; set; }

        /// <summary>
        /// True if item is on reorder
        /// </summary>
        public bool OnReorder { get; set; }
    }
}

  • Create IAggregateRepository Interface in Common->Interfaces folder. This interface is used for event sourcing. Here, AppendAsync method is used to append events to store in event store database, RehydreateAsync method is used to read all events using aggregate id and ReadEventsAsync method is used to read events as a log and return in to dictionary.

IAggregateRepository.cs

using Domain.Entities.Common;

namespace Application.Common.Interfaces
{
    public interface IAggregateRepository<TA, TKey> where TA : class, IAggregateRoot<TKey>
    {
        // Append events to store in event store database
        Task AppendAsync(TA aggregate, CancellationToken cancellationToken = default);

        // Read all events using aggregate ID
        Task<TA?> RehydreateAsync(TKey aggregateId, CancellationToken cancellationToken = default);

        // Read events as a log and return into a dictionary
        Task<Dictionary<int, object>> ReadEventsAsync(TKey aggregateId, CancellationToken cancellationToken = default);
    }
}

  • Create ICatalogItemRepository interface to insert and update into sql server. Since, we use soft delete so here delete means update of isDelete filed to 1.

ICatalogItemRepository.cs


using Domain.Entities;

namespace Application.Common.Interfaces
{
    public interface ICatalogItemRepository
    {
        Task AddAsync(CatalogItem catalogItem);
        Task UpdateAsync(CatalogItem catalogItem);
    }
}

  • Create PrivateSetterContractResolver class in Common->Resolvers folder. This is a Custom Contract Resolver to Set Private property when Rehydrate Events

PrivateSetterContractResolver.cs


using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
using System.Reflection;

namespace Application.Common.Resolvers
{
    /// <summary>
    /// Custom Contract Resolver to Set Private property when Rehydrate Events
    /// </summary>
    public class PrivateSetterContractResolver : DefaultContractResolver
    {
        protected override JsonProperty CreateProperty(MemberInfo member, MemberSerialization memberSerialization)
        {
            var jsonProperty = base.CreateProperty(member, memberSerialization);

            if(jsonProperty.Writable)
                return jsonProperty;

            if(member is PropertyInfo propertyInfo)
            {
                var setter = propertyInfo.GetSetMethod(true);
                jsonProperty.Writable = setter != null;
            }

            return jsonProperty;
        }
        //EnvironmentVariableTarget jsonProperty = base.CreateProperty();
    }
}

  • Create CreateCatalogItemCommand and CreateCatalogItemCommandHandler class in Commands->CatalogItems folder.

CreateCatalogItemCommand.cs


using Application.Common.Interfaces;
using Domain.Entities;
using MediatR;

namespace Application.Commands.CatalogItems
{
    public class CreateCatalogItemCommand : INotification
    {
        public CreateCatalogItemCommand(string name, string description, double price, int availableStock, int restockThreshold, 
            int maxStockThreshold, bool onReorder)
        {
            Name = name;
            Description = description;
            Price = price;
            AvailableStock = availableStock;
            RestockThreshold = restockThreshold;
            MaxStockThreshold = maxStockThreshold;
            OnReorder = onReorder;
        }


        public string Name { get; private set; }

        public string Description { get; private set; }

        public double Price { get; private set; }

        // Quantity in stock
        public int AvailableStock { get; private set; }

        // Available stock at which we should reorder
        public int RestockThreshold { get; private set; }

        // Maximum number of units that can be in-stock at any time (due to physicial/logistical constraints in warehouses)
        public int MaxStockThreshold { get; private set; }

        /// <summary>
        /// True if item is on reorder
        /// </summary>
        public bool OnReorder { get; private set; }
    }

    public class CreateCatalogItemCommandHandler : INotificationHandler<CreateCatalogItemCommand>
    {
        private readonly IAggregateRepository<CatalogItem, Guid> _aggregateRepository;
        private readonly ICatalogItemRepository _catalogItemRepository;

        public CreateCatalogItemCommandHandler(IAggregateRepository<CatalogItem, Guid> aggregateRepository, 
            ICatalogItemRepository catalogItemRepository)
        {
            _aggregateRepository = aggregateRepository;
            _catalogItemRepository = catalogItemRepository;
        }

        public async Task Handle(CreateCatalogItemCommand notification, CancellationToken cancellationToken)
        {

    

            // Insert event into eventstore db
            var catalogItem = CatalogItem.Create(notification.Name, notification.Description, notification.Price, notification.AvailableStock,
                notification.RestockThreshold, notification.MaxStockThreshold, notification.OnReorder);
            await _aggregateRepository.AppendAsync(catalogItem);

            // Save data into database
            //await _catalogItemAggregateRepository.SaveAsync(catalogItem.Events.FirstOrDefault());
            await _catalogItemRepository.AddAsync(catalogItem);

            // Dispatch events to any event/service bus to do next actions
            // We can also register EventStore db Subscription to receive Event Notification
        }
    }
}

  • Create UpdateCatalogItemCommand and UpdateCatalogItemCommandHandler class in Commands->CatalogItems folder.

UpdateCatalogItemCommand.cs


using Application.Common.Interfaces;
using Domain.Entities;
using MediatR;

namespace Application.Commands.CatalogItems
{
    public class UpdateCatalogItemCommand : INotification
    {
        // Update Catalog Item Command
        public UpdateCatalogItemCommand(Guid id, string name, string description, double price, int availableStock, int restockThreshold,
            int maxStockThreshold, bool onReorder)
        {
            CatalogItemId = id;
            Name = name;
            Description = description;
            Price = price;
            AvailableStock = availableStock;
            RestockThreshold = restockThreshold;
            MaxStockThreshold = maxStockThreshold;
            OnReorder = onReorder;
        }

        public Guid CatalogItemId { get; private set; }

        public string Name { get; private set; }

        public string Description { get; private set; }

        public double Price { get; private set; }

        // Quantity in stock
        public int AvailableStock { get; private set; }

        // Available stock at which we should reorder
        public int RestockThreshold { get; private set; }

        // Maximum number of units that can be in-stock at any time (due to physicial/logistical constraints in warehouses)
        public int MaxStockThreshold { get; private set; }

        /// <summary>
        /// True if item is on reorder
        /// </summary>
        public bool OnReorder { get; private set; }

    }

    public class UpdateCatalogItemCommandHandler: INotificationHandler<UpdateCatalogItemCommand>
    {
        private readonly IAggregateRepository<CatalogItem, Guid> _aggregateRepository;
        private readonly ICatalogItemRepository _catalogItemRepository;
        public UpdateCatalogItemCommandHandler(IAggregateRepository<CatalogItem, Guid> aggregateRepository, 
            ICatalogItemRepository catalogItemRepository)
        { 
            _aggregateRepository = aggregateRepository;
            _catalogItemRepository = catalogItemRepository;
        }

        #region INotificationHandler implementation
        public async Task Handle(UpdateCatalogItemCommand notification, CancellationToken cancellationToken)
        {
            var catalogItem = await _aggregateRepository.RehydreateAsync(notification.CatalogItemId, cancellationToken);

            if(catalogItem == null)
            {
                throw new Exception("Invalide catalog item information");
            }

            catalogItem.Update(notification.CatalogItemId, notification.Name, notification.Description, notification.Price,
                notification.AvailableStock, notification.RestockThreshold, notification.MaxStockThreshold, notification.OnReorder);

            await _aggregateRepository.AppendAsync(catalogItem, cancellationToken);

            // Save data into database

            await _catalogItemRepository.UpdateAsync(catalogItem);


        }

        #endregion
    }


}

  • Create DeleteCatalogItemCommand and DeleteCatlogItemCommandHandler class in Commands->CatalogItems folder.

DeleteCatalogItemCommand.cs


using Application.Common.Interfaces;
using Domain.Entities;
using MediatR;

namespace Application.Commands.CatalogItems
{
    public class DeleteCatalogItemCommand : INotification
    {
        public DeleteCatalogItemCommand(Guid id)
        {
            Id = id;
        }
        public Guid Id { get; private set; }
    }

    public class DeleteCatlogItemCommandHandler : INotificationHandler<DeleteCatalogItemCommand>
    {
        private readonly IAggregateRepository<CatalogItem, Guid> _aggregateRepository;
        private readonly ICatalogItemRepository _catalogItemRepository;

        public DeleteCatlogItemCommandHandler(IAggregateRepository<CatalogItem, Guid> agrregateRepository
            , ICatalogItemRepository catalogItemRepository)
        {
            _aggregateRepository = agrregateRepository;
            _catalogItemRepository = catalogItemRepository;
        }
        public async Task Handle(DeleteCatalogItemCommand notification, CancellationToken cancellationToken)
        {
            var catalogItem = await _aggregateRepository.RehydreateAsync(notification.Id, cancellationToken);
            if (catalogItem == null)
            {
                throw new Exception("Invalid Catalog Item information");
            }

            catalogItem.Delete(catalogItem.Id);
            await _aggregateRepository.AppendAsync(catalogItem, cancellationToken);

            // Save data into database
            await _catalogItemRepository.UpdateAsync(catalogItem);
        }
    }
}

  • Create GetCatalogItemLogByIdQuery and GetCatalogItemLogByIdQueryHandler class in Queries->CatalogItems folder

GetCatalogItemLogByIdQuery.cs


using Application.Common.Interfaces;
using Domain.Entities;
using MediatR;

namespace Application.Queries.CatalogItems
{
    public class GetCatalogItemLogByIdQuery : IRequest<List<object>>
    {
        public GetCatalogItemLogByIdQuery(Guid catalogItemId)
        {
            CatalogItemId = catalogItemId;
        }

        public Guid CatalogItemId { get; private set; }
    }

    public class GetCatalogItemLogByIdQueryHandler : IRequestHandler<GetCatalogItemLogByIdQuery, List<object>>
    {
        private readonly IAggregateRepository<CatalogItem, Guid> _aggregateRepository;

        public GetCatalogItemLogByIdQueryHandler(IAggregateRepository<CatalogItem, Guid> aggregateRepository)
        {
            _aggregateRepository = aggregateRepository;
        }

        public async Task<List<object>> Handle(GetCatalogItemLogByIdQuery request, CancellationToken cancellationToken)
        {
            var data = await _aggregateRepository.ReadEventsAsync(request.CatalogItemId, cancellationToken);
            return data.Values.ToList();
        }
    }
}

  • Create service collection extension method in root directory.

DependencyInjection.cs


using MediatR;
using Microsoft.Extensions.DependencyInjection;
using System.Reflection;

namespace Application
{
    public static class DependencyInjection
    {
        public static IServiceCollection AddApplication(this IServiceCollection services)
        {
            //Add MediatR to the Pipe line
            services.AddMediatR(Assembly.GetExecutingAssembly());
            return services;
        }
    }
}

Step 6: Organize Infrastructure

  • Add Domain and Application project as reference in the Infrastructure project.
  • You may add in the project file as follows.

  <ItemGroup>
    <ProjectReference Include="..\Application\Application.csproj" />
    <ProjectReference Include="..\Domain\Domain.csproj" />
  </ItemGroup>

  • Create ApplicationDbContext class in Persistance folder

ApplicationDbContext.cs


using Domain.Entities;
using Microsoft.EntityFrameworkCore;
using System.Reflection;

namespace Infrastructure.Persistance
{
    public class ApplicationDbContext : DbContext
    {
        public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options) : base(options)
        {

        }

        public DbSet<CatalogItem> CatalogItems { get; set; }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {

        }

        protected override void OnModelCreating(ModelBuilder builder)
        {
            base.OnModelCreating(builder);
            builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
        }
    }
}


  • Create CatalogItemRepository class in Persistance folder for insert and update in the sql server.

CatalogItemRepository.cs


using Application.Common.Interfaces;
using Domain.Entities;

namespace Infrastructure.Persistance
{
    public class CatalogItemRepository : ICatalogItemRepository
    {
        private readonly ApplicationDbContext _context;

        public CatalogItemRepository(ApplicationDbContext applicationDbContext)
        {
            _context = applicationDbContext;
        }
        public async Task AddAsync(CatalogItem catalogItem)
        {
            await _context.AddAsync(catalogItem);
            await _context.SaveChangesAsync();
        }

        public async Task UpdateAsync(CatalogItem catalogItem)
        {
            _context.Entry(catalogItem).State = Microsoft.EntityFrameworkCore.EntityState.Modified;
            await _context.SaveChangesAsync();
        }
    }
}

  • Create AggregateRepository class in Persistance for inserting into event store db.

AggregateRepository.cs


using Application.Common.Interfaces;
using Application.Common.Resolvers;
using Domain.Entities.Common;
using EventStore.Client;
using Newtonsoft.Json;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using JsonSerializer = System.Text.Json.JsonSerializer;

namespace Infrastructure.Persistance
{
    public class AggregateRepository<TA, TKey> : IAggregateRepository<TA, TKey> where TA : class, IAggregateRoot<TKey>
    {
        private readonly EventStoreClient _eventStoreClient;
        private readonly string _stramBaseName;

        public AggregateRepository(EventStoreClient eventStoreClient)
        {
            _eventStoreClient = eventStoreClient;
            var aggregateType = typeof(TA);
            _stramBaseName = aggregateType.Name;
        }
        public async Task AppendAsync(TA aggregate, CancellationToken cancellationToken = default)
        {
            if (null == aggregate)
                throw new ArgumentNullException(nameof(aggregate));
            if (!aggregate.Events.Any())
                return;

            var streamName = GetStreamName(aggregate.Id);

            var eventList = aggregate.Events.Select(Map).ToArray();

            var result = await _eventStoreClient.AppendToStreamAsync(streamName, StreamState.Any,
                eventList.ToArray(), cancellationToken: cancellationToken);
        }

        public async Task<Dictionary<int, object>> ReadEventsAsync(TKey aggregateId, CancellationToken cancellationToken = default)
        {
            var streamName = GetStreamName(aggregateId);
            var result = _eventStoreClient.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start);

            var events = new Dictionary<int, object>();
            var index = 0;

            foreach (var data in await result.ToListAsync(cancellationToken: cancellationToken))
            {
                // Read event metadata to get type information of an event
                var eventMetaData = JsonSerializer.Deserialize<EventMeta>(Encoding.UTF8.GetString(data.Event.Metadata.ToArray()));
                Type? typeInfo = Type.GetType(eventMetaData.EventType);
                if(typeInfo is null)
                {
                    throw new Exception($"Invalid type {eventMetaData.EventType}");
                }

                var jsonData = Encoding.UTF8.GetString(data.Event.Data.ToArray());
                var eventInfo = JsonConvert.DeserializeObject(jsonData, typeInfo, new JsonSerializerSettings()
                {
                    ConstructorHandling = ConstructorHandling.AllowNonPublicDefaultConstructor,
                    ContractResolver = new PrivateSetterContractResolver()
                });

                events.Add(index, new
                {
                    Events = eventInfo,
                    EventType = data.OriginalEvent.EventType
                });

                index++;

            }

            return events;
        }


        /// <summary>
        /// Read all events using Aggregate ID
        /// </summary>
        /// <param name="aggregateId"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        public async Task<TA?> RehydreateAsync(TKey key, CancellationToken cancellationToken = default)
        {
            try
            {

                var streamName = GetStreamName(key);

                var events = new List<IDomainEvent<TKey>>();

                var result = _eventStoreClient.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start);


                foreach (var data in await result.ToListAsync(cancellationToken))
                {
                    //Read Event Metadata to get Type Information of an event
                    var eventMetaData = JsonSerializer.Deserialize<EventMeta>
                        (Encoding.UTF8.GetString(data.Event.Metadata.ToArray()));

                    Type? typeInfo = Type.GetType(eventMetaData.EventType);

                    if (typeInfo == null)
                    {
                        throw new Exception($"Invalid type {eventMetaData.EventType}");
                    }

                    var jsonData = Encoding.UTF8.GetString(data.Event.Data.ToArray());
                    var eventInfo = JsonConvert.DeserializeObject(jsonData, typeInfo, new JsonSerializerSettings()
                    {
                        ConstructorHandling = ConstructorHandling.AllowNonPublicDefaultConstructor,
                        ContractResolver = new PrivateSetterContractResolver()
                    });


                    events.Add((IDomainEvent<TKey>)eventInfo);
                }

                if (!events.Any())
                    return null;

                var aggregateResult = BaseAggregateRoot<TA, TKey>.Create(events.OrderBy(x => x.AggregateVersion));

                return aggregateResult;
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                throw;
            }
        }

        // Techical: Expression-bodied member
        // Generate stream name format
        private string GetStreamName(TKey aggregateKey)
            => $"{_stramBaseName}_{aggregateKey}";

        // Map domain event to event data
        private static EventData Map(IDomainEvent<TKey> @event)
        {
            var meta = new EventMeta()
            {
                EventType = @event.GetType().AssemblyQualifiedName
            };

            var metaJson = System.Text.Json.JsonSerializer.Serialize(meta);
            var metadata = Encoding.UTF8.GetBytes(metaJson);

            var eventData = new EventData(
                Uuid.NewUuid(),
                @event.GetType().Name,
                JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType(), new JsonSerializerOptions()
                {
                    ReferenceHandler = ReferenceHandler.IgnoreCycles
                }),
                metadata
                );
            return eventData;
        }

        /// <summary>
        ///  Meta data information for an event which will also saved into each Event Payload
        /// </summary>
        internal struct EventMeta
        {
            public string EventType { get; set; }
        }
    }
}

  • Add service collection extension method in the root directory. Here sql server, event store db and dependency injection is configured.

DependencyInjection.cs


using Application.Common.Interfaces;
using Domain.Entities;
using Domain.Entities.Common;
using EventStore.Client;
using Infrastructure.Persistance;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Infrastructure
{
    public static class DependencyInjection
    {
        public static IServiceCollection AddEventStore(this IServiceCollection services, IConfiguration configuration)
        {
            // Event store database connection
            var settings = EventStoreClientSettings
                .Create("esdb://127.0.0.1:2113?tls=false&keepAliveTimeout=10000&keepAliveInterval=10000");

            var client = new EventStoreClient(settings);
            services.AddSingleton(client);

            // Register DbContext for SQL Server

            services.AddDbContext<ApplicationDbContext>(options =>
            {
                options.UseSqlServer(
                    configuration.GetConnectionString("DefaultConnection"),
                    sqlServerOptionsAction: sqlOptions =>
                    {
                    });
            });

            services.AddScoped<ICatalogItemRepository, CatalogItemRepository>();
            services.AddEventsRepository<CatalogItem, Guid>();

            return services;

        }


        private static IServiceCollection AddEventsRepository<TA, TK>(this IServiceCollection services)
    where TA : class, IAggregateRoot<TK>
        {
            return services.AddSingleton(typeof(IAggregateRepository<TA, TK>), typeof(AggregateRepository<TA, TK>));
        }
    }
}

Step 7: Organize Catalog.API

  • Add connection string in the appsettings.json file as follows.

appsettings.json


{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "ConnectionStrings": {
    "DefaultConnection": "Data Source=localhost;Initial Catalog=CatalogDB;Persist Security Info=False;User ID=sa; Password = yourpassword;Pooling=False;MultipleActiveResultSets=False;Encrypt=False;TrustServerCertificate=False"
  }
}

  • Add CatalogItemController class in Controllers folder as follows.

CatalogItemController.cs


using Application.Commands.CatalogItems;
using Application.Common.DTOs;
using Application.Queries.CatalogItems;
using MediatR;
using Microsoft.AspNetCore.Mvc;

namespace Catalog.API.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class CatalogItemController : ControllerBase
    {

        private readonly IMediator _mediator;

        public CatalogItemController(IMediator mediator)
        {
            _mediator = mediator;
        }

        [HttpPost("Create")]
        public async Task<IActionResult> CreateCatalogItemAsync([FromBody] CreateCatalogItemDTO? dto)
        {
            if (dto == null)
                return BadRequest();

            var command = new CreateCatalogItemCommand(dto.Name, dto.Description, dto.Price, dto.AvailableStock, dto.RestockThreshold,
                dto.MaxStockThreshold, dto.OnReorder);
            await _mediator.Publish(command);

            return Ok(command);
        }

        [HttpPatch("update/{id:guid}")]
        public async Task<IActionResult> UpdateCatalogItem(Guid id, [FromBody] UpdateCatalogItemDTO? dto)
        {
            if (dto == null)
                return BadRequest();

            var updateCommand = new UpdateCatalogItemCommand(id, dto.Name, dto.Description, dto.Price, dto.AvailableStock,
                dto.RestockThreshold, dto.MaxStockThreshold, dto.OnReorder);
            await _mediator.Publish(updateCommand);

            return Ok(updateCommand);
        }

        /// <summary>
        /// Delete a catalog item which is soft delete not hard delete
        /// </summary>
        /// <param name="id"></param>
        /// <returns></returns>
        [HttpDelete("delete/{id:guid}")]
        public async Task<IActionResult> DeleteCatalogItem(Guid id)
        {
            var deleteCatalogCommand = new DeleteCatalogItemCommand(id); 
            await _mediator.Publish(deleteCatalogCommand);
            return Ok();
        }

        [HttpGet("log/{id:guid}")]
        public async Task<IActionResult> GetLog(Guid id)
        {
            return Ok(await _mediator.Send(new GetCatalogItemLogByIdQuery(id)));
        }
    }
}

  • Modify Program.cs as follows.

Program.cs


using Application;
using Infrastructure;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

var environment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT");
builder.Configuration.SetBasePath(Directory.GetCurrentDirectory())
    .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
    .AddJsonFile($"appsettings.{environment}.json", optional: true)
    .AddEnvironmentVariables();

//Register Application layer and Event Store layer from Infrastructure here
builder.Services.AddApplication().AddEventStore(builder.Configuration);



builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();

Step 8: Add Migration

  • Set Catalog.API as startup project and go to package manager console.
  • Select Infrastructure as Default project and run the following commands.
PM> Add-Migration init-mig
PM> Update-Database -Verbose

Step 9: Run application and perform CRUD operation using swagger. See the impact in sql server and EventStoreDB as follows.

Swagger UI: Create, update and delete CatalogItem using SwaggerUI

EventStoreDB UI: Browse event store db to see the impact

Source code