Messaging with MassTransit and Azure Service Bus in .NET 7

2023, Aug 01

Azure Service Bus is a fully managed enterprise integration message broker. In this post, let's explore an example of how to implement messaging in .NET leveraging Azure Service Bus and MassTransit, with topics and subscriptions.

MassTransit

Just to bring some context, let's start with a brief introduction to MassTransit.

MassTransit [1] is an open-source distributed application framework for .NET. It simplifies the development of message-based applications by providing abstractions and utilities for building robust, scalable, and loosely coupled systems. MassTransit supports multiple transport providers, including RabbitMQ, Azure Service Bus, and more. It also provides support for various messaging patterns like publish/subscribe, request/response, and routing.

Azure Service Bus

Likewise, here's a brief introduction to Azure Service Bus.

Azure Service Bus [2] provides reliable and secure asynchronous messaging between applications and services. Azure Service Bus supports a set of cloud-based, message-oriented middleware technologies including reliable message queuing and durable publish/subscribe messaging.

The example

This example targets Azure Service Bus Topics and Subscriptions [3], which are not commonly covered in MassTransit examples. The idea is to have publishers and subscribers: publishers publish messages to a Topic, and subscribers receive those messages through a Subscription on that Topic.
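To make the Topic/Subscription relationship concrete, here is a minimal in-memory sketch. It is only an illustration of the fan-out idea (every subscription receives its own copy of each published message) and does not use the Azure SDK or MassTransit. The InMemoryTopic class and the "audit-sub" subscription name are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Usage: one publish, two independent subscriptions.
var topic = new InMemoryTopic<string>();
topic.AddSubscription("product-created-sub");
topic.AddSubscription("audit-sub"); // hypothetical second subscription
topic.Publish("Milk");

// Each subscription hands out its own copy of the message.
Console.WriteLine(topic.Receive("product-created-sub"));
Console.WriteLine(topic.Receive("audit-sub"));

// In-memory stand-in for a topic (illustration only, not the Azure SDK).
public sealed class InMemoryTopic<T>
{
    // One queue per named subscription.
    private readonly Dictionary<string, Queue<T>> _subscriptions = new();

    public void AddSubscription(string name) => _subscriptions[name] = new Queue<T>();

    // Publishing copies the message into every subscription's queue.
    public void Publish(T message)
    {
        foreach (var queue in _subscriptions.Values)
            queue.Enqueue(message);
    }

    // A subscriber reads only from its own subscription.
    public T Receive(string subscription) => _subscriptions[subscription].Dequeue();
}
```

In the real example each Topic has a single Subscription, so every message ends up with exactly one consumer.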

Azure Service Bus ARM Template

First, provision the Azure Service Bus namespace using the following ARM template:

File: azuredeploy.json

{
  "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
  "contentVersion": "1.0.0.0",
  "parameters": {
    "serviceBusNamespaceName": {
      "type": "string",
      "metadata": {
        "description": "Name of the Service Bus namespace"
      }
    },
    "serviceBusTopicName1": {
      "type": "string",
      "metadata": {
        "description": "Name of the Topic"
      }
    },
    "serviceBusTopicName2": {
      "type": "string",
      "metadata": {
        "description": "Name of the Topic"
      }
    },
    "serviceBusSubscriptionName1": {
      "type": "string",
      "metadata": {
        "description": "Name of the Subscription"
      }
    },
    "serviceBusSubscriptionName2": {
      "type": "string",
      "metadata": {
        "description": "Name of the Subscription"
      }
    },
    "location": {
      "type": "string",
      "defaultValue": "[resourceGroup().location]",
      "metadata": {
        "description": "Location for all resources."
      }
    }
  },
  "resources": [
    {
      "apiVersion": "2022-10-01-preview",
      "name": "[parameters('serviceBusNamespaceName')]",
      "type": "Microsoft.ServiceBus/namespaces",
      "location": "[parameters('location')]",
      "sku": {
        "name": "Standard"
      },
      "properties": {},
      "resources": [
        {
          "apiVersion": "2022-10-01-preview",
          "name": "[parameters('serviceBusTopicName1')]",
          "type": "topics",
          "dependsOn": [
            "[resourceId('Microsoft.ServiceBus/namespaces/', parameters('serviceBusNamespaceName'))]"
          ],
          "properties": {},
          "resources": [
            {
              "apiVersion": "2022-10-01-preview",
              "name": "[parameters('serviceBusSubscriptionName1')]",
              "type": "Subscriptions",
              "dependsOn": [
                "[parameters('serviceBusTopicName1')]"
              ],
              "properties": {}
            }
          ]
        },
        {
          "apiVersion": "2022-10-01-preview",
          "name": "[parameters('serviceBusTopicName2')]",
          "type": "topics",
          "dependsOn": [
            "[resourceId('Microsoft.ServiceBus/namespaces/', parameters('serviceBusNamespaceName'))]"
          ],
          "properties": {},
          "resources": [
            {
              "apiVersion": "2022-10-01-preview",
              "name": "[parameters('serviceBusSubscriptionName2')]",
              "type": "Subscriptions",
              "dependsOn": [
                "[parameters('serviceBusTopicName2')]"
              ],
              "properties": {}
            }
          ]
        }
      ]
    }
  ]
}

with the parameters:

File: azuredeploy.parameters.json

{
  "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentParameters.json#",
  "contentVersion": "1.0.0.0",
  "parameters": {
    "serviceBusNamespaceName": {
      "value": "any-name-you-wish"
    },
    "serviceBusTopicName1": {
      "value": "product-created"
    },
    "serviceBusTopicName2": {
      "value": "product-updated"
    },
    "serviceBusSubscriptionName1": {
      "value": "product-created-sub"
    },
    "serviceBusSubscriptionName2": {
      "value": "product-updated-sub"
    }
  }
}

To deploy this service, the easiest way is to click the "Deploy" button in the repo:

Easy Deploy

which will lead you to the following page on Azure Portal:

Azure Deployment

From there, simply provide the parameters and deploy the service by clicking Review + Create and then Create.

The entity and events

The example will use a simple entity called Product and two record events ProductCreatedEvent and ProductUpdatedEvent. The ProductCreatedEvent event will be published when a product is created, and the ProductUpdatedEvent event will be published when a product is modified/updated.

public class Product
{
    public int Id { get; init; }
    public string Name { get; init; } = string.Empty;
    public string Sku { get; init; } = string.Empty;
    public DateTime CreatedAt { get; init; }
    public DateTime UpdatedAt { get; init; }

    public ProductCreatedEvent ToProductCreatedEvent()
    {
        return new ProductCreatedEvent
        {
            Id = Id,
            Name = Name,
            Sku = Sku,
            CreatedAt = CreatedAt
        };
    }

    public ProductUpdatedEvent ToProductUpdatedEvent()
    {
        return new ProductUpdatedEvent()
        {
            Id = Id,
            Name = Name,
            Sku = Sku,
            UpdatedAt = UpdatedAt
        };
    }
}

public record ProductCreatedEvent
{
    public int Id { get; init; }
    public string Name { get; init; } = string.Empty;
    public string Sku { get; init; } = string.Empty;
    public DateTime CreatedAt { get; init; }
}

public record ProductUpdatedEvent
{
    public int Id { get; init; }
    public string Name { get; init; } = string.Empty;
    public string Sku { get; init; } = string.Empty;
    public DateTime UpdatedAt { get; init; }
}
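The mapping methods on Product are not called anywhere else in this post, so here is a small usage sketch. It uses trimmed copies of the types above so that it compiles on its own; the sample values are made up. It also shows two properties of records that make them a good fit for events: value-based equality and non-destructive copies using `with`.

```csharp
using System;

var product = new Product
{
    Id = 1,
    Name = "Milk",
    Sku = "MILK01",
    CreatedAt = new DateTime(2023, 8, 1, 0, 0, 0, DateTimeKind.Utc)
};

ProductCreatedEvent first = product.ToProductCreatedEvent();
ProductCreatedEvent second = product.ToProductCreatedEvent();

Console.WriteLine(first == second);                // True: same property values
Console.WriteLine(ReferenceEquals(first, second)); // False: separate instances

// `with` creates a copy and changes only the listed properties.
var renamed = first with { Name = "Oat milk" };
Console.WriteLine(renamed.Name);                   // Oat milk
Console.WriteLine(first.Name);                     // Milk

// Trimmed copies of the types shown above.
public record ProductCreatedEvent
{
    public int Id { get; init; }
    public string Name { get; init; } = string.Empty;
    public string Sku { get; init; } = string.Empty;
    public DateTime CreatedAt { get; init; }
}

public class Product
{
    public int Id { get; init; }
    public string Name { get; init; } = string.Empty;
    public string Sku { get; init; } = string.Empty;
    public DateTime CreatedAt { get; init; }

    public ProductCreatedEvent ToProductCreatedEvent() =>
        new() { Id = Id, Name = Name, Sku = Sku, CreatedAt = CreatedAt };
}
```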

Publisher

The Publisher, an Azure Functions app, exposes three HTTP endpoints: the first creates a product, the second updates a product, and the third schedules a product creation. Each endpoint publishes the corresponding event to an Azure Service Bus Topic.

To start, this is how the host is configured:

using Microsoft.Extensions.Hosting;
using Publisher.Extensions;

var host = new HostBuilder()
    .ConfigureFunctionsWorkerDefaults()
    .ConfigureServices((context, services) =>
    {
        services.AddMassTransitPublisher(context.Configuration);
    })
    .Build();

host.Run();

The extension AddMassTransitPublisher contains the details:

public static class ServiceCollectionExtensions
{
    public static IServiceCollection AddMassTransitPublisher(this IServiceCollection services, IConfiguration configuration)
    {
        services.AddMassTransit(x =>
        {
            x.AddServiceBusMessageScheduler();
            x.SetKebabCaseEndpointNameFormatter();

            x.UsingAzureServiceBus((ctx, cfg) =>
            {
                cfg.Host(configuration.GetValue<string>("AzServiceBus"));

                cfg.UseServiceBusMessageScheduler();
                cfg.UseMessageRetry(retry => retry.Interval(3, TimeSpan.FromSeconds(5)));

                cfg.Message<ProductCreatedEvent>(m => m.SetEntityName(Constants.ProductCreatedTopic));
                cfg.Message<ProductUpdatedEvent>(m => m.SetEntityName(Constants.ProductUpdatedTopic));
            });
        });

        return services;
    }
}

NOTE: Retries are configured as 3 retries at 5-second intervals. Also note that the SetEntityName method specifies the Topic each message type is published to.
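The Constants class referenced in the configuration is not shown in this post. A minimal sketch could look like the following, assuming the values match the names used in azuredeploy.parameters.json. The class body is hypothetical; only the member names come from the code in this post. The two subscription constants are the ones the Subscriber uses further down.

```csharp
// Hypothetical Constants class; the values mirror azuredeploy.parameters.json.
public static class Constants
{
    // Topic names passed to SetEntityName.
    public const string ProductCreatedTopic = "product-created";
    public const string ProductUpdatedTopic = "product-updated";

    // Subscription names passed to SubscriptionEndpoint in the Subscriber.
    public const string ProductCreatedSubscription = "product-created-sub";
    public const string ProductUpdatedSubscription = "product-updated-sub";
}
```

Keeping these names in one shared place helps the Publisher and the Subscriber agree on the same Topic for each message type.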

The Service Bus connection string is retrieved from the local.settings.json file:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
    "AzServiceBus": "Endpoint=sb://yourinstance.servicebus.windows.net/;SharedAccessKeyName=TheNameOfYourKey;SharedAccessKey=TheKeyItself"
  }
}

In this example, the endpoints are HTTP-triggered Azure Functions:

public class Endpoints
{
    private readonly IPublishEndpoint _publisher;
    private readonly IMessageScheduler _scheduler;
    private readonly ILogger _logger;

    public Endpoints(
        ILoggerFactory loggerFactory, 
        IPublishEndpoint publisher,
        IMessageScheduler scheduler)
    {
        _publisher = publisher;
        _scheduler = scheduler;
        _logger = loggerFactory.CreateLogger<Endpoints>();
    }

    [Function(nameof(ProductCreated))]
    public async Task ProductCreated([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req)
    {
        _logger.LogInformation("C# HTTP trigger function processed a request.");

        var productCreatedEvent = await req.ReadFromJsonAsync<ProductCreatedEvent>();

        if (productCreatedEvent != null)
        {
            await _publisher.Publish(productCreatedEvent, CancellationToken.None);

            _logger.LogInformation(
                $"Product created: {productCreatedEvent.Id} - {productCreatedEvent.Name} - {productCreatedEvent.Sku}");
        }
    }

    [Function(nameof(ProductUpdated))]
    public async Task ProductUpdated([HttpTrigger(AuthorizationLevel.Function, "put")] HttpRequestData req)
    {
        _logger.LogInformation("C# HTTP trigger function processed a request.");

        var productUpdatedEvent = await req.ReadFromJsonAsync<ProductUpdatedEvent>();

        if (productUpdatedEvent != null)
        {
            await _publisher.Publish(productUpdatedEvent, CancellationToken.None);

            _logger.LogInformation(
                $"Product updated: {productUpdatedEvent.Id} - {productUpdatedEvent.Name} - {productUpdatedEvent.Sku}");
        }
    }

    [Function(nameof(ProductCreatedScheduled))]
    public async Task ProductCreatedScheduled([HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req)
    {
        _logger.LogInformation("C# HTTP trigger function processed a request.");

        var productCreatedEvent = await req.ReadFromJsonAsync<ProductCreatedEvent>();

        if (productCreatedEvent != null)
        {
            await _scheduler.SchedulePublish(DateTime.UtcNow + TimeSpan.FromSeconds(20), productCreatedEvent);

            _logger.LogInformation(
                $"Product created scheduled: {productCreatedEvent.Id} - {productCreatedEvent.Name} - {productCreatedEvent.Sku}");
        }
    }
}

NOTE: IPublishEndpoint will be used to Publish the messages to the broker, and IMessageScheduler will be used to schedule the messages.

Subscriber

The Subscriber, which is a console application, consumes the events from the Azure Service Bus Subscriptions and prints the event details to the console.

To start, this is how the host is configured:

using Subscriber.Extensions;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices((context, services) =>
    {
        services.AddMassTransitSubscriber(context.Configuration);
    })
    .Build();

host.Run();

The extension AddMassTransitSubscriber contains the details:

public static class ServiceCollectionExtensions
{
    public static IServiceCollection AddMassTransitSubscriber(this IServiceCollection services, IConfiguration configuration)
    {
        services.AddMassTransit(x =>
        {
            x.AddServiceBusMessageScheduler();
            x.AddConsumer<QueueProductCreatedConsumer>(typeof(QueueProductCreatedConsumerDefinition));
            x.AddConsumer<QueueProductUpdatedConsumer>(typeof(QueueProductUpdatedConsumerDefinition));

            x.SetKebabCaseEndpointNameFormatter();

            x.UsingAzureServiceBus((ctx, cfg) =>
            {
                cfg.Host(configuration.GetConnectionString("AzServiceBus"));
                cfg.UseServiceBusMessageScheduler();

                cfg.Message<ProductCreatedEvent>(m => m.SetEntityName(Constants.ProductCreatedTopic));
                cfg.Message<ProductUpdatedEvent>(m => m.SetEntityName(Constants.ProductUpdatedTopic));

                cfg.SubscriptionEndpoint<ProductCreatedEvent>(Constants.ProductCreatedSubscription, configurator =>
                {
                    configurator.ConfigureConsumer<QueueProductCreatedConsumer>(ctx);
                });

                cfg.SubscriptionEndpoint<ProductUpdatedEvent>(Constants.ProductUpdatedSubscription, configurator =>
                {
                    configurator.ConfigureConsumer<QueueProductUpdatedConsumer>(ctx);
                });
            });
        });

        return services;
    }
}

NOTE: The consumers are added to the container, and QueueProductCreatedConsumerDefinition and QueueProductUpdatedConsumerDefinition are used to configure them. Each consumer is bound to its Subscription with the SubscriptionEndpoint method. This is possible because the Topic for each message type was specified in the previous lines using the SetEntityName method.

The Azure Service Bus connection string is retrieved from the appsettings.json file:

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "ConnectionStrings": {
    "AzServiceBus": "Endpoint=sb://yourinstance.servicebus.windows.net/;SharedAccessKeyName=TheNameOfYourKey;SharedAccessKey=TheKeyItself"
  }
}
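Note that the two projects read the connection string differently: the Publisher calls GetValue<string>("AzServiceBus"), while the Subscriber calls GetConnectionString("AzServiceBus"). The sketch below shows which configuration key each call resolves. It assumes the Microsoft.Extensions.Configuration packages are referenced (they ship with the generic host) and that the Functions tooling exposes the "Values" entries of local.settings.json as top-level configuration keys. The endpoint values are placeholders.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// Stand-in for the settings each host would load (placeholder values).
var configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string?>
    {
        // Shape of the Subscriber's appsettings.json.
        ["ConnectionStrings:AzServiceBus"] = "Endpoint=sb://subscriber-placeholder/",
        // Shape of a "Values" entry in the Publisher's local.settings.json.
        ["AzServiceBus"] = "Endpoint=sb://publisher-placeholder/"
    })
    .Build();

// GetConnectionString(name) reads the key "ConnectionStrings:{name}".
Console.WriteLine(configuration.GetConnectionString("AzServiceBus"));

// GetValue<string>(key) reads the key exactly as given.
Console.WriteLine(configuration.GetValue<string>("AzServiceBus"));
```

If the connection string is moved from one project to the other, the key shape has to change with it; otherwise the lookup returns null.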

The class QueueProductCreatedConsumer implements the IConsumer<ProductCreatedEvent> interface, and the class QueueProductUpdatedConsumer implements the IConsumer<ProductUpdatedEvent> interface.

public class QueueProductCreatedConsumer : IConsumer<ProductCreatedEvent>
{
    private readonly ILogger<QueueProductCreatedConsumer> _logger;

    public QueueProductCreatedConsumer(ILogger<QueueProductCreatedConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<ProductCreatedEvent> context)
    {
        var id = context.Message.Id;
        var name = context.Message.Name;
        var sku = context.Message.Sku;

        _logger.LogInformation($"Received - Product created: {id} - {name} - {sku}");

        return Task.CompletedTask;
    }
}

public class QueueProductCreatedConsumerDefinition : ConsumerDefinition<QueueProductCreatedConsumer>
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator, 
        IConsumerConfigurator<QueueProductCreatedConsumer> consumerConfigurator)
    {
        consumerConfigurator.UseMessageRetry(retry => retry.Interval(3, TimeSpan.FromSeconds(5)));
    }
}

public class QueueProductUpdatedConsumer : IConsumer<ProductUpdatedEvent>
{
    private readonly ILogger<QueueProductUpdatedConsumer> _logger;

    public QueueProductUpdatedConsumer(ILogger<QueueProductUpdatedConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<ProductUpdatedEvent> context)
    {
        var id = context.Message.Id;
        var name = context.Message.Name;
        var sku = context.Message.Sku;

        _logger.LogInformation($"Received - Product updated: {id} - {name} - {sku}");

        return Task.CompletedTask;
    }
}

public class QueueProductUpdatedConsumerDefinition : ConsumerDefinition<QueueProductUpdatedConsumer>
{
    protected override void ConfigureConsumer(
        IReceiveEndpointConfigurator endpointConfigurator, 
        IConsumerConfigurator<QueueProductUpdatedConsumer> consumerConfigurator)
    {
        consumerConfigurator.UseMessageRetry(retry => retry.Interval(3, TimeSpan.FromSeconds(5)));
    }
}

NOTE: In both consumers, the ConsumerDefinition configures the retry policy: if the consumer fails to process a message, it retries 3 times at 5-second intervals. This could be configured differently for each consumer if needed.
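To illustrate what a fixed-interval retry policy amounts to, here is a plain C# sketch. It is not MassTransit's implementation; it assumes the count means retries after the first attempt, so a limit of 3 allows up to 4 attempts in total. The RetrySketch class and its parameter names are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

// Usage: the action fails twice, then succeeds on the third attempt.
var calls = 0;
var attempts = await RetrySketch.RunAsync(() =>
{
    calls++;
    if (calls < 3) throw new InvalidOperationException("transient failure");
    return Task.CompletedTask;
}, retryLimit: 3, interval: TimeSpan.FromMilliseconds(10));

Console.WriteLine(attempts); // 3

public static class RetrySketch
{
    // Runs `action`; on failure waits `interval` and tries again,
    // up to `retryLimit` retries after the first attempt.
    // Returns the number of attempts used.
    public static async Task<int> RunAsync(Func<Task> action, int retryLimit, TimeSpan interval)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return attempt;
            }
            catch (Exception) when (attempt <= retryLimit)
            {
                // Retries remain: wait, then loop for the next attempt.
                await Task.Delay(interval);
            }
        }
    }
}
```

Once the retries are used up, the exception filter no longer matches and the last exception propagates to the caller.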

Testing

To test the application, make sure the Azure Service Bus namespace is deployed, then start the Publisher and Subscriber projects.

Solution

Trigger one of the .http requests in the Publisher project, for example product-created.http, from Visual Studio:

POST http://localhost:7276/api/ProductCreated

{
    "id": 1,
    "name": "Milk",
    "sku": "MILK01"
}
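The request body uses lowercase property names and omits CreatedAt. The following sketch shows how such a body maps onto ProductCreatedEvent, assuming the Functions worker matches JSON property names case-insensitively, as System.Text.Json does with its web defaults. It uses a trimmed copy of the record so that it runs on its own.

```csharp
using System;
using System.Text.Json;

const string body = @"{ ""id"": 1, ""name"": ""Milk"", ""sku"": ""MILK01"" }";

// Web defaults: camelCase naming and case-insensitive property matching.
var options = new JsonSerializerOptions(JsonSerializerDefaults.Web);
var evt = JsonSerializer.Deserialize<ProductCreatedEvent>(body, options);

Console.WriteLine($"{evt!.Id} - {evt.Name} - {evt.Sku}"); // 1 - Milk - MILK01

// CreatedAt is missing from the body, so it keeps its default value.
Console.WriteLine(evt.CreatedAt == default);              // True

// Trimmed copy of the record shown earlier.
public record ProductCreatedEvent
{
    public int Id { get; init; }
    public string Name { get; init; } = string.Empty;
    public string Sku { get; init; } = string.Empty;
    public DateTime CreatedAt { get; init; }
}
```

If consumers rely on CreatedAt, the request should include it or the endpoint should set it before publishing.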

You should be able to see both consoles, Publisher and Subscriber, with the following outputs:

Publisher

Subscriber

This will be reflected in Azure Service Bus:

ServiceBus

There is a single incoming and a single outgoing message, which indicates that the message was submitted to the topic and consumed by the subscription.

This example is available in the PlayGoKids repository.


  1. MassTransit link
  2. Azure Service Bus link
  3. Azure Service Bus Topics and Subscriptions link