OpenTelemetry Kafka Context Propagator with .NET
In distributed applications, how can we carry the OpenTelemetry Baggage and Activity Context from one application to another? By leveraging Propagators. They propagate tracing information across services and systems, which makes end-to-end observability possible.
A bit of Context
Consider a scenario with a Producer and a Consumer. The producer pushes a message and the consumer receives it, but how is the OpenTelemetry context shared between them?
The glue is the Kafka message headers: the producer writes the context into them and the consumer reads it back.
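To make the idea concrete, here is a minimal sketch that uses only the .NET base class library. It assumes the W3C Trace Context `traceparent` format (`version-traceid-spanid-flags`) and builds the header value by hand purely for illustration; in the real samples the propagator produces and parses this value for you.

```csharp
using System;
using System.Diagnostics;
using System.Text;

// A context as the producer would have it (random IDs, for illustration only).
var context = new ActivityContext(
    ActivityTraceId.CreateRandom(),
    ActivitySpanId.CreateRandom(),
    ActivityTraceFlags.Recorded);

// W3C traceparent: version "00", trace id, span id, flags ("01" = sampled).
string traceparent = $"00-{context.TraceId.ToHexString()}-{context.SpanId.ToHexString()}-01";

// Kafka header values are byte arrays, so the text is UTF-8 encoded.
byte[] headerValue = Encoding.UTF8.GetBytes(traceparent);

// On the receiving side the bytes are decoded and parsed back into a context.
string received = Encoding.UTF8.GetString(headerValue);
bool parsed = ActivityContext.TryParse(received, null, out ActivityContext restored);

Console.WriteLine($"parsed: {parsed}, same trace: {restored.TraceId == context.TraceId}");
```

The restored context carries the same trace and span identifiers, which is what lets the consumer's activity become a child of the producer's.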
Below are Producer and Consumer samples for your reference.
Producer
The code below shows a typical implementation of a Kafka producer in .NET:
var deliveryResult = await _producer.ProduceAsync(topicName, new Message<string, string>
{
    Key = eventEnvelope.TransactionId,
    Value = topicMessageSerializeObject,
    Headers = contextHeader,
});
The contextHeader is the glue. It is created as shown below:
// The message is serialized to a string
var topicMessageSerializeObject = JsonConvert.SerializeObject(eventEnvelopTopicMessage);
// The Activity Context is retrieved
ActivityContext contextToInject = default;
if (activity != null)
{
    contextToInject = activity.Context;
}
else if (Activity.Current != null)
{
    contextToInject = Activity.Current.Context;
}
// Kafka headers are created
var contextHeader = new Headers();
// The context is injected into the headers
_propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), contextHeader, InjectTraceContextIntoHeader);
// Tags are added to the trace
activity?.SetTag("messaging.system", "kafka");
activity?.SetTag("messaging.destination_kind", "topic");
activity?.SetTag("messaging.operation", "publish");
activity?.SetTag("messaging.destination", topicName);
activity?.SetTag("message", topicMessageSerializeObject);
The _propagator object needs to be declared as a static TextMapPropagator:
private static readonly TextMapPropagator _propagator = Propagators.DefaultTextMapPropagator;
The propagator writes each context entry through a setter callback. The method InjectTraceContextIntoHeader fills that role:
private void InjectTraceContextIntoHeader(Headers headers, string key, string value)
{
    try
    {
        headers.Add(key, Encoding.UTF8.GetBytes(value)); // Kafka header values are byte arrays
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to inject trace context.");
    }
}
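The inject and extract steps can be exercised together without a broker. The sketch below assumes the OpenTelemetry.Api and Confluent.Kafka NuGet packages are referenced. It builds the propagator explicitly instead of reading Propagators.DefaultTextMapPropagator, because to my understanding that default is a no-op until the OpenTelemetry SDK has been initialized. The `tenant` baggage entry and the inline lambdas are illustrative and not part of the original samples.

```csharp
// Assumes NuGet packages: OpenTelemetry.Api and Confluent.Kafka.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using Confluent.Kafka;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;

// Built explicitly so the sketch does not depend on SDK initialization.
TextMapPropagator propagator = new CompositeTextMapPropagator(new TextMapPropagator[]
{
    new TraceContextPropagator(),
    new BaggagePropagator(),
});

// Illustrative context and baggage standing in for the producer's current activity.
var sent = new ActivityContext(
    ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.Recorded);
var baggage = Baggage.Create(new Dictionary<string, string> { ["tenant"] = "demo" });

// Producer side: write the context into Kafka headers as UTF-8 bytes.
var headers = new Headers();
propagator.Inject(new PropagationContext(sent, baggage), headers,
    (carrier, key, value) => carrier.Add(key, Encoding.UTF8.GetBytes(value)));

// Consumer side, reading Confluent.Kafka headers directly: values are raw bytes.
PropagationContext received = propagator.Extract(default, headers, (carrier, key) =>
    carrier.TryGetLastBytes(key, out var bytes)
        ? new[] { Encoding.UTF8.GetString(bytes) }
        : Array.Empty<string>());

Console.WriteLine(received.ActivityContext.TraceId == sent.TraceId);
Console.WriteLine(received.Baggage.GetBaggage("tenant"));
```

When the consumer works on the native Headers object like this, no base64 decoding is needed; that step only applies when the headers arrive inside a serialized envelope.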
Consumer
The code below should be added to the consumer listener:
// The message is deserialized into a KafkaTopicData
var topicMessageFromPrivateTopic = JsonConvert.DeserializeObject<KafkaTopicData>(eventData);
// The PropagationContext is extracted from the message headers
var parentContext = _propagator.Extract(default, topicMessageFromPrivateTopic, ExtractTraceContextFromHeader);
Baggage.Current = parentContext.Baggage;
// A new activity is started with the parent context
var activityName = "Topic receive";
using var source = OpenTelemetryExtensions.CreateActivitySource();
using var activity = source.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext);
// Tags are added to the trace
activity?.SetTag("message", topicMessageFromPrivateTopic.Value);
activity?.SetTag("messaging.system", "kafka");
activity?.SetTag("messaging.operation", "receive");
There is nothing special about KafkaTopicData; it is an object that maps the Kafka key, value and headers:
public class KafkaTopicData
{
    public long Offset { get; set; } // Kafka offsets are 64-bit
    public int Partition { get; set; }
    public string? Topic { get; set; }
    public string? Value { get; set; }
    public DateTime? TimeStamp { get; set; }
    public string? Key { get; set; }
    public List<Header>? Headers { get; set; }
}
public class Header
{
    public string? Key { get; set; }
    public string? Value { get; set; }
}
The method ExtractTraceContextFromHeader is responsible for reading the header values from which the propagator re-creates the context:
private IEnumerable<string> ExtractTraceContextFromHeader(KafkaTopicData? data, string key)
{
    try
    {
        if (data?.Headers != null)
        {
            var header = data.Headers.FirstOrDefault(h => h.Key == key);
            if (header != null && header.Value != null)
            {
                // Header values arrive base64-encoded in this envelope
                var bytes = Convert.FromBase64String(header.Value);
                return [Encoding.UTF8.GetString(bytes)];
            }
        }
    }
    catch (FormatException ex)
    {
        _logger.LogError(ex, "Error while decoding base64 string.");
        throw;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error while extracting trace context.");
    }
    return Enumerable.Empty<string>();
}
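Why base64? A plausible reason, and an assumption on my part, is that the envelope is produced by serializing the Kafka message to JSON, where byte-array header values are written as base64 strings. The sketch below uses only the base class library and a hypothetical tuple list shaped like KafkaTopicData.Headers. It shows a getter variant that returns an empty sequence for a missing or malformed value instead of throwing; the traceparent value is the example from the W3C Trace Context specification.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

const string Traceparent = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01";

// Hypothetical header list: key plus base64-encoded value.
var headers = new List<(string Key, string Value)>
{
    ("traceparent", Convert.ToBase64String(Encoding.UTF8.GetBytes(Traceparent))),
    ("broken", "%%% not base64 %%%"),
};

// Getter in the shape TextMapPropagator.Extract expects: (carrier, key) -> values.
IEnumerable<string> GetHeader(List<(string Key, string Value)> carrier, string key)
{
    var match = carrier.FirstOrDefault(h => h.Key == key);
    if (match.Value is null)
    {
        return Enumerable.Empty<string>(); // header not present
    }

    // Decoded data is never longer than the base64 text, so this buffer is large enough.
    var buffer = new byte[match.Value.Length];
    if (!Convert.TryFromBase64String(match.Value, buffer, out int written))
    {
        return Enumerable.Empty<string>(); // malformed value: treat as absent
    }

    return new[] { Encoding.UTF8.GetString(buffer, 0, written) };
}

Console.WriteLine(GetHeader(headers, "traceparent").FirstOrDefault());
Console.WriteLine(GetHeader(headers, "tracestate").Any());
Console.WriteLine(GetHeader(headers, "broken").Any());
```

Treating a malformed header as absent means the consumer starts a new trace rather than failing the message, which is a design choice; rethrowing, as the method above does for FormatException, is the stricter alternative.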
Remember that, as in the producer, the _propagator object needs to be declared as a static TextMapPropagator:
private static readonly TextMapPropagator _propagator = Propagators.DefaultTextMapPropagator;
I hope this article helps you set up OpenTelemetry Propagators in your solution.
References: OpenTelemetry Semantic Conventions