Now let us look at the next section, the Handler. The Handler executes the command that was sent by the ProcessServices.
As we saw earlier, in the ProcessServices we send the command on the bus, filling it with the input data received in the API along with the ContextInfo.
We also saw how to configure the Handler endpoint as the default destination endpoint, so that the command gets executed in the Handler endpoint.
The handler in Handlers.Default.BusGamma is invoked by the endpoint, and the Handler for the command is executed:
public async Task Handle(AddCustomerCommand message, IMessageHandlerContext context)
{
    _logger.LogTrace($"Executing AddCustomerCommandHandler: {nameof(AddCustomerCommandHandler)}");
    AddCustomerPostBusDataPacket packet = _flexHost.GetFlexiFlowDataPacket<AddCustomerPostBusDataPacket>();
    //Fill your data to datapacket here
    packet.cmd = message;
    FlexiPluginSequenceBase<AddCustomerPostBusDataPacket> sequence = _flexHost.GetFlexiPluginSequence<AddCustomerPostBusSequence, AddCustomerPostBusDataPacket>();
    await FlexiFlow.Run(sequence, packet, new BusGammaHandlerContextBridge(context, message.HostContextInfo));
}
Here also, it creates a DataPacket and uses FlexiFlow for execution. The reason is that BusGamma is an implementation of NServiceBus, and we kept it separate and decoupled from the actual persistence logic so that it is easy to swap the bus when we need to.
public partial class AddCustomerPlugin : FlexiPluginBase, IFlexiPlugin<AddCustomerPostBusDataPacket>
{
    public override string Id { get; set; } = "39f682d485cdecf2c3b66da16f866f05";
    public override string FriendlyName { get; set; } = "AddCustomerPlugin";
    protected string OnRaiseEventCondition = "";
    IFlexRepositoryBridge _repo;
    readonly ILogger<AddCustomerPlugin> _logger;
    Customer _model;
    IFlexHost _flexHost;
    IWriteDbConnectionProviderBridge _connectionProvider;
    /// <summary>
    ///
    /// </summary>
    /// <param name="repo"></param>
    /// <param name="logger"></param>
    public AddCustomerPlugin(IFlexRepositoryBridge repo, ILogger<AddCustomerPlugin> logger, IFlexHost flexHost, IWriteDbConnectionProviderBridge connectionProvider)
    {
        _repo = repo;
        _logger = logger;
        _flexHost = flexHost;
        _connectionProvider = connectionProvider;
    }
    /// <summary>
    ///
    /// </summary>
    /// <param name="packet"></param>
    public async Task Execute(AddCustomerPostBusDataPacket packet)
    {
        _connectionProvider.ConfigureDbConnectionString(packet.cmd.HostContextInfo);
        _repo.InitializeConnection(_connectionProvider);
        _model = _flexHost.GetDomainModel<Customer>().AddCustomer(packet.cmd);
        _repo.InsertOrUpdate(_model);
        await _repo.SaveAsync();
        _logger.LogDebug("Customer inserted into Database: " + _model.Id);
        //TODO: Specify your condition to raise event here...
        //TODO: Set the value of OnRaiseEventCondition according to your business logic
        OnRaiseEventCondition = CONDITION_ONSUCCESS;
        RaiseEventCondition raiseEventCondition = new RaiseEventCondition(OnRaiseEventCondition);
        await raiseEventCondition.RaiseEvent<AddCustomerPlugin>(this, new object[] { packet.FlexServiceBusContext });
    }
}
The handler plugin calls the domain model methods to execute the logic for the domain and persistence:
public partial class Customer : DomainModelBridge
{
    #region "Public Methods"
    /// <summary>
    ///
    /// </summary>
    /// <param name="cmd"></param>
    /// <returns></returns>
    public virtual Customer AddCustomer(AddCustomerCommand cmd)
    {
        Guard.AgainstNull("Customer command cannot be empty", cmd);
        this.Convert(cmd.model);
        this.CreatedBy = cmd.HostContextInfo.UserId;
        this.LastModifiedBy = cmd.HostContextInfo.UserId;
        //Map any other field not handled by Automapper config
        this.SetAdded(cmd.Id);
        //Set your appropriate SetAdded for the inner object here
        return this;
    }
    #endregion
    #region "Private Methods"
    #endregion
}
The Bus will keep retrying the message until the data is saved, that is, until the execution succeeds without any exceptions, and we can configure the retries so that you don't lose any messages. For example, we can configure it to try five times, wait 10 minutes, try another five times, and ultimately, if it still fails after half an hour, the data can go to the database.
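Since BusGamma is an implementation of NServiceBus, a retry policy like the one just described could be sketched with the NServiceBus recoverability settings. This is only a sketch under assumptions: the class name `RetryPolicySketch` is hypothetical, the counts and delay are illustrative, the error queue name `error` is a placeholder, and whether BusGamma exposes the `EndpointConfiguration` directly is not confirmed here.

```csharp
using System;
using NServiceBus;

// Hypothetical helper; assumes you can reach the NServiceBus EndpointConfiguration
// used by the Handler endpoint.
public static class RetryPolicySketch
{
    public static void Apply(EndpointConfiguration endpointConfiguration)
    {
        var recoverability = endpointConfiguration.Recoverability();

        // Retry immediately up to five times when the handler throws.
        recoverability.Immediate(immediate => immediate.NumberOfRetries(5));

        // Then back off: each delayed round waits longer than the previous one,
        // growing by the configured increase (illustrative values).
        recoverability.Delayed(delayed =>
        {
            delayed.NumberOfRetries(3);
            delayed.TimeIncrease(TimeSpan.FromMinutes(10));
        });

        // When all retries are exhausted, the message is moved aside instead of being lost.
        // The queue name "error" is an assumption.
        endpointConfiguration.SendFailedMessagesTo("error");
    }
}
```

Each delayed round is followed by the immediate retries again, which is roughly the "try five times, wait, try another five times" pattern.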
So I recommend you go through the eventual consistency articles. We will also give some reference links on our website.
That will help you create more resilient and more scalable applications for your customers. After the processing has happened, the plugin raises the event based on the condition:
public async Task Execute(AddCustomerPostBusDataPacket packet)
{
    _connectionProvider.ConfigureDbConnectionString(packet.cmd.HostContextInfo);
    _repo.InitializeConnection(_connectionProvider);
    _model = _flexHost.GetDomainModel<Customer>().AddCustomer(packet.cmd);
    _repo.InsertOrUpdate(_model);
    await _repo.SaveAsync();
    _logger.LogDebug("Customer inserted into Database: " + _model.Id);
    //TODO: Specify your condition to raise event here...
    //TODO: Set the value of OnRaiseEventCondition according to your business logic
    OnRaiseEventCondition = CONDITION_ONSUCCESS;
    RaiseEventCondition raiseEventCondition = new RaiseEventCondition(OnRaiseEventCondition);
    await raiseEventCondition.RaiseEvent<AddCustomerPlugin>(this, new object[] { packet.FlexServiceBusContext });
}
If we look, the CustomerAdded event is there, and we have a condition called CONDITION_ONSUCCESS. Each condition also has its own file.
Ideally there is one condition, but you may have conditional logic based on your business rules.
You may raise one event based on one condition and another event based on some other condition.
The events will be raised accordingly. Let us look at the OnSuccess condition. We need to enable it by setting OnRaiseEventCondition in Execute, otherwise it won't get executed:
public partial class AddCustomerPlugin : FlexiPluginBase, IFlexiPlugin<AddCustomerPostBusDataPacket>
{
    const string CONDITION_ONSUCCESS = "OnSuccess";
    /// <summary>
    ///
    /// </summary>
    /// <returns></returns>
    private async Task OnSuccess(IFlexServiceBusContextBridge serviceBusContext)
    {
        CustomerAdded @event = new CustomerAdded
        {
            HostContextInfo = serviceBusContext.HostContextInfo,
            //Add your properties here
            Id = _model.Id
        };
        await serviceBusContext.Publish(@event);
    }
}
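To make the idea of raising different events for different conditions concrete, here is a small standalone sketch. It does not use the framework's RaiseEventCondition; it assumes, purely for illustration, that a condition name selects a method of the same name on the plugin. `SketchPlugin`, `ConditionDispatcher`, the `OnDuplicate` condition and the `CustomerAlreadyExists` event name are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading.Tasks;

// Hypothetical stand-in for a plugin; it records "published" events in a list
// instead of sending them on a bus.
public class SketchPlugin
{
    public const string CONDITION_ONSUCCESS = "OnSuccess";
    public const string CONDITION_ONDUPLICATE = "OnDuplicate"; // hypothetical second condition

    public List<string> Published { get; } = new List<string>();

    private Task OnSuccess(string context)
    {
        Published.Add("CustomerAdded:" + context);
        return Task.CompletedTask;
    }

    private Task OnDuplicate(string context)
    {
        Published.Add("CustomerAlreadyExists:" + context);
        return Task.CompletedTask;
    }
}

// Assumed dispatch rule: the condition string is the name of the method to invoke.
public static class ConditionDispatcher
{
    public static async Task RaiseEvent<T>(T plugin, string condition, object[] args)
    {
        // An empty condition means nothing was enabled, so nothing is raised.
        if (string.IsNullOrEmpty(condition)) return;

        var method = typeof(T).GetMethod(
            condition,
            BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
        if (method == null)
            throw new InvalidOperationException("No handler method for condition: " + condition);

        await (Task)method.Invoke(plugin, args);
    }
}

public static class Program
{
    public static async Task Main()
    {
        var plugin = new SketchPlugin();
        await ConditionDispatcher.RaiseEvent(
            plugin, SketchPlugin.CONDITION_ONSUCCESS, new object[] { "tenant-a" });
        Console.WriteLine(string.Join(", ", plugin.Published));
    }
}
```

In this sketch, leaving the condition empty raises nothing, which mirrors why the condition has to be set before the event gets executed.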
So this is the event. Next we will see how this raised event is handled by the Subscriber.
Notice that whenever we raise an event, we pass the context info here as well. Since the whole process is asynchronous and eventually consistent, the context info is needed in other places too, and that is why it has to be passed along with the event.
Subscribers can use the context info when they need it. This is very useful when you are developing a multi-tenant application.
With only one configuration in your startup, your entire application can be switched from single-tenant to multi-tenant, or from multi-tenant to single-tenant, without changing a line of code anywhere in your business logic. The Bus publishes the event, so this context is very important for maintaining the transaction, or the entire process, that is being executed here. The context that we pass is the one available to us from the NServiceBus context. So that's all about the Handler.
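As a rough illustration of why carrying the context matters for multi-tenancy, the sketch below resolves a connection string from the context, with a single startup flag deciding between single-tenant and multi-tenant behaviour. It is not the framework's connection provider: `SketchContextInfo`, `SketchConnectionProvider`, the `TenantId` property and the connection strings are all assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified context; the real HostContextInfo has its own shape.
public class SketchContextInfo
{
    public string UserId { get; set; } = "";
    public string TenantId { get; set; } = ""; // assumed property
}

// Hypothetical provider: business logic only ever calls ConfigureDbConnectionString,
// so switching tenancy mode does not touch it.
public class SketchConnectionProvider
{
    private readonly bool _multiTenant;
    private readonly string _defaultConnection;
    private readonly IReadOnlyDictionary<string, string> _tenantConnections;

    public string ConnectionString { get; private set; } = "";

    public SketchConnectionProvider(
        bool multiTenant,
        string defaultConnection,
        IReadOnlyDictionary<string, string> tenantConnections)
    {
        _multiTenant = multiTenant;
        _defaultConnection = defaultConnection;
        _tenantConnections = tenantConnections;
    }

    public void ConfigureDbConnectionString(SketchContextInfo context)
    {
        if (!_multiTenant)
        {
            // Single-tenant mode: the context is ignored for database selection.
            ConnectionString = _defaultConnection;
            return;
        }

        // Multi-tenant mode: the tenant carried in the context picks the database.
        if (!_tenantConnections.TryGetValue(context.TenantId, out var connection))
            throw new InvalidOperationException("Unknown tenant: " + context.TenantId);

        ConnectionString = connection;
    }
}
```

Because the handler and the subscriber both receive the context with the message, each side could resolve the right database on its own, even though they run at different times.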
Now let us look at the Subscriber.