Building Scalable Kafka Streaming Data Pipelines with Azure Functions

β€’ Azure Functions, Kafka, Event Streaming, Data Pipelines, .NET, Java, Event-Driven Architecture, Code Night, New Zealand β€’ 22 min read

Welcome to this comprehensive session from Code Night New Zealand! Today we’re diving deep into building scalable Kafka streaming data pipelines with Azure Functions. This session covers everything from the mechanics behind the scenes to production-ready implementations with practical examples in both .NET and Java.

Understanding Kafka Streaming with Azure Functions

Behind the Scenes: How It Works

When working with Azure Functions and Kafka, there’s a sophisticated mechanism working behind the scenes that handles many complexities for developers:

πŸ”§ Automatic Scaling: Functions automatically scale based on the number of incoming events
βš–οΈ Load Balancing: Automatic rebalancing of events across function instances
πŸ›‘οΈ Buffering: Smart buffering when you can’t process events at the expected rate
πŸ“Š Throughput Management: Built-in handling for high-volume data streams

The Architecture Behind Azure Functions Kafka Integration

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Kafka       β”‚    β”‚ Kafka           β”‚    β”‚ Channel      β”‚    β”‚ Azure       β”‚
β”‚ Broker(s)   │───▢│ Listener        │───▢│              │───▢│ Function    β”‚
β”‚             β”‚    β”‚                 β”‚    β”‚              β”‚    β”‚ (Your Code) β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Key Components:

  • Kafka Listener: Responsible for pulling events from the broker
  • Channel: Makes events available for function execution
  • Function: Where your business logic executes

Processing Modes: Single vs Batch

Single Mode Processing

Events are processed individually, one at a time:

[FunctionName("SingleEventProcessor")]
public static void Run(
    [KafkaTrigger("BrokerList", "topic-name", ConsumerGroup = "my-group")] 
    string kafkaEvent,
    ILogger log)
{
    log.LogInformation($"Processing single event: {kafkaEvent}");
    
    try
    {
        // Process individual event
        var data = ProcessMessage(kafkaEvent);
        // Handle business logic
    }
    catch (Exception ex)
    {
        log.LogError($"Error processing event: {ex.Message}");
        throw; // Important: Re-throw to handle retry logic
    }
}

Batch Mode Processing

Multiple events are processed together for better performance:

[FunctionName("BatchEventProcessor")]
public static void Run(
    [KafkaTrigger("BrokerList", "topic-name", 
        ConsumerGroup = "my-group", IsBatched = true)] 
    string[] kafkaEvents,
    ILogger log)
{
    log.LogInformation($"Processing batch of {kafkaEvents.Length} events");
    
    try
    {
        foreach (var eventData in kafkaEvents)
        {
            try
            {
                // Process each event in the batch
                var data = ProcessMessage(eventData);
                // Handle business logic
            }
            catch (Exception ex)
            {
                log.LogError($"Error processing individual event: {ex.Message}");
                // Handle individual event errors
            }
        }
    }
    catch (Exception ex)
    {
        log.LogError($"Error processing batch: {ex.Message}");
        throw; // Critical: Handle batch-level errors
    }
}

Fine-Tuning Configuration

Azure Functions allows extensive configuration through the host.json file for optimal performance:

{
  "version": "2.0",
  "extensions": {
    "kafka": {
      "maxBatchSize": 100,
      "subscriberIntervalInSeconds": 1,
      "channelCapacity": 1000,
      "maxPollIntervalInMilliseconds": 300000,
      "channelFullRetryIntervalInMilliseconds": 50
    }
  }
}

Configuration Parameters Explained

maxBatchSize

  • Purpose: Maximum number of events per batch
  • Impact: Higher values = better throughput, more memory usage
  • Recommendation: Start with 100, tune based on performance testing

subscriberIntervalInSeconds

  • Purpose: How frequently the Kafka listener polls for events
  • Impact: Lower values = faster event processing, higher CPU usage
  • Recommendation: 1-5 seconds for most scenarios

channelCapacity

  • Purpose: Number of events buffered in memory
  • Impact: Higher values = more scaling capability, more memory usage
  • Recommendation: Balance based on available memory and expected throughput

maxPollIntervalInMilliseconds

  • Purpose: Maximum time allowed for processing between polls; if exceeded, the consumer is considered unresponsive, its partitions are rebalanced, and events are reprocessed
  • Impact: Must accommodate external API calls and processing time
  • Recommendation: Set based on your slowest external dependency

Kafka Topics and Partitioning

Understanding Partitions and Consumer Groups

// Single Function - Single Worker
[FunctionName("UserProcessor")]
public static void ProcessUser(
    [KafkaTrigger("BrokerList", "user-events", 
        ConsumerGroup = "user-processing-group")] 
    string userEvent,
    ILogger log)
{
    // Processes events from all partitions
}

// Multiple Functions - Cost Optimization
public class UserProcessors
{
    [FunctionName("UserProcessor1")]
    public static void ProcessUser1(
        [KafkaTrigger("BrokerList", "user-events", 
            ConsumerGroup = "user-processing-group")] 
        string userEvent,
        ILogger log)
    {
        log.LogInformation("**** Processing in Function 1");
        // Business logic here
    }

    [FunctionName("UserProcessor2")]
    public static void ProcessUser2(
        [KafkaTrigger("BrokerList", "user-events", 
            ConsumerGroup = "user-processing-group")] 
        string userEvent,
        ILogger log)
    {
        log.LogInformation("---- Processing in Function 2");
        // Business logic here
    }

    [FunctionName("UserProcessor3")]
    public static void ProcessUser3(
        [KafkaTrigger("BrokerList", "user-events", 
            ConsumerGroup = "user-processing-group")] 
        string userEvent,
        ILogger log)
    {
        log.LogInformation("==== Processing in Function 3");
        // Business logic here
    }
}

Automatic Rebalancing Benefits

πŸ”„ Dynamic Scaling: As load increases, Azure Functions automatically:

  1. Spawns new worker instances
  2. Redistributes partition assignments
  3. Balances load across available workers
  4. Scales down when load decreases

πŸ’° Cost Optimization: Running multiple functions in a single worker reduces memory footprint and cost

Event Consumption and Pointer Management

How Azure Functions Manages Event Pointers

Step 1: A pointer (offset) is created and persisted in Azure Storage for each partition
Step 2: New messages are received and the function is triggered with a batch
Step 3: The function completes (with or without exceptions) and the pointer advances
Step 4: If conditions prevent completion, the pointer doesn't advance and the messages are reprocessed

Critical Considerations

[FunctionName("ReliableEventProcessor")]
public static async Task ProcessEvents(
    [KafkaTrigger("BrokerList", "events", ConsumerGroup = "reliable-group", IsBatched = true)] 
    string[] events,
    ILogger log)
{
    try
    {
        foreach (var eventData in events)
        {
            try
            {
                // Parse and validate event
                var parsedEvent = JsonSerializer.Deserialize<EventModel>(eventData);
                
                // Process business logic
                await ProcessBusinessLogic(parsedEvent);
                
                // Call external systems (idempotent operations recommended)
                await CallExternalAPI(parsedEvent);
                
                log.LogInformation($"Successfully processed event: {parsedEvent.Id}");
            }
            catch (Exception ex)
            {
                log.LogError($"Failed to process individual event: {ex.Message}");
                // Decide: continue with batch or fail entire batch
                // For critical events, consider dead letter queue
            }
        }
    }
    catch (Exception ex)
    {
        log.LogError($"Batch processing failed: {ex.Message}");
        throw; // This will prevent pointer advancement and trigger retry
    }
}

// Ensure idempotent external operations
private static async Task CallExternalAPI(EventModel eventData)
{
    // Use upsert operations where possible
    var result = await httpClient.PutAsync($"/api/users/{eventData.UserId}", content);
    
    if (!result.IsSuccessStatusCode)
    {
        throw new ExternalSystemException($"API call failed: {result.StatusCode}");
    }
}

Kafka Extensions and Bindings

Available Extension Models

βœ… Isolated Worker Model (Recommended)

  • Runs in a separate worker process
  • Better performance and reliability
  • Current and future support

❌ In-Process Model (Legacy)

  • Support ends November 2026
  • Should migrate to isolated model
  • Not recommended for new projects
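
If you're starting a new project on the isolated worker model, the host bootstrap lives in Program.cs. A minimal sketch, assuming the standard Microsoft.Azure.Functions.Worker packages and a project that references the Kafka extension, might look like this:

// Program.cs - minimal isolated worker bootstrap (illustrative sketch)
using Microsoft.Extensions.Hosting;

var host = new HostBuilder()
    .ConfigureFunctionsWorkerDefaults() // wires up the isolated worker runtime and registered extensions
    .ConfigureServices(services =>
    {
        // Register your own dependencies here (HttpClient, caches, etc.)
    })
    .Build();

host.Run();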

Kafka Trigger Configuration

// .NET Isolated Worker Example
using System.Text.Json;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

public class KafkaFunctions
{
    private readonly ILogger<KafkaFunctions> _logger;

    public KafkaFunctions(ILogger<KafkaFunctions> logger)
    {
        _logger = logger;
    }

    [Function("ProcessKafkaMessage")]
    public void ProcessMessage(
        [KafkaTrigger("BrokerList", "my-topic", 
            ConsumerGroup = "my-consumer-group",
            IsBatched = false)] string message)
    {
        _logger.LogInformation($"Received message: {message}");
        
        try
        {
            var data = GetMessage(message);
            _logger.LogInformation($"Parsed: Key={data.Key}, Value={data.Value}, Partition={data.Partition}");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing message");
            throw;
        }
    }

    [Function("ProcessKafkaBatch")]
    public void ProcessBatch(
        [KafkaTrigger("BrokerList", "my-topic", 
            ConsumerGroup = "my-consumer-group",
            IsBatched = true)] string[] messages)
    {
        _logger.LogInformation($"Processing batch of {messages.Length} messages");
        
        foreach (var message in messages)
        {
            try
            {
                // Process individual message
                var data = GetMessage(message);
                // Handle business logic
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"Error in batch processing: {message}");
                // Continue processing other messages or fail entire batch
            }
        }
    }

    private MessageData GetMessage(string message)
    {
        // Parse message and extract metadata
        return JsonSerializer.Deserialize<MessageData>(message);
    }
}
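
The MessageData type used by GetMessage isn't shown in the session. A minimal, purely illustrative shape, inferred from the Key, Value, and Partition properties logged above, could be:

// Hypothetical model inferred from the properties logged above; adjust to your actual payload
public class MessageData
{
    public string Key { get; set; }
    public string Value { get; set; }
    public int Partition { get; set; }
}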

Kafka Output Binding (Producer)

public class KafkaProducerFunctions
{
    [Function("HttpToKafka")]
    [KafkaOutput("BrokerList", "output-topic")]
    public static string ProduceSingleMessage(
        [HttpTrigger(AuthorizationLevel.Function, "post")] 
        HttpRequestData req)
    {
        var body = req.ReadAsString();
        
        // Create Kafka message
        var kafkaMessage = new KafkaEventData
        {
            Timestamp = DateTime.UtcNow,
            Key = Guid.NewGuid().ToString(),
            Value = body,
            Headers = new Dictionary<string, string>
            {
                ["source"] = "http-api",
                ["correlation-id"] = Guid.NewGuid().ToString()
            }
        };

        return JsonSerializer.Serialize(kafkaMessage);
    }

    [Function("BatchProducer")]
    [KafkaOutput("BrokerList", "output-topic")]
    public static KafkaEventData[] ProduceBatchMessages(
        [HttpTrigger(AuthorizationLevel.Function, "post")] 
        HttpRequestData req)
    {
        var inputData = req.ReadAsString();
        var items = JsonSerializer.Deserialize<string[]>(inputData);

        return items.Select((item, index) => new KafkaEventData
        {
            Key = $"batch-key-{index}",
            Value = item,
            Timestamp = DateTime.UtcNow,
            Headers = new Dictionary<string, string>
            {
                ["batch-id"] = Guid.NewGuid().ToString(),
                ["item-index"] = index.ToString()
            }
        }).ToArray();
    }
}

public class KafkaEventData
{
    public string Key { get; set; }
    public string Value { get; set; }
    public DateTime Timestamp { get; set; }
    public Dictionary<string, string> Headers { get; set; }
    
    // These are typically ignored but required by the binding
    public int Partition { get; set; }
    public long Offset { get; set; }
    public string Topic { get; set; }
}

Retry Mechanisms and Error Handling

Java Retry Configuration

Azure Functions provides built-in retry mechanisms for Java:

@FunctionName("KafkaProcessorWithRetry")
@FixedDelayRetry(maxRetryCount = 3, delayInterval = "00:00:02")
public void processWithFixedRetry(
    @KafkaTrigger(
        name = "kafkaTrigger",
        topic = "my-topic",
        brokerList = "BrokerList",
        consumerGroup = "retry-group"
    ) String message,
    final ExecutionContext context) {
    
    context.getLogger().info("Processing message with fixed retry: " + message);
    
    try {
        // Simulate processing that might fail
        if (Math.random() < 0.3) {
            throw new RuntimeException("Simulated processing error");
        }
        
        // Successful processing
        context.getLogger().info("Successfully processed: " + message);
    } catch (Exception ex) {
        context.getLogger().severe("Processing failed: " + ex.getMessage());
        throw ex; // Re-throw to trigger retry
    }
}

@FunctionName("KafkaProcessorWithExponentialBackoff")
@ExponentialBackoffRetry(
    maxRetryCount = 5,
    minimumInterval = "00:00:01",
    maximumInterval = "00:00:30"
)
public void processWithExponentialRetry(
    @KafkaTrigger(
        name = "kafkaTrigger",
        topic = "my-topic",
        brokerList = "BrokerList",
        consumerGroup = "exponential-group"
    ) String message,
    final ExecutionContext context) {
    
    context.getLogger().info("Processing with exponential backoff: " + message);
    
    // Processing logic with automatic exponential backoff on failures
    processMessage(message, context);
}

.NET Retry with Polly

For .NET, implement retry using Polly:

using System.Net.Http.Json;
using System.Text.Json;
using Polly;
using Polly.Extensions.Http;

public class ResilientKafkaProcessor
{
    private readonly HttpClient _httpClient;
    private readonly IAsyncPolicy<HttpResponseMessage> _retryPolicy;

    public ResilientKafkaProcessor(HttpClient httpClient)
    {
        _httpClient = httpClient;
        
        _retryPolicy = Policy
            .HandleResult<HttpResponseMessage>(r => !r.IsSuccessStatusCode)
            .Or<HttpRequestException>()
            .WaitAndRetryAsync(
                retryCount: 3,
                sleepDurationProvider: retryAttempt => 
                    TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                onRetry: (outcome, timespan, retryCount, context) =>
                {
                    Console.WriteLine($"Retry {retryCount} after {timespan} seconds");
                });
    }

    [Function("ResilientKafkaProcessor")]
    public async Task ProcessMessage(
        [KafkaTrigger("BrokerList", "resilient-topic", 
            ConsumerGroup = "resilient-group")] string message,
        ILogger log)
    {
        try
        {
            var eventData = JsonSerializer.Deserialize<EventModel>(message);
            
            // Process with retry policy
            var response = await _retryPolicy.ExecuteAsync(async () =>
            {
                return await _httpClient.PostAsJsonAsync("/api/process", eventData);
            });

            if (response.IsSuccessStatusCode)
            {
                log.LogInformation($"Successfully processed event: {eventData.Id}");
            }
        }
        catch (Exception ex)
        {
            log.LogError(ex, $"Failed to process message after retries: {message}");
            // Consider dead letter queue here
            throw;
        }
    }
}

Hosting and Scaling Considerations

βœ… Premium Plan (Elastic Premium)

  • Plan Types: EP1, EP2, EP3 (not P1, P2, P3)
  • Benefits: Event-driven scaling, perpetual warm instances
  • Scaling: Up to 100 instances (Windows), 20-100 instances (Linux)
  • Use Case: Production workloads, predictable pricing
# Azure CLI - Create Premium Function App
az functionapp create \
  --resource-group myResourceGroup \
  --runtime dotnet-isolated \
  --functions-version 4 \
  --name myKafkaFunctionApp \
  --os-type linux \
  --plan myPremiumPlan \
  --storage-account mystorageaccount

βœ… Dedicated Plan (App Service Plan)

  • Benefits: Predictable costs, always-on capability
  • Scaling: Manual or CPU/memory-based autoscale
  • Use Case: Consistent workloads, cost optimization

Essential Configuration

Enable runtime scale monitoring for Premium plans, and tune the Kafka extension in host.json:

{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[3.*, 4.0.0)"
  },
  "extensions": {
    "kafka": {
      "maxBatchSize": 100,
      "subscriberIntervalInSeconds": 1
    }
  }
}

Application Settings:

FUNCTIONS_WORKER_RUNTIME=dotnet-isolated
AzureWebJobsStorage=<storage-connection-string>
SCALE_CONTROLLER_LOGGING_ENABLED=AppInsights:Verbose
WEBSITE_RUN_FROM_PACKAGE=1
BrokerList=<kafka-broker-connection-string>

Production-Ready Data Pipeline Architecture

Complete Pipeline Implementation

// Raw Event Consumer and Transformer
public class DataPipelineFunctions
{
    [Function("RawEventConsumer")]
    [KafkaOutput("BrokerList", "public-topic")]
    public static async Task<string> ProcessRawEvent(
        [KafkaTrigger("BrokerList", "private-topic", 
            ConsumerGroup = "transformation-group", IsBatched = true)] 
        string[] rawEvents,
        ILogger log)
    {
        var transformedEvents = new List<string>();
        
        try
        {
            foreach (var rawEvent in rawEvents)
            {
                try
                {
                    // Parse raw event
                    var sourceEvent = JsonSerializer.Deserialize<SourceEventModel>(rawEvent);
                    
                    // Transform to company schema
                    var transformedEvent = new PublicEventModel
                    {
                        Id = sourceEvent.ExternalId,
                        EntityType = MapEntityType(sourceEvent.Type),
                        Timestamp = sourceEvent.CreatedAt,
                        Data = TransformData(sourceEvent.Payload),
                        Metadata = new EventMetadata
                        {
                            Source = "external-system",
                            CorrelationId = Guid.NewGuid().ToString(),
                            SchemaVersion = "1.0"
                        }
                    };
                    
                    transformedEvents.Add(JsonSerializer.Serialize(transformedEvent));
                    log.LogInformation($"Transformed event: {sourceEvent.ExternalId}");
                }
                catch (Exception ex)
                {
                    log.LogError($"Failed to transform event: {ex.Message}");
                    // Send to dead letter queue
                    await SendToDeadLetterQueue(rawEvent, ex.Message);
                }
            }
            
            return JsonSerializer.Serialize(transformedEvents);
        }
        catch (Exception ex)
        {
            log.LogError(ex, "Batch transformation failed");
            throw;
        }
    }

    [Function("PublicEventConsumer")]
    public static async Task ProcessPublicEvent(
        [KafkaTrigger("BrokerList", "public-topic", 
            ConsumerGroup = "delivery-group")] 
        string publicEvent,
        ILogger log)
    {
        try
        {
            var eventData = JsonSerializer.Deserialize<PublicEventModel>(publicEvent);
            
            // Deliver to external system
            using var httpClient = new HttpClient();
            var response = await httpClient.PostAsJsonAsync(
                "https://external-system.com/api/events", 
                eventData);

            if (response.IsSuccessStatusCode)
            {
                log.LogInformation($"Successfully delivered event: {eventData.Id}");
            }
            else
            {
                throw new ExternalDeliveryException(
                    $"Delivery failed: {response.StatusCode}");
            }
        }
        catch (Exception ex)
        {
            log.LogError(ex, $"Failed to process public event: {publicEvent}");
            // Implement retry logic or dead letter queue
            throw;
        }
    }

    private static async Task SendToDeadLetterQueue(string failedEvent, string errorMessage)
    {
        // Implementation for dead letter queue
        var deadLetterEvent = new DeadLetterEvent
        {
            OriginalEvent = failedEvent,
            ErrorMessage = errorMessage,
            Timestamp = DateTime.UtcNow,
            RetryCount = 0
        };
        
        // Send to DLQ topic or storage
    }
}

// Data Models
public class SourceEventModel
{
    public string ExternalId { get; set; }
    public string Type { get; set; }
    public DateTime CreatedAt { get; set; }
    public Dictionary<string, object> Payload { get; set; }
}

public class PublicEventModel
{
    public string Id { get; set; }
    public string EntityType { get; set; }
    public DateTime Timestamp { get; set; }
    public Dictionary<string, object> Data { get; set; }
    public EventMetadata Metadata { get; set; }
}

public class EventMetadata
{
    public string Source { get; set; }
    public string CorrelationId { get; set; }
    public string SchemaVersion { get; set; }
}
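
SendToDeadLetterQueue is left as a stub above. One hedged way to complete it is to publish the wrapped event with the Confluent.Kafka client to the dead-letter-queue topic consumed in the next section; the broker address is read from the same BrokerList setting, and a real implementation would reuse a single long-lived producer rather than creating one per call:

// Illustrative sketch only - assumes the Confluent.Kafka NuGet package and the existing BrokerList app setting
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class DeadLetterPublisher
{
    public static async Task SendToDeadLetterQueue(string failedEvent, string errorMessage)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = Environment.GetEnvironmentVariable("BrokerList")
        };

        // For brevity a producer is created per call; share one instance in production code
        using var producer = new ProducerBuilder<string, string>(config).Build();

        var dlqEvent = new DeadLetterEvent
        {
            OriginalEvent = failedEvent,
            ErrorMessage = errorMessage,
            Timestamp = DateTime.UtcNow,
            RetryCount = 0
        };

        await producer.ProduceAsync("dead-letter-queue", new Message<string, string>
        {
            Key = Guid.NewGuid().ToString(),
            Value = JsonSerializer.Serialize(dlqEvent)
        });
    }
}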

Dead Letter Queue Implementation

public class DeadLetterQueueHandler
{
    [Function("DeadLetterProcessor")]
    public static async Task ProcessDeadLetters(
        [KafkaTrigger("BrokerList", "dead-letter-queue", 
            ConsumerGroup = "dlq-processor")] 
        string deadLetterEvent,
        ILogger log)
    {
        try
        {
            var dlqEvent = JsonSerializer.Deserialize<DeadLetterEvent>(deadLetterEvent);
            
            // Analyze error and decide on action
            if (dlqEvent.RetryCount < 3 && IsRetryableError(dlqEvent.ErrorMessage))
            {
                // Increment retry count and reprocess
                dlqEvent.RetryCount++;
                await ReprocessEvent(dlqEvent.OriginalEvent);
            }
            else
            {
                // Send to manual review queue or alert operations
                await AlertOperationsTeam(dlqEvent);
            }
        }
        catch (Exception ex)
        {
            log.LogError(ex, "Failed to process dead letter event");
        }
    }

    private static bool IsRetryableError(string errorMessage)
    {
        // Logic to determine if error is retryable
        return errorMessage.Contains("timeout") || 
               errorMessage.Contains("rate limit") ||
               errorMessage.Contains("temporary");
    }

    private static async Task ReprocessEvent(string originalEvent)
    {
        // Logic to send event back to processing pipeline
    }

    private static async Task AlertOperationsTeam(DeadLetterEvent dlqEvent)
    {
        // Send alert to operations team for manual intervention
    }
}

public class DeadLetterEvent
{
    public string OriginalEvent { get; set; }
    public string ErrorMessage { get; set; }
    public DateTime Timestamp { get; set; }
    public int RetryCount { get; set; }
}

Key Considerations and Best Practices

Delivery Guarantees

❗ Important: Azure Functions Kafka extension provides “at least once” delivery, not “at most once”

Implications:

  • Expect Duplicates: Design systems to handle duplicate messages
  • Idempotent Operations: Use upsert operations where possible
  • Deduplication Logic: Implement client-side deduplication if needed
public class IdempotentProcessor
{
    private readonly IMemoryCache _processedEvents;

    // IMemoryCache is supplied via DI (register with services.AddMemoryCache() in Program.cs)
    public IdempotentProcessor(IMemoryCache processedEvents)
    {
        _processedEvents = processedEvents;
    }

    [Function("IdempotentEventProcessor")]
    public async Task ProcessEvent(
        [KafkaTrigger("BrokerList", "events", ConsumerGroup = "idempotent-group")] 
        string eventData,
        ILogger log)
    {
        var eventId = ExtractEventId(eventData);
        
        // Check if already processed
        if (_processedEvents.TryGetValue(eventId, out _))
        {
            log.LogInformation($"Event {eventId} already processed, skipping");
            return;
        }

        try
        {
            // Process event
            await ProcessBusinessLogic(eventData);
            
            // Mark as processed
            _processedEvents.Set(eventId, true, TimeSpan.FromHours(1));
            
            log.LogInformation($"Successfully processed event: {eventId}");
        }
        catch (Exception ex)
        {
            log.LogError(ex, $"Failed to process event: {eventId}");
            throw;
        }
    }
}

Exception Handling Requirements

🚨 Critical: Every function should include a try-catch block at the highest level

[Function("ProductionKafkaProcessor")]
public static async Task ProcessEvent(
    [KafkaTrigger("BrokerList", "production-topic", 
        ConsumerGroup = "production-group")] string eventData,
    ILogger log)
{
    try
    {
        // All business logic wrapped in try-catch
        var parsedEvent = JsonSerializer.Deserialize<EventModel>(eventData);
        
        await ValidateEvent(parsedEvent);
        await ProcessBusinessLogic(parsedEvent);
        await NotifyDownstreamSystems(parsedEvent);
        
        log.LogInformation($"Successfully processed: {parsedEvent.Id}");
    }
    catch (ValidationException vex)
    {
        log.LogWarning($"Validation failed: {vex.Message}");
        // Don't throw - skip invalid events
    }
    catch (TransientException tex)
    {
        log.LogWarning($"Transient error: {tex.Message}");
        throw; // Retry transient errors
    }
    catch (Exception ex)
    {
        log.LogError(ex, "Unhandled exception in event processing");
        // Decide: throw for retry or handle gracefully
        throw;
    }
}

Current Limitations

Auto-Commit Only

Currently, Azure Functions only supports enable.auto.commit = true:

  • βœ… Automatic offset commits after batch processing
  • ❌ Manual commit control not available
  • πŸ”„ Community Request: Vote for manual commit control

Workaround for Manual Control

// Current: Auto-commit after function completion
[Function("AutoCommitProcessor")]
public static void ProcessWithAutoCommit(
    [KafkaTrigger("BrokerList", "topic", ConsumerGroup = "auto-group")] 
    string[] events,
    ILogger log)
{
    // Process all events - offset commits automatically
    foreach (var evt in events)
    {
        ProcessEvent(evt);
    }
    // Offset committed here automatically
}

// If you need manual commit control, consider:
// 1. Using smaller batch sizes
// 2. Processing in smaller chunks
// 3. Using external Kafka client libraries for more control
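
For the third option, here is a hedged sketch of what manual offset control looks like with the Confluent.Kafka client used directly (outside the Functions Kafka trigger, for example in a container or WebJob); the topic and consumer group names are placeholders:

// Sketch: manual commits with Confluent.Kafka instead of the Functions Kafka trigger
using System;
using System.Threading;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = Environment.GetEnvironmentVariable("BrokerList"),
    GroupId = "manual-commit-group",          // placeholder consumer group
    EnableAutoCommit = false,                 // take over commit responsibility
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("my-topic");               // placeholder topic

while (true)
{
    var result = consumer.Consume(CancellationToken.None);

    // Process the message first...
    Console.WriteLine($"Processing {result.Message.Key}: {result.Message.Value}");

    // ...then commit the offset only after successful processing
    consumer.Commit(result);
}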

Performance Optimization Tips

Batch Size Optimization

// Test different batch sizes for your workload
// Start with these settings and adjust based on performance testing

// High Throughput - Large Batches
{
  "kafka": {
    "maxBatchSize": 1000,
    "subscriberIntervalInSeconds": 1,
    "channelCapacity": 5000
  }
}

// Low Latency - Small Batches
{
  "kafka": {
    "maxBatchSize": 10,
    "subscriberIntervalInSeconds": 1,
    "channelCapacity": 100
  }
}

// Balanced - Medium Batches
{
  "kafka": {
    "maxBatchSize": 100,
    "subscriberIntervalInSeconds": 2,
    "channelCapacity": 1000
  }
}

Memory and CPU Optimization

public class OptimizedKafkaProcessor
{
    [Function("OptimizedProcessor")]
    public static async Task ProcessOptimized(
        [KafkaTrigger("BrokerList", "high-volume-topic", 
            ConsumerGroup = "optimized-group", IsBatched = true)] 
        string[] events,
        ILogger log)
    {
        // Use parallel processing for CPU-intensive tasks
        var tasks = events.Select(async eventData =>
        {
            try
            {
                return await ProcessEventAsync(eventData);
            }
            catch (Exception ex)
            {
                log.LogError(ex, $"Failed to process event: {eventData}");
                return null;
            }
        });

        var results = await Task.WhenAll(tasks);
        var successCount = results.Count(r => r != null);
        
        log.LogInformation($"Processed {successCount}/{events.Length} events successfully");
    }

    private static async Task<ProcessResult> ProcessEventAsync(string eventData)
    {
        // Optimize JSON parsing
        using var document = JsonDocument.Parse(eventData);
        var root = document.RootElement;
        
        // Process with minimal allocations
        var result = new ProcessResult
        {
            Id = root.GetProperty("id").GetString(),
            ProcessedAt = DateTime.UtcNow
        };
        
        return result;
    }
}

Monitoring and Observability

Application Insights Integration

public class MonitoredKafkaProcessor
{
    private readonly ILogger<MonitoredKafkaProcessor> _logger;
    private readonly TelemetryClient _telemetryClient;

    public MonitoredKafkaProcessor(
        ILogger<MonitoredKafkaProcessor> logger,
        TelemetryClient telemetryClient)
    {
        _logger = logger;
        _telemetryClient = telemetryClient;
    }

    [Function("MonitoredProcessor")]
    public async Task ProcessWithMonitoring(
        [KafkaTrigger("BrokerList", "monitored-topic", 
            ConsumerGroup = "monitoring-group", IsBatched = true)] 
        string[] events,
        ILogger log)
    {
        using var operation = _telemetryClient.StartOperation<DependencyTelemetry>("ProcessKafkaBatch");
        operation.Telemetry.Properties["BatchSize"] = events.Length.ToString();
        
        var stopwatch = Stopwatch.StartNew();
        var successCount = 0;
        var errorCount = 0;

        try
        {
            foreach (var eventData in events)
            {
                try
                {
                    await ProcessSingleEvent(eventData);
                    successCount++;
                }
                catch (Exception ex)
                {
                    errorCount++;
                    _telemetryClient.TrackException(ex, new Dictionary<string, string>
                    {
                        ["EventData"] = eventData,
                        ["BatchSize"] = events.Length.ToString()
                    });
                }
            }
        }
        finally
        {
            stopwatch.Stop();
            
            // Track custom metrics
            _telemetryClient.TrackMetric("Kafka.BatchProcessingTime", stopwatch.ElapsedMilliseconds);
            _telemetryClient.TrackMetric("Kafka.SuccessfulEvents", successCount);
            _telemetryClient.TrackMetric("Kafka.FailedEvents", errorCount);
            _telemetryClient.TrackMetric("Kafka.BatchSize", events.Length);
            
            operation.Telemetry.Success = errorCount == 0;
            operation.Telemetry.ResultCode = errorCount == 0 ? "200" : "500";
        }
    }
}

Testing Strategies

Unit Testing Kafka Functions

[TestClass]
public class KafkaFunctionTests
{
    private Mock<ILogger> _loggerMock;
    private KafkaProcessor _processor;

    [TestInitialize]
    public void Setup()
    {
        _loggerMock = new Mock<ILogger>();
        _processor = new KafkaProcessor(_loggerMock.Object);
    }

    [TestMethod]
    public async Task ProcessEvent_ValidEvent_ProcessesSuccessfully()
    {
        // Arrange
        var testEvent = JsonSerializer.Serialize(new EventModel
        {
            Id = "test-123",
            Type = "user.created",
            Data = new { UserId = "user-123", Email = "test@example.com" }
        });

        // Act & Assert
        await _processor.ProcessEvent(testEvent, _loggerMock.Object);
        
        // Verify logging
        _loggerMock.Verify(
            x => x.Log(
                LogLevel.Information,
                It.IsAny<EventId>(),
                It.Is<It.IsAnyType>((v, t) => v.ToString().Contains("Successfully processed")),
                It.IsAny<Exception>(),
                It.IsAny<Func<It.IsAnyType, Exception, string>>()),
            Times.Once);
    }

    [TestMethod]
    public async Task ProcessBatch_MixedValidInvalidEvents_ProcessesValidEventsOnly()
    {
        // Arrange
        var validEvent = JsonSerializer.Serialize(new EventModel { Id = "valid" });
        var invalidEvent = "invalid-json";
        var events = new[] { validEvent, invalidEvent };

        // Act
        await _processor.ProcessBatch(events, _loggerMock.Object);

        // Assert
        _loggerMock.Verify(
            x => x.Log(
                LogLevel.Error,
                It.IsAny<EventId>(),
                It.Is<It.IsAnyType>((v, t) => v.ToString().Contains("Failed to process")),
                It.IsAny<Exception>(),
                It.IsAny<Func<It.IsAnyType, Exception, string>>()),
            Times.Once);
    }
}

Integration Testing

[TestClass]
public class KafkaIntegrationTests
{
    private TestServer _testServer;
    private HttpClient _client;

    [TestInitialize]
    public void Setup()
    {
        var builder = new WebHostBuilder()
            .UseStartup<TestStartup>()
            .ConfigureServices(services =>
            {
                services.AddSingleton<IKafkaProducer, TestKafkaProducer>();
            });

        _testServer = new TestServer(builder);
        _client = _testServer.CreateClient();
    }

    [TestMethod]
    public async Task HttpToKafka_ValidRequest_ProducesMessage()
    {
        // Arrange
        var testData = new { Message = "Test message", UserId = "123" };
        var content = new StringContent(
            JsonSerializer.Serialize(testData), 
            Encoding.UTF8, 
            "application/json");

        // Act
        var response = await _client.PostAsync("/api/produce", content);

        // Assert
        Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);
        
        // Verify message was produced (using test double)
        var producer = _testServer.Services.GetRequiredService<IKafkaProducer>() as TestKafkaProducer;
        Assert.AreEqual(1, producer.ProducedMessages.Count);
    }
}

Real-World Use Cases

Example: Salesforce to Dynamics 365 Integration

public class CRMIntegrationPipeline
{
    [Function("SalesforceEventProcessor")]
    [KafkaOutput("BrokerList", "crm-integration-topic")]
    public static string ProcessSalesforceEvent(
        [KafkaTrigger("BrokerList", "salesforce-events", 
            ConsumerGroup = "crm-integration")] 
        string salesforceEvent,
        ILogger log)
    {
        try
        {
            // Parse Salesforce webhook event
            var sfEvent = JsonSerializer.Deserialize<SalesforceEvent>(salesforceEvent);
            
            // Transform to internal format
            var integrationEvent = new CRMIntegrationEvent
            {
                Id = Guid.NewGuid().ToString(),
                SourceSystem = "Salesforce",
                TargetSystem = "Dynamics365",
                EntityType = MapEntityType(sfEvent.ObjectType),
                Operation = MapOperation(sfEvent.EventType),
                Data = TransformSalesforceData(sfEvent.Data),
                Timestamp = DateTime.UtcNow,
                CorrelationId = sfEvent.Id
            };

            log.LogInformation($"Transformed Salesforce event: {sfEvent.Id} -> {integrationEvent.Id}");
            
            return JsonSerializer.Serialize(integrationEvent);
        }
        catch (Exception ex)
        {
            log.LogError(ex, $"Failed to process Salesforce event: {salesforceEvent}");
            throw;
        }
    }

    [Function("DynamicsIntegrationProcessor")]
    public static async Task ProcessDynamicsIntegration(
        [KafkaTrigger("BrokerList", "crm-integration-topic", 
            ConsumerGroup = "dynamics-integration")] 
        string integrationEvent,
        ILogger log)
    {
        try
        {
            var crmEvent = JsonSerializer.Deserialize<CRMIntegrationEvent>(integrationEvent);
            
            // Call Dynamics 365 API
            using var httpClient = new HttpClient();
            
            // Add authentication headers
            httpClient.DefaultRequestHeaders.Authorization = 
                new AuthenticationHeaderValue("Bearer", await GetDynamicsToken());
            
            var dynamicsEndpoint = GetDynamicsEndpoint(crmEvent.EntityType, crmEvent.Operation);
            var response = await httpClient.PostAsJsonAsync(dynamicsEndpoint, crmEvent.Data);
            
            if (response.IsSuccessStatusCode)
            {
                log.LogInformation($"Successfully synced to Dynamics: {crmEvent.CorrelationId}");
            }
            else
            {
                throw new DynamicsIntegrationException(
                    $"Dynamics API call failed: {response.StatusCode}");
            }
        }
        catch (Exception ex)
        {
            log.LogError(ex, $"Failed to process Dynamics integration: {integrationEvent}");
            throw;
        }
    }
}

Conclusion

Building scalable Kafka streaming data pipelines with Azure Functions provides a powerful, managed solution for event-driven architectures. Key takeaways:

πŸš€ Automatic Scaling: Functions handle scaling and rebalancing automatically
βš–οΈ Load Distribution: Built-in partition management and consumer group balancing
πŸ›‘οΈ Resilience: Retry mechanisms and error handling patterns
πŸ’° Cost Optimization: Multiple functions per worker for resource efficiency
πŸ“Š Production Ready: Comprehensive monitoring and observability options

Best Practices Summary

  1. Always use try-catch blocks at the highest level of your functions
  2. Design for idempotency - expect duplicate messages
  3. Implement proper error handling with dead letter queues
  4. Use batch processing for better throughput when possible
  5. Monitor and tune configuration based on your specific workload
  6. Choose appropriate hosting plans (Premium or Dedicated)
  7. Test thoroughly with realistic data volumes and error scenarios

When to Use Azure Functions for Kafka

βœ… Ideal Scenarios:

  • Event-driven microservices architectures
  • Real-time data processing pipelines
  • System integration and data synchronization
  • Serverless stream processing
  • Auto-scaling event consumers

❌ Consider Alternatives When:

  • Need manual commit control (until feature is available)
  • Extremely high throughput requirements (millions of messages/second)
  • Complex stream processing operations (consider Azure Stream Analytics)
  • Real-time analytics with low latency requirements

The Azure Functions Kafka extension provides an excellent balance of managed infrastructure, automatic scaling, and developer productivity for most streaming data scenarios.


Have you implemented Kafka streaming pipelines with Azure Functions? Share your experiences and challenges in the comments below!
