OpenTelemetry Kafka Context Propagator with .NET

2024, Mar 30

In distributed applications, how do we carry OpenTelemetry Baggage and the Activity context from one application to the next? By using propagators. They pass tracing information across services and systems, which makes end-to-end observability possible.

A bit of Context

Consider a scenario with a producer and a consumer. The producer publishes a message and the consumer receives it, but how is the OpenTelemetry context shared between them?

The glue is the Kafka message headers: the producer writes the context into them and the consumer reads it back.
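To make the idea concrete, here is a minimal sketch of that round trip using only System.Diagnostics, without Kafka or the OpenTelemetry SDK. The dictionary stands in for the Kafka headers, the IDs are random, and the traceparent string is built by hand in the W3C Trace Context layout. These are all assumptions for illustration; in the real samples the propagator does this work.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;

// Stand-in for Kafka message headers: header name -> raw bytes.
var headers = new Dictionary<string, byte[]>();

// Producer side: a hypothetical sampled context with random IDs.
var outgoing = new ActivityContext(
    ActivityTraceId.CreateRandom(),
    ActivitySpanId.CreateRandom(),
    ActivityTraceFlags.Recorded);

// W3C traceparent layout: version-traceid-spanid-flags ("01" = sampled).
var traceparent =
    $"00-{outgoing.TraceId.ToHexString()}-{outgoing.SpanId.ToHexString()}-01";
headers["traceparent"] = Encoding.UTF8.GetBytes(traceparent);

// Consumer side: read the bytes back and rebuild the context.
var received = Encoding.UTF8.GetString(headers["traceparent"]);
var parsed = ActivityContext.TryParse(received, null, out var incoming);

Console.WriteLine($"parsed: {parsed}");
Console.WriteLine($"same trace: {incoming.TraceId == outgoing.TraceId}");
```

Both sides end up with the same trace ID, which is what lets a tracing backend stitch the producer and consumer spans into one trace.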

Below are producer and consumer samples for reference.

Producer

The code below shows a typical implementation of a Kafka producer in .NET:

var deliveryResult = await _producer.ProduceAsync(topicName, new Message<string, string> {
    Key = eventEnvelope.TransactionId,
    Value = topicMessageSerializeObject,
    Headers = contextHeader,
});

The contextHeader is the glue. It is created before the call to ProduceAsync, as follows:

// The message is serialized to a string
var topicMessageSerializeObject = JsonConvert.SerializeObject(eventEnvelopTopicMessage);

// The Activity Context is retrieved
ActivityContext contextToInject = default;
if (activity != null)
{
    contextToInject = activity.Context;
}
else if (Activity.Current != null)
{
    contextToInject = Activity.Current.Context;
}

// Kafka headers are created
var contextHeader = new Headers();

// The context and the current baggage are injected into the headers
_propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), contextHeader, InjectTraceContextIntoHeader);

// Tags are added to the activity
activity?.SetTag("messaging.system", "kafka");
activity?.SetTag("messaging.destination_kind", "topic");
activity?.SetTag("messaging.operation", "publish");
activity?.SetTag("messaging.destination", topicName);
activity?.SetTag("message", topicMessageSerializeObject);

The _propagator object needs to be declared as a static TextMapPropagator:

private static readonly TextMapPropagator _propagator = Propagators.DefaultTextMapPropagator;

The propagator calls a setter for every header it wants to write. The InjectTraceContextIntoHeader method plays that role and adds each key/value pair to the Kafka headers:

private void InjectTraceContextIntoHeader(Headers headers, string key, string value)
{
    try
    {
        headers.Add(key, Encoding.UTF8.GetBytes(value)); // adding as byte array
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to inject trace context.");
    }
}

Consumer

The code below should be added to the consumer listener:

// Message is deserialized into a KafkaTopicData
var topicMessageFromPrivateTopic = JsonConvert.DeserializeObject<KafkaTopicData>(eventData);

// The PropagationContext is extracted from the message headers
var parentContext = _propagator.Extract(default, topicMessageFromPrivateTopic, ExtractTraceContextFromHeader);
Baggage.Current = parentContext.Baggage;

// A new activity is started with the parent context
var activityName = "Topic receive";
using var source = OpenTelemetryExtensions.CreateActivitySource();
using var activity = source.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext);

// Tags are added to the trace
activity?.SetTag("message", topicMessageFromPrivateTopic.Value);
activity?.SetTag("messaging.system", "kafka");
activity?.SetTag("messaging.operation", "receive");
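The parent/child link created by StartActivity can be checked in isolation. The sketch below uses only System.Diagnostics; the source name "Demo.Consumer" and the random parent context are hypothetical stand-ins for the extracted PropagationContext. It also registers an ActivityListener, because StartActivity returns null when nobody is listening, which is why the samples use activity?. everywhere. In a real application the OpenTelemetry SDK registers that listener for the sources you configure.

```csharp
using System;
using System.Diagnostics;

// Without a listener, StartActivity returns null.
using var listener = new ActivityListener
{
    ShouldListenTo = source => true,
    Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
        ActivitySamplingResult.AllData,
};
ActivitySource.AddActivityListener(listener);

// Hypothetical source name, for illustration only.
using var source = new ActivitySource("Demo.Consumer");

// Stand-in for parentContext.ActivityContext extracted from the headers.
var parent = new ActivityContext(
    ActivityTraceId.CreateRandom(),
    ActivitySpanId.CreateRandom(),
    ActivityTraceFlags.Recorded,
    isRemote: true);

using var activity = source.StartActivity(
    "Topic receive", ActivityKind.Consumer, parent);
activity?.SetTag("messaging.system", "kafka");

// The consumer span joins the producer's trace and points at its span.
var sameTrace = activity?.TraceId == parent.TraceId;
var childOfParent = activity?.ParentSpanId == parent.SpanId;
Console.WriteLine($"same trace: {sameTrace}, child of parent: {childOfParent}");
```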

There is nothing special about KafkaTopicData; it is an object that maps the Kafka key, value and headers:

public class KafkaTopicData
{
    public long Offset { get; set; } // Kafka offsets are 64-bit
    public int Partition { get; set; }
    public string? Topic { get; set; }
    public string? Value { get; set; }
    public DateTime? TimeStamp { get; set; }
    public string? Key { get; set; }
    public List<Header>? Headers { get; set; }
}

public class Header
{
    public string? Key { get; set; }
    public string? Value { get; set; }
}

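The ExtractTraceContextFromHeader method looks up the header the propagator asks for and returns its value so the context can be re-created. The header values in this payload are Base64 strings, so they are decoded back to UTF-8 text first: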

private IEnumerable<string> ExtractTraceContextFromHeader(KafkaTopicData? data, string key)
{
    try
    {
        if (data?.Headers != null)
        {
            var header = data.Headers.FirstOrDefault(h => h.Key == key);
            if (header != null && header.Value != null)
            {
                var bytes = Convert.FromBase64String(header.Value);
                return [Encoding.UTF8.GetString(bytes)];
            }
        }
    }
    catch (FormatException ex)
    {
        _logger.LogError(ex, "Error while decoding base64 string.");
        throw;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error while extracting trace context.");
    }

    return Enumerable.Empty<string>();
}
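Why Base64? The producer writes the header value as raw UTF-8 bytes, but the consumer receives the message as JSON, and JSON serializers typically write a byte[] as a Base64 string. The sketch below reproduces that with System.Text.Json instead of the Newtonsoft serializer used in the samples, and with a made-up traceparent value. It is my assumption about how the headers reach the listener, so check it against your own pipeline.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Made-up traceparent value, as the producer would write it to a header.
const string original =
    "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01";
byte[] headerBytes = Encoding.UTF8.GetBytes(original);

// Assumption: something between Kafka and the listener serializes the
// message to JSON. A byte[] becomes a Base64 string at that point.
string json = JsonSerializer.Serialize(headerBytes);
string base64 = JsonSerializer.Deserialize<string>(json)!;

// Consumer side: the same two steps as ExtractTraceContextFromHeader.
byte[] bytes = Convert.FromBase64String(base64);
string decoded = Encoding.UTF8.GetString(bytes);

Console.WriteLine(base64);
Console.WriteLine(decoded == original);
```

If your listener receives the headers as raw bytes instead, the Base64 step is not needed and Encoding.UTF8.GetString alone is enough.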

Remember that the _propagator object needs to be declared as a static TextMapPropagator:

private static readonly TextMapPropagator _propagator = Propagators.DefaultTextMapPropagator;

I hope this article helps you set up OpenTelemetry propagators in your solution.

References: OpenTelemetry Semantic Conventions