Messaging with MassTransit and RabbitMQ in .NET 7
Messaging plays a crucial role in enabling loosely coupled and scalable architectures. MassTransit and RabbitMQ are a powerful combination for building reliable messaging systems. In this post, let's explore an example of how to implement messaging in .NET with these technologies.
MassTransit
Just to bring some context, let's start with a brief introduction to MassTransit.
MassTransit [1] is an open-source distributed application framework for .NET. It simplifies the development of message-based applications by providing abstractions and utilities for building robust, scalable, and loosely coupled systems. MassTransit supports multiple transport providers, including RabbitMQ, Azure Service Bus, and more. It also provides support for various messaging patterns like publish/subscribe, request/response, and routing.
RabbitMQ
Similar to MassTransit, here's a brief introduction to RabbitMQ.
RabbitMQ [2] is a popular open-source message broker that implements the Advanced Message Queuing Protocol (AMQP). It provides a reliable and scalable messaging infrastructure with support for various messaging patterns like publish/subscribe, request/response, and routing. RabbitMQ [3] acts as the backbone for MassTransit, handling the reliable delivery of messages between producers and consumers.
The example
Docker compose
RabbitMQ will run in a Docker container. The following docker-compose file [4] is used to spin up the container.
version: "3.9"
services:
  rabbitmq:
    image: masstransit/rabbitmq:latest
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # management UI
      - "15692:15692" # Prometheus metrics
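This solution uses Docker, but if you prefer, you can use a hosted RabbitMQ provider like CloudAMQP. The masstransit/rabbitmq image comes with the management UI and the delayed message exchange plugin enabled, which the delayed message scheduler used later in this post relies on.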
The entity and events
The example uses a simple entity called Product and two record events, ProductCreatedEvent and ProductUpdatedEvent. The ProductCreatedEvent event is published when a product is created, and the ProductUpdatedEvent event is published when a product is updated.
public class Product
{
    public int Id { get; init; }
    public string Name { get; init; } = string.Empty;
    public string Sku { get; init; } = string.Empty;
    public DateTime CreatedAt { get; init; }
    public DateTime UpdatedAt { get; init; }

    // Maps the entity to the event published when a product is created.
    public ProductCreatedEvent ToProductCreatedEvent()
    {
        return new ProductCreatedEvent
        {
            Id = Id,
            Name = Name,
            Sku = Sku,
            CreatedAt = CreatedAt
        };
    }

    // Maps the entity to the event published when a product is updated.
    public ProductUpdatedEvent ToProductUpdatedEvent()
    {
        return new ProductUpdatedEvent
        {
            Id = Id,
            Name = Name,
            Sku = Sku,
            UpdatedAt = UpdatedAt
        };
    }
}

public record ProductCreatedEvent
{
    public int Id { get; init; }
    public string Name { get; init; } = string.Empty;
    public string Sku { get; init; } = string.Empty;
    public DateTime CreatedAt { get; init; }
}

public record ProductUpdatedEvent
{
    public int Id { get; init; }
    public string Name { get; init; } = string.Empty;
    public string Sku { get; init; } = string.Empty;
    public DateTime UpdatedAt { get; init; }
}
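As a quick illustration of how the mapping method might be used, here is a minimal, self-contained sketch. It repeats trimmed-down copies of Product and ProductCreatedEvent so that it compiles on its own; the ProductMappingDemo class and its sample values are hypothetical and not part of the example project.

```csharp
using System;

public record ProductCreatedEvent
{
    public int Id { get; init; }
    public string Name { get; init; } = string.Empty;
    public string Sku { get; init; } = string.Empty;
    public DateTime CreatedAt { get; init; }
}

public class Product
{
    public int Id { get; init; }
    public string Name { get; init; } = string.Empty;
    public string Sku { get; init; } = string.Empty;
    public DateTime CreatedAt { get; init; }

    // Copies the entity's values into a new event instance.
    public ProductCreatedEvent ToProductCreatedEvent() =>
        new() { Id = Id, Name = Name, Sku = Sku, CreatedAt = CreatedAt };
}

// Hypothetical helper used only for this illustration.
public static class ProductMappingDemo
{
    public static ProductCreatedEvent CreateSampleEvent()
    {
        var product = new Product
        {
            Id = 1,
            Name = "Milk",
            Sku = "MILK01",
            CreatedAt = new DateTime(2023, 1, 1, 0, 0, 0, DateTimeKind.Utc)
        };

        return product.ToProductCreatedEvent();
    }
}
```

Because the events are records, two events carrying the same values compare as equal, and a modified copy can be created with a `with` expression, for example `ProductMappingDemo.CreateSampleEvent() with { Name = "Oat milk" }`.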
Publisher
The Publisher is an HTTP API, implemented as an Azure Functions app, that exposes three endpoints: the first creates a product, the second updates a product, and the third schedules a product creation. Each endpoint publishes the corresponding event to RabbitMQ.
To start, this is how the host and its services are configured:
using Microsoft.Extensions.Hosting;
using Publisher.Extensions;

var host = new HostBuilder()
    .ConfigureFunctionsWorkerDefaults()
    .ConfigureServices((context, services) =>
    {
        services.AddMassTransitPublisher(context.Configuration);
    })
    .Build();

host.Run();
The AddMassTransitPublisher extension method contains the details:
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Publisher.Extensions;

public static class ServiceCollectionExtensions
{
    public static IServiceCollection AddMassTransitPublisher(this IServiceCollection services, IConfiguration configuration)
    {
        services.AddMassTransit(x =>
        {
            // Registers IMessageScheduler backed by the broker's delayed delivery.
            x.AddDelayedMessageScheduler();
            x.SetKebabCaseEndpointNameFormatter();

            x.UsingRabbitMq((ctx, cfg) =>
            {
                cfg.Host(configuration.GetValue<string>("RabbitMq"));
                cfg.UseDelayedMessageScheduler();
                // Retry policy: 3 retries, 5 seconds apart.
                cfg.UseMessageRetry(retry => retry.Interval(3, TimeSpan.FromSeconds(5)));
                cfg.ConfigureEndpoints(ctx, new KebabCaseEndpointNameFormatter("dev", false));
            });
        });

        return services;
    }
}
NOTE: The retry policy is configured for 3 retries at 5-second intervals.
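To make the retry semantics concrete, here is a small sketch in plain C# that does not use MassTransit at all. It assumes, as far as I understand the interval policy, that the retry limit counts retries after the first failed attempt, so a limit of 3 allows up to 4 attempts in total. The IntervalRetrySketch class and its RunAsync method are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical illustration of a fixed-interval retry loop; not MassTransit code.
public static class IntervalRetrySketch
{
    // Runs the action once, then retries up to retryLimit more times,
    // waiting the same interval before each retry.
    // Returns the number of the attempt that succeeded; if every attempt
    // fails, the last exception propagates to the caller.
    public static async Task<int> RunAsync(Func<int, Task> action, int retryLimit, TimeSpan interval)
    {
        var attempt = 0;
        while (true)
        {
            attempt++;
            try
            {
                await action(attempt);
                return attempt;
            }
            catch (Exception) when (attempt <= retryLimit)
            {
                // Retries remain: wait, then loop around for the next attempt.
                await Task.Delay(interval);
            }
        }
    }
}
```

Calling `await IntervalRetrySketch.RunAsync(handler, 3, TimeSpan.FromSeconds(5))` mirrors the shape of the policy above: a handler that keeps failing is invoked four times before the exception surfaces.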
The RabbitMQ connection string is read from the local.settings.json file:
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
    "RabbitMq": "amqp://guest:guest@localhost:5672"
  }
}
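The connection string is an ordinary URI, so it can help to see which parts it carries. The following sketch uses only System.Uri to split it up; it is not how MassTransit parses the value, and the RabbitMqUriSketch class is a hypothetical name for this illustration. Treating an empty path as the default virtual host "/" is an assumption made here for readability.

```csharp
using System;

// Hypothetical helper that breaks an amqp:// connection string into its parts.
public static class RabbitMqUriSketch
{
    public static (string Host, int Port, string User, string Password, string VirtualHost) Parse(string connectionString)
    {
        var uri = new Uri(connectionString);

        // UserInfo has the form "user:password"; split on the first colon only.
        var credentials = uri.UserInfo.Split(':', 2);
        var user = credentials[0];
        var password = credentials.Length > 1 ? credentials[1] : string.Empty;

        // Assumption: an empty path or a bare "/" means the default virtual host "/".
        var virtualHost = uri.AbsolutePath.Length > 1
            ? Uri.UnescapeDataString(uri.AbsolutePath.Substring(1))
            : "/";

        return (uri.Host, uri.Port, user, password, virtualHost);
    }
}
```

For the value used in this post, `RabbitMqUriSketch.Parse("amqp://guest:guest@localhost:5672")` yields the host localhost, port 5672, and the guest/guest credentials that RabbitMQ accepts for local connections by default.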
In this example, the endpoints are Azure Functions HTTP triggers running in the isolated worker model.
using MassTransit;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Extensions.Logging;

public class Endpoints
{
    private readonly IPublishEndpoint _publisher;
    private readonly IMessageScheduler _scheduler;
    private readonly ILogger _logger;

    public Endpoints(
        ILoggerFactory loggerFactory,
        IPublishEndpoint publisher,
        IMessageScheduler scheduler)
    {
        _publisher = publisher;
        _scheduler = scheduler;
        _logger = loggerFactory.CreateLogger<Endpoints>();
    }

    // Publishes a ProductCreatedEvent immediately.
    [Function(nameof(ProductCreated))]
    public async Task ProductCreated([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req)
    {
        _logger.LogInformation("C# HTTP trigger function processed a request.");
        var productCreatedEvent = await req.ReadFromJsonAsync<ProductCreatedEvent>();
        if (productCreatedEvent != null)
        {
            await _publisher.Publish(productCreatedEvent, CancellationToken.None);
            _logger.LogInformation(
                "Product created: {Id} - {Name} - {Sku}",
                productCreatedEvent.Id, productCreatedEvent.Name, productCreatedEvent.Sku);
        }
    }

    // Publishes a ProductUpdatedEvent immediately.
    [Function(nameof(ProductUpdated))]
    public async Task ProductUpdated([HttpTrigger(AuthorizationLevel.Function, "put")] HttpRequestData req)
    {
        _logger.LogInformation("C# HTTP trigger function processed a request.");
        var productUpdatedEvent = await req.ReadFromJsonAsync<ProductUpdatedEvent>();
        if (productUpdatedEvent != null)
        {
            await _publisher.Publish(productUpdatedEvent, CancellationToken.None);
            _logger.LogInformation(
                "Product updated: {Id} - {Name} - {Sku}",
                productUpdatedEvent.Id, productUpdatedEvent.Name, productUpdatedEvent.Sku);
        }
    }

    // Schedules a ProductCreatedEvent to be published 20 seconds from now.
    [Function(nameof(ProductCreatedScheduled))]
    public async Task ProductCreatedScheduled([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req)
    {
        _logger.LogInformation("C# HTTP trigger function processed a request.");
        var productCreatedEvent = await req.ReadFromJsonAsync<ProductCreatedEvent>();
        if (productCreatedEvent != null)
        {
            await _scheduler.SchedulePublish(DateTime.UtcNow + TimeSpan.FromSeconds(20), productCreatedEvent);
            _logger.LogInformation(
                "Product created scheduled: {Id} - {Name} - {Sku}",
                productCreatedEvent.Id, productCreatedEvent.Name, productCreatedEvent.Sku);
        }
    }
}
NOTE: IPublishEndpoint is used to publish the messages to the broker, and IMessageScheduler is used to schedule the messages.
Subscriber
The Subscriber, which is a console application, will consume the events from RabbitMQ and print the event details to the console.
To start, this is how the host and its services are configured:
using Microsoft.Extensions.Hosting;
using Subscriber.Extensions;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices((context, services) =>
    {
        services.AddMassTransitSubscriber(context.Configuration);
    })
    .Build();

host.Run();
The AddMassTransitSubscriber extension method contains the details:
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Subscriber.Extensions;

public static class ServiceCollectionExtensions
{
    public static IServiceCollection AddMassTransitSubscriber(this IServiceCollection services, IConfiguration configuration)
    {
        services.AddMassTransit(x =>
        {
            x.AddDelayedMessageScheduler();

            // Register each consumer together with its definition (retry policy).
            x.AddConsumer<QueueProductCreatedConsumer>(typeof(QueueProductCreatedConsumerDefinition));
            x.AddConsumer<QueueProductUpdatedConsumer>(typeof(QueueProductUpdatedConsumerDefinition));
            x.SetKebabCaseEndpointNameFormatter();

            x.UsingRabbitMq((ctx, cfg) =>
            {
                cfg.Host(configuration.GetConnectionString("RabbitMq"));
                cfg.UseDelayedMessageScheduler();
                cfg.ServiceInstance(instance =>
                {
                    instance.ConfigureJobServiceEndpoints();
                    instance.ConfigureEndpoints(ctx, new KebabCaseEndpointNameFormatter("dev", false));
                });
            });
        });

        return services;
    }
}
NOTE: The consumers are registered with the container, and QueueProductCreatedConsumerDefinition and QueueProductUpdatedConsumerDefinition are used to configure them.
The RabbitMQ connection string is read from the appsettings.json file:
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "ConnectionStrings": {
    "RabbitMq": "amqp://guest:guest@localhost:5672"
  }
}
The QueueProductCreatedConsumer class implements the IConsumer<ProductCreatedEvent> interface, and the QueueProductUpdatedConsumer class implements the IConsumer<ProductUpdatedEvent> interface.
using MassTransit;
using Microsoft.Extensions.Logging;

public class QueueProductCreatedConsumer : IConsumer<ProductCreatedEvent>
{
    private readonly ILogger<QueueProductCreatedConsumer> _logger;

    public QueueProductCreatedConsumer(ILogger<QueueProductCreatedConsumer> logger)
    {
        _logger = logger;
    }

    // Logs the details of each ProductCreatedEvent received from the queue.
    public Task Consume(ConsumeContext<ProductCreatedEvent> context)
    {
        var id = context.Message.Id;
        var name = context.Message.Name;
        var sku = context.Message.Sku;
        _logger.LogInformation("Received - Product created: {Id} - {Name} - {Sku}", id, name, sku);
        return Task.CompletedTask;
    }
}

public class QueueProductCreatedConsumerDefinition : ConsumerDefinition<QueueProductCreatedConsumer>
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<QueueProductCreatedConsumer> consumerConfigurator)
    {
        // Retry policy: 3 retries, 5 seconds apart.
        consumerConfigurator.UseMessageRetry(retry => retry.Interval(3, TimeSpan.FromSeconds(5)));
    }
}

public class QueueProductUpdatedConsumer : IConsumer<ProductUpdatedEvent>
{
    private readonly ILogger<QueueProductUpdatedConsumer> _logger;

    public QueueProductUpdatedConsumer(ILogger<QueueProductUpdatedConsumer> logger)
    {
        _logger = logger;
    }

    // Logs the details of each ProductUpdatedEvent received from the queue.
    public Task Consume(ConsumeContext<ProductUpdatedEvent> context)
    {
        var id = context.Message.Id;
        var name = context.Message.Name;
        var sku = context.Message.Sku;
        _logger.LogInformation("Received - Product updated: {Id} - {Name} - {Sku}", id, name, sku);
        return Task.CompletedTask;
    }
}

public class QueueProductUpdatedConsumerDefinition : ConsumerDefinition<QueueProductUpdatedConsumer>
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<QueueProductUpdatedConsumer> consumerConfigurator)
    {
        // Retry policy: 3 retries, 5 seconds apart.
        consumerConfigurator.UseMessageRetry(retry => retry.Interval(3, TimeSpan.FromSeconds(5)));
    }
}
NOTE: In both consumers, the ConsumerDefinition is used to configure the retry policy, which means that if the consumer fails to process a message, it will retry 3 times at 5-second intervals. This can be configured differently for each consumer if needed.
Testing
To test the application, start the container, then the Publisher and Subscriber projects.
Run the following command to start the container with Docker Compose:
docker compose up
Then open one of the .http request files in the Publisher project, such as product-created.http, and send the request from Visual Studio:
POST http://localhost:7276/api/ProductCreated
Content-Type: application/json

{
  "id": 1,
  "name": "Milk",
  "sku": "MILK01"
}
Both consoles should then show the product details: the Publisher logs "Product created: 1 - Milk - MILK01" and the Subscriber logs "Received - Product created: 1 - Milk - MILK01".
The activity is also visible in the RabbitMQ management UI at http://localhost:15672 (use guest/guest to log in).
The number of queued messages will be back at zero because the Subscriber has already consumed them, but you can see a spike in the Message rates chart.
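One detail of the test request is worth spelling out: the body uses camelCase property names and omits createdAt. The sketch below shows how such a body maps onto ProductCreatedEvent with System.Text.Json. It assumes web-style serializer options (camelCase, case-insensitive), which may or may not match the serializer configured in the Functions worker; the RequestBodySketch class is a hypothetical name, and the record is repeated so the block compiles on its own.

```csharp
using System;
using System.Text.Json;

public record ProductCreatedEvent
{
    public int Id { get; init; }
    public string Name { get; init; } = string.Empty;
    public string Sku { get; init; } = string.Empty;
    public DateTime CreatedAt { get; init; }
}

// Hypothetical helper that deserializes a request body into the event type.
public static class RequestBodySketch
{
    // Assumption: web defaults (camelCase names, case-insensitive matching).
    private static readonly JsonSerializerOptions WebOptions = new(JsonSerializerDefaults.Web);

    public static ProductCreatedEvent? Parse(string json) =>
        JsonSerializer.Deserialize<ProductCreatedEvent>(json, WebOptions);
}
```

With these options, properties missing from the JSON keep their defaults, so CreatedAt stays at default(DateTime) unless the request sets it. If the timestamp matters downstream, one option is to have the endpoint or the entity mapping set it before publishing.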
This example is available in the PlayGoKids repository.