Welcome to this comprehensive session from Code Night New Zealand! Today we’re diving deep into building scalable Kafka streaming data pipelines with Azure Functions. This session covers everything from the mechanics behind the scenes to production-ready implementations with practical examples in both .NET and Java.
Understanding Kafka Streaming with Azure Functions
Behind the Scenes: How It Works
When working with Azure Functions and Kafka, there’s a sophisticated mechanism working behind the scenes that handles many complexities for developers:
- Automatic Scaling: Functions automatically scale based on the number of incoming events
- Load Balancing: Automatic rebalancing of events across function instances
- Buffering: Smart buffering when you can't process events at the expected rate
- Throughput Management: Built-in handling for high-volume data streams
The Architecture Behind Azure Functions Kafka Integration
+--------------+      +----------------+      +-----------+      +------------------+
|    Kafka     |      |     Kafka      |      |           |      |  Azure Function  |
|  Broker(s)   | ---> |    Listener    | ---> |  Channel  | ---> |   (Your Code)    |
+--------------+      +----------------+      +-----------+      +------------------+
Key Components:
- Kafka Listener: Responsible for pulling events from the broker
- Channel: Makes events available for function execution
- Function: Where your business logic executes
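To build intuition for how these pieces fit together, here is a purely conceptual sketch (not the extension's actual internals; the capacity and event count are just illustrative) that uses .NET's System.Threading.Channels to model a bounded buffer between a listener pulling from the broker and the function code consuming from it:

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class ListenerChannelSketch
{
    static async Task Main()
    {
        // Bounded buffer between the "listener" and the "function" (compare channelCapacity in host.json)
        var channel = Channel.CreateBounded<string>(capacity: 1000);

        // Listener side: pulls events from the broker and writes them into the channel.
        // WriteAsync waits when the channel is full - that waiting is the buffering/back-pressure behaviour.
        var listener = Task.Run(async () =>
        {
            for (var i = 0; i < 10; i++)
            {
                await channel.Writer.WriteAsync($"event-{i}");
            }
            channel.Writer.Complete();
        });

        // Function side: reads events from the channel and runs your business logic.
        await foreach (var evt in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"Function invoked with: {evt}");
        }

        await listener;
    }
}

The bounded capacity is what gives you back-pressure: when your code can't keep up, the listener simply waits instead of flooding memory, which is exactly the role channelCapacity plays in host.json.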
Processing Modes: Single vs Batch
Single Mode Processing
Events are processed individually, one at a time:
| [FunctionName("SingleEventProcessor")]
public static void Run(
[KafkaTrigger("BrokerList", "topic-name", ConsumerGroup = "my-group")]
string kafkaEvent,
ILogger log)
{
log.LogInformation($"Processing single event: {kafkaEvent}");
try
{
// Process individual event
var data = ProcessMessage(kafkaEvent);
// Handle business logic
}
catch (Exception ex)
{
log.LogError($"Error processing event: {ex.Message}");
throw; // Important: Re-throw to handle retry logic
}
}
Batch Mode Processing (Recommended for Throughput)
Multiple events processed together for better performance:
| [FunctionName("BatchEventProcessor")]
public static void Run(
[KafkaTrigger("BrokerList", "topic-name",
ConsumerGroup = "my-group", IsBatched = true)]
string[] kafkaEvents,
ILogger log)
{
log.LogInformation($"Processing batch of {kafkaEvents.Length} events");
try
{
foreach (var eventData in kafkaEvents)
{
try
{
// Process each event in the batch
var data = ProcessMessage(eventData);
// Handle business logic
}
catch (Exception ex)
{
log.LogError($"Error processing individual event: {ex.Message}");
// Handle individual event errors
}
}
}
catch (Exception ex)
{
log.LogError($"Error processing batch: {ex.Message}");
throw; // Critical: Handle batch-level errors
}
}
Fine-Tuning Configuration
Azure Functions allows extensive configuration through the host.json file for optimal performance:
{
"version": "2.0",
"extensions": {
"kafka": {
"maxBatchSize": 100,
"subscriberIntervalInSeconds": 1,
"channelCapacity": 1000,
"maxPollIntervalInMilliseconds": 300000,
"channelFullRetryIntervalInMilliseconds": 50
}
}
}
Configuration Parameters Explained
maxBatchSize
- Purpose: Maximum number of events per batch
- Impact: Higher values = better throughput, more memory usage
- Recommendation: Start with 100, tune based on performance testing
subscriberIntervalInSeconds
- Purpose: How frequently the Kafka listener polls for events
- Impact: Lower values = faster event processing, higher CPU usage
- Recommendation: 1-5 seconds for most scenarios
channelCapacity
- Purpose: Number of events buffered in memory
- Impact: Higher values = more scaling capability, more memory usage
- Recommendation: Balance based on available memory and expected throughput
maxPollIntervalInMilliseconds
- Purpose: Timeout for processing events before retry
- Impact: Must accommodate external API calls and processing time
- Recommendation: Set based on your slowest external dependency
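As a rough, back-of-the-envelope sanity check (all numbers below are hypothetical; substitute your own measurements), you can estimate a lower bound for maxPollIntervalInMilliseconds from your batch size and the slowest per-event work:

using System;

// Back-of-the-envelope check: a full batch must finish inside maxPollIntervalInMilliseconds.
const int maxBatchSize = 100;      // matches "maxBatchSize" in host.json
const int avgMsPerEvent = 250;     // measured time of the slowest external call per event (assumption)
const int safetyFactor = 2;        // headroom for retries, GC pauses, cold paths

int minimumPollIntervalMs = maxBatchSize * avgMsPerEvent * safetyFactor; // 50,000 ms in this example
Console.WriteLine($"maxPollIntervalInMilliseconds should be at least {minimumPollIntervalMs} ms");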
Kafka Topics and Partitioning
Understanding Partitions and Consumer Groups
// Single Function - Single Worker
[FunctionName("UserProcessor")]
public static void ProcessUser(
[KafkaTrigger("BrokerList", "user-events",
ConsumerGroup = "user-processing-group")]
string userEvent,
ILogger log)
{
// Processes events from all partitions
}
// Multiple Functions - Cost Optimization
public class UserProcessors
{
[FunctionName("UserProcessor1")]
public static void ProcessUser1(
[KafkaTrigger("BrokerList", "user-events",
ConsumerGroup = "user-processing-group")]
string userEvent,
ILogger log)
{
log.LogInformation("**** Processing in Function 1");
// Business logic here
}
[FunctionName("UserProcessor2")]
public static void ProcessUser2(
[KafkaTrigger("BrokerList", "user-events",
ConsumerGroup = "user-processing-group")]
string userEvent,
ILogger log)
{
log.LogInformation("---- Processing in Function 2");
// Business logic here
}
[FunctionName("UserProcessor3")]
public static void ProcessUser3(
[KafkaTrigger("BrokerList", "user-events",
ConsumerGroup = "user-processing-group")]
string userEvent,
ILogger log)
{
log.LogInformation("==== Processing in Function 3");
// Business logic here
}
}
Automatic Rebalancing Benefits
Dynamic Scaling: As load increases, Azure Functions automatically:
- Spawns new worker instances
- Redistributes partition assignments
- Balances load across available workers
- Scales down when load decreases
Cost Optimization: Running multiple functions in a single worker reduces memory footprint and cost
Event Consumption and Pointer Management
How Azure Functions Manages Event Pointers
Step 1: An offset (pointer) is tracked for each partition of the consumer group
Step 2: New messages are received and the function is triggered with a batch
Step 3: The function execution completes (with or without exceptions) and the pointer advances
Step 4: If something prevents the execution from completing, the pointer does not advance and the messages are reprocessed
Critical Considerations
| [FunctionName("ReliableEventProcessor")]
public static void ProcessEvents(
[KafkaTrigger("BrokerList", "events", ConsumerGroup = "reliable-group", IsBatched = true)]
string[] events,
ILogger log)
{
try
{
foreach (var eventData in events)
{
try
{
// Parse and validate event
var parsedEvent = JsonSerializer.Deserialize<EventModel>(eventData);
// Process business logic
await ProcessBusinessLogic(parsedEvent);
// Call external systems (idempotent operations recommended)
await CallExternalAPI(parsedEvent);
log.LogInformation($"Successfully processed event: {parsedEvent.Id}");
}
catch (Exception ex)
{
log.LogError($"Failed to process individual event: {ex.Message}");
// Decide: continue with batch or fail entire batch
// For critical events, consider dead letter queue
}
}
}
catch (Exception ex)
{
log.LogError($"Batch processing failed: {ex.Message}");
throw; // This will prevent pointer advancement and trigger retry
}
}
// Ensure idempotent external operations (PUT/upsert against a keyed resource)
private static readonly HttpClient httpClient = new HttpClient();

private static async Task CallExternalAPI(EventModel eventData)
{
    // Use upsert operations where possible
    var result = await httpClient.PutAsJsonAsync($"/api/users/{eventData.UserId}", eventData);
    if (!result.IsSuccessStatusCode)
    {
        throw new ExternalSystemException($"API call failed: {result.StatusCode}");
    }
}
Kafka Extensions and Bindings
Available Extension Models
Isolated Worker Model (Recommended)
- Runs in separate process
- Better performance and reliability
- Current and future support
In-Process Model (Legacy)
- Support ends November 2026
- Should migrate to isolated model
- Not recommended for new projects
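For completeness, here is roughly what the isolated worker bootstrap looks like — a minimal Program.cs sketch that assumes the Microsoft.Azure.Functions.Worker and Microsoft.Azure.Functions.Worker.Extensions.Kafka packages are referenced by the project:

// Program.cs - minimal isolated worker bootstrap (sketch)
using Microsoft.Extensions.Hosting;

var host = new HostBuilder()
    .ConfigureFunctionsWorkerDefaults()   // registers the isolated worker, triggers, and bindings
    .Build();

host.Run();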
Kafka Trigger Configuration
// .NET Isolated Worker Example
using System.Text.Json;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
public class KafkaFunctions
{
private readonly ILogger<KafkaFunctions> _logger;
public KafkaFunctions(ILogger<KafkaFunctions> logger)
{
_logger = logger;
}
[Function("ProcessKafkaMessage")]
public void ProcessMessage(
[KafkaTrigger("BrokerList", "my-topic",
ConsumerGroup = "my-consumer-group",
IsBatched = false)] string message)
{
_logger.LogInformation($"Received message: {message}");
try
{
var data = GetMessage(message);
_logger.LogInformation($"Parsed: Key={data.Key}, Value={data.Value}, Partition={data.Partition}");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message");
throw;
}
}
[Function("ProcessKafkaBatch")]
public void ProcessBatch(
[KafkaTrigger("BrokerList", "my-topic",
ConsumerGroup = "my-consumer-group",
IsBatched = true)] string[] messages)
{
_logger.LogInformation($"Processing batch of {messages.Length} messages");
foreach (var message in messages)
{
try
{
// Process individual message
var data = GetMessage(message);
// Handle business logic
}
catch (Exception ex)
{
_logger.LogError(ex, $"Error in batch processing: {message}");
// Continue processing other messages or fail entire batch
}
}
}
private MessageData GetMessage(string message)
{
// Parse message and extract metadata
return JsonSerializer.Deserialize<MessageData>(message);
}
}
Kafka Output Binding (Producer)
public class KafkaProducerFunctions
{
[Function("HttpToKafka")]
[KafkaOutput("BrokerList", "output-topic")]
public static string ProduceSingleMessage(
[HttpTrigger(AuthorizationLevel.Function, "post")]
HttpRequestData req)
{
var body = req.ReadAsString();
// Create Kafka message
var kafkaMessage = new KafkaEventData
{
Timestamp = DateTime.UtcNow,
Key = Guid.NewGuid().ToString(),
Value = body,
Headers = new Dictionary<string, string>
{
["source"] = "http-api",
["correlation-id"] = Guid.NewGuid().ToString()
}
};
return JsonSerializer.Serialize(kafkaMessage);
}
[Function("BatchProducer")]
[KafkaOutput("BrokerList", "output-topic")]
public static KafkaEventData[] ProduceBatchMessages(
[HttpTrigger(AuthorizationLevel.Function, "post")]
HttpRequestData req)
{
var inputData = req.ReadAsString();
var items = JsonSerializer.Deserialize<string[]>(inputData);
return items.Select((item, index) => new KafkaEventData
{
Key = $"batch-key-{index}",
Value = item,
Timestamp = DateTime.UtcNow,
Headers = new Dictionary<string, string>
{
["batch-id"] = Guid.NewGuid().ToString(),
["item-index"] = index.ToString()
}
}).ToArray();
}
}
public class KafkaEventData
{
public string Key { get; set; }
public string Value { get; set; }
public DateTime Timestamp { get; set; }
public Dictionary<string, string> Headers { get; set; }
// These are typically ignored but required by the binding
public int Partition { get; set; }
public long Offset { get; set; }
public string Topic { get; set; }
}
Retry Mechanisms and Error Handling
Java Retry Configuration
Azure Functions provides built-in retry mechanisms for Java:
@FunctionName("KafkaProcessorWithRetry")
@FixedDelayRetry(maxRetryCount = 3, delayInterval = "00:00:02")
public void processWithFixedRetry(
@KafkaTrigger(
name = "kafkaTrigger",
topic = "my-topic",
brokerList = "BrokerList",
consumerGroup = "retry-group"
) String message,
final ExecutionContext context) {
context.getLogger().info("Processing message with fixed retry: " + message);
try {
// Simulate processing that might fail
if (Math.random() < 0.3) {
throw new RuntimeException("Simulated processing error");
}
// Successful processing
context.getLogger().info("Successfully processed: " + message);
} catch (RuntimeException ex) {
context.getLogger().severe("Processing failed: " + ex.getMessage());
throw ex; // Re-throw to trigger retry
}
}
@FunctionName("KafkaProcessorWithExponentialBackoff")
@ExponentialBackoffRetry(
maxRetryCount = 5,
minimumInterval = "00:00:01",
maximumInterval = "00:00:30"
)
public void processWithExponentialRetry(
@KafkaTrigger(
name = "kafkaTrigger",
topic = "my-topic",
brokerList = "BrokerList",
consumerGroup = "exponential-group"
) String message,
final ExecutionContext context) {
context.getLogger().info("Processing with exponential backoff: " + message);
// Processing logic with automatic exponential backoff on failures
processMessage(message, context);
}
.NET Retry with Polly
For .NET, implement retry using Polly:
using Polly;
using Polly.Extensions.Http;
public class ResilientKafkaProcessor
{
private readonly HttpClient _httpClient;
private readonly IAsyncPolicy<HttpResponseMessage> _retryPolicy;
public ResilientKafkaProcessor(HttpClient httpClient)
{
_httpClient = httpClient;
_retryPolicy = Policy
.HandleResult<HttpResponseMessage>(r => !r.IsSuccessStatusCode)
.Or<HttpRequestException>()
.WaitAndRetryAsync(
retryCount: 3,
sleepDurationProvider: retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: (outcome, timespan, retryCount, context) =>
{
Console.WriteLine($"Retry {retryCount} after {timespan} seconds");
});
}
[Function("ResilientKafkaProcessor")]
public async Task ProcessMessage(
[KafkaTrigger("BrokerList", "resilient-topic",
ConsumerGroup = "resilient-group")] string message,
ILogger log)
{
try
{
var eventData = JsonSerializer.Deserialize<EventModel>(message);
// Process with retry policy
var response = await _retryPolicy.ExecuteAsync(async () =>
{
return await _httpClient.PostAsJsonAsync("/api/process", eventData);
});
if (response.IsSuccessStatusCode)
{
log.LogInformation($"Successfully processed event: {eventData.Id}");
}
}
catch (Exception ex)
{
log.LogError(ex, $"Failed to process message after retries: {message}");
// Consider dead letter queue here
throw;
}
}
}
Hosting and Scaling Considerations
Recommended Hosting Plans
Premium Plan (Elastic Premium)
- Plan Types: EP1, EP2, EP3 (not P1, P2, P3)
- Benefits: Event-driven scaling, perpetual warm instances
- Scaling: Up to 100 instances (Windows), 20-100 instances (Linux)
- Use Case: Production workloads, predictable pricing
# Azure CLI - Create a Function App on an existing Elastic Premium plan
# (assumes "myPremiumPlan" was created first, e.g. with: az functionapp plan create --sku EP1)
az functionapp create \
  --resource-group myResourceGroup \
  --plan myPremiumPlan \
  --runtime dotnet-isolated \
  --functions-version 4 \
  --name myKafkaFunctionApp \
  --os-type linux \
  --storage-account mystorageaccount
Dedicated Plan (App Service Plan)
- Benefits: Predictable costs, always-on capability
- Scaling: Manual or CPU/memory-based autoscale
- Use Case: Consistent workloads, cost optimization
Essential Configuration
Enable runtime scale monitoring on Premium plans (the Kafka trigger requires it for event-driven scale-out) and configure host.json:
{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[3.*, 4.0.0)"
},
"extensions": {
"kafka": {
"maxBatchSize": 100,
"subscriberIntervalInSeconds": 1
}
}
}
Application Settings:
FUNCTIONS_WORKER_RUNTIME=dotnet-isolated
AzureWebJobsStorage=<storage-connection-string>
SCALE_CONTROLLER_LOGGING_ENABLED=AppInsights:Verbose
WEBSITE_RUN_FROM_PACKAGE=1
BrokerList=<kafka-broker-connection-string>
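If your brokers require SASL over TLS (for example Confluent Cloud or the Event Hubs Kafka endpoint), the trigger can also reference credential app settings. The snippet below is a sketch: the setting names are placeholders and the property/enum names follow the Kafka extension samples, so verify them against the extension version you install:

using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

public class SecuredKafkaFunctions
{
    [Function("SecuredKafkaProcessor")]
    public void Run(
        [KafkaTrigger("BrokerList", "secured-topic",
            ConsumerGroup = "secured-group",
            Username = "KafkaUsername",                 // name of the app setting holding the SASL username (assumption)
            Password = "KafkaPassword",                 // name of the app setting holding the SASL password (assumption)
            Protocol = BrokerProtocol.SaslSsl,
            AuthenticationMode = BrokerAuthenticationMode.Plain)] string message,
        FunctionContext context)
    {
        context.GetLogger("SecuredKafkaProcessor").LogInformation($"Received secured message: {message}");
    }
}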
Production-Ready Data Pipeline Architecture
Complete Pipeline Implementation
// Raw Event Consumer and Transformer
public class DataPipelineFunctions
{
[Function("RawEventConsumer")]
[KafkaOutput("BrokerList", "public-topic")]
public static async Task<string> ProcessRawEvent(
[KafkaTrigger("BrokerList", "private-topic",
ConsumerGroup = "transformation-group", IsBatched = true)]
string[] rawEvents,
ILogger log)
{
var transformedEvents = new List<string>();
try
{
foreach (var rawEvent in rawEvents)
{
try
{
// Parse raw event
var sourceEvent = JsonSerializer.Deserialize<SourceEventModel>(rawEvent);
// Transform to company schema
var transformedEvent = new PublicEventModel
{
Id = sourceEvent.ExternalId,
EntityType = MapEntityType(sourceEvent.Type),
Timestamp = sourceEvent.CreatedAt,
Data = TransformData(sourceEvent.Payload),
Metadata = new EventMetadata
{
Source = "external-system",
CorrelationId = Guid.NewGuid().ToString(),
SchemaVersion = "1.0"
}
};
transformedEvents.Add(JsonSerializer.Serialize(transformedEvent));
log.LogInformation($"Transformed event: {sourceEvent.ExternalId}");
}
catch (Exception ex)
{
log.LogError($"Failed to transform event: {ex.Message}");
// Send to dead letter queue
await SendToDeadLetterQueue(rawEvent, ex.Message);
}
}
return JsonSerializer.Serialize(transformedEvents);
}
catch (Exception ex)
{
log.LogError(ex, "Batch transformation failed");
throw;
}
}
[Function("PublicEventConsumer")]
public static async Task ProcessPublicEvent(
[KafkaTrigger("BrokerList", "public-topic",
ConsumerGroup = "delivery-group")]
string publicEvent,
ILogger log)
{
try
{
var eventData = JsonSerializer.Deserialize<PublicEventModel>(publicEvent);
// Deliver to external system
using var httpClient = new HttpClient();
var response = await httpClient.PostAsJsonAsync(
"https://external-system.com/api/events",
eventData);
if (response.IsSuccessStatusCode)
{
log.LogInformation($"Successfully delivered event: {eventData.Id}");
}
else
{
throw new ExternalDeliveryException(
$"Delivery failed: {response.StatusCode}");
}
}
catch (Exception ex)
{
log.LogError(ex, $"Failed to process public event: {publicEvent}");
// Implement retry logic or dead letter queue
throw;
}
}
private static async Task SendToDeadLetterQueue(string failedEvent, string errorMessage)
{
// Implementation for dead letter queue
var deadLetterEvent = new DeadLetterEvent
{
OriginalEvent = failedEvent,
ErrorMessage = errorMessage,
Timestamp = DateTime.UtcNow,
RetryCount = 0
};
// Send to DLQ topic or storage
}
}
// Data Models
public class SourceEventModel
{
public string ExternalId { get; set; }
public string Type { get; set; }
public DateTime CreatedAt { get; set; }
public Dictionary<string, object> Payload { get; set; }
}
public class PublicEventModel
{
public string Id { get; set; }
public string EntityType { get; set; }
public DateTime Timestamp { get; set; }
public Dictionary<string, object> Data { get; set; }
public EventMetadata Metadata { get; set; }
}
public class EventMetadata
{
public string Source { get; set; }
public string CorrelationId { get; set; }
public string SchemaVersion { get; set; }
}
Dead Letter Queue Implementation
public class DeadLetterQueueHandler
{
[Function("DeadLetterProcessor")]
public static async Task ProcessDeadLetters(
[KafkaTrigger("BrokerList", "dead-letter-queue",
ConsumerGroup = "dlq-processor")]
string deadLetterEvent,
ILogger log)
{
try
{
var dlqEvent = JsonSerializer.Deserialize<DeadLetterEvent>(deadLetterEvent);
// Analyze error and decide on action
if (dlqEvent.RetryCount < 3 && IsRetryableError(dlqEvent.ErrorMessage))
{
// Increment retry count and reprocess
dlqEvent.RetryCount++;
await ReprocessEvent(dlqEvent.OriginalEvent);
}
else
{
// Send to manual review queue or alert operations
await AlertOperationsTeam(dlqEvent);
}
}
catch (Exception ex)
{
log.LogError(ex, "Failed to process dead letter event");
}
}
private static bool IsRetryableError(string errorMessage)
{
// Logic to determine if error is retryable
return errorMessage.Contains("timeout") ||
errorMessage.Contains("rate limit") ||
errorMessage.Contains("temporary");
}
private static async Task ReprocessEvent(string originalEvent)
{
// Logic to send event back to processing pipeline
}
private static async Task AlertOperationsTeam(DeadLetterEvent dlqEvent)
{
// Send alert to operations team for manual intervention
}
}
public class DeadLetterEvent
{
public string OriginalEvent { get; set; }
public string ErrorMessage { get; set; }
public DateTime Timestamp { get; set; }
public int RetryCount { get; set; }
}
Key Considerations and Best Practices
Delivery Guarantees
Important: The Azure Functions Kafka extension provides “at least once” delivery, not “at most once”.
Implications:
- Expect Duplicates: Design systems to handle duplicate messages
- Idempotent Operations: Use upsert operations where possible
- Deduplication Logic: Implement client-side deduplication if needed
public class IdempotentProcessor
{
    private readonly IMemoryCache _processedEvents;

    public IdempotentProcessor(IMemoryCache memoryCache)
    {
        _processedEvents = memoryCache;
    }
[Function("IdempotentEventProcessor")]
public async Task ProcessEvent(
[KafkaTrigger("BrokerList", "events", ConsumerGroup = "idempotent-group")]
string eventData,
ILogger log)
{
var eventId = ExtractEventId(eventData);
// Check if already processed
if (_processedEvents.TryGetValue(eventId, out _))
{
log.LogInformation($"Event {eventId} already processed, skipping");
return;
}
try
{
// Process event
await ProcessBusinessLogic(eventData);
// Mark as processed
_processedEvents.Set(eventId, true, TimeSpan.FromHours(1));
log.LogInformation($"Successfully processed event: {eventId}");
}
catch (Exception ex)
{
log.LogError(ex, $"Failed to process event: {eventId}");
throw;
}
}
}
Exception Handling Requirements
Critical: Every function should include a try-catch block at the highest level.
| [Function("ProductionKafkaProcessor")]
public static async Task ProcessEvent(
[KafkaTrigger("BrokerList", "production-topic",
ConsumerGroup = "production-group")] string eventData,
ILogger log)
{
try
{
// All business logic wrapped in try-catch
var parsedEvent = JsonSerializer.Deserialize<EventModel>(eventData);
await ValidateEvent(parsedEvent);
await ProcessBusinessLogic(parsedEvent);
await NotifyDownstreamSystems(parsedEvent);
log.LogInformation($"Successfully processed: {parsedEvent.Id}");
}
catch (ValidationException vex)
{
log.LogWarning($"Validation failed: {vex.Message}");
// Don't throw - skip invalid events
}
catch (TransientException tex)
{
log.LogWarning($"Transient error: {tex.Message}");
throw; // Retry transient errors
}
catch (Exception ex)
{
log.LogError(ex, "Unhandled exception in event processing");
// Decide: throw for retry or handle gracefully
throw;
}
}
Current Limitations
Auto-Commit Only
Currently, Azure Functions supports only enable.auto.commit = true, so offsets are committed automatically and manual commit control is not exposed.
Workaround for Manual Control
// Current: Auto-commit after function completion
[Function("AutoCommitProcessor")]
public static void ProcessWithAutoCommit(
[KafkaTrigger("BrokerList", "topic", ConsumerGroup = "auto-group")]
string[] events,
ILogger log)
{
// Process all events - offset commits automatically
foreach (var evt in events)
{
ProcessEvent(evt);
}
// Offset committed here automatically
}
// If you need manual commit control, consider:
// 1. Using smaller batch sizes
// 2. Processing in smaller chunks
// 3. Using external Kafka client libraries for more control
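If you truly need manual offset control today, the last workaround above means stepping outside the Functions trigger and using a Kafka client directly in another host (a worker service or container, for instance). A minimal sketch with the Confluent.Kafka client — the broker address, topic, and group ID are placeholders:

using System;
using Confluent.Kafka;

class ManualCommitConsumer
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",   // placeholder
            GroupId = "manual-commit-group",       // placeholder
            EnableAutoCommit = false               // we commit explicitly after processing
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("my-topic");

        while (true)
        {
            var result = consumer.Consume(TimeSpan.FromSeconds(1));
            if (result == null) continue;

            Console.WriteLine($"Processing {result.TopicPartitionOffset}: {result.Message.Value}");
            // ... business logic ...

            consumer.Commit(result);               // offset advances only after successful processing
        }
    }
}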
Batch Size Optimization
// Test different batch sizes for your workload
// Start with these settings and adjust based on performance testing
// High Throughput - Large Batches
{
"kafka": {
"maxBatchSize": 1000,
"subscriberIntervalInSeconds": 1,
"channelCapacity": 5000
}
}
// Low Latency - Small Batches
{
"kafka": {
"maxBatchSize": 10,
"subscriberIntervalInSeconds": 1,
"channelCapacity": 100
}
}
// Balanced - Medium Batches
{
"kafka": {
"maxBatchSize": 100,
"subscriberIntervalInSeconds": 2,
"channelCapacity": 1000
}
}
Memory and CPU Optimization
public class OptimizedKafkaProcessor
{
[Function("OptimizedProcessor")]
public static async Task ProcessOptimized(
[KafkaTrigger("BrokerList", "high-volume-topic",
ConsumerGroup = "optimized-group", IsBatched = true)]
string[] events,
ILogger log)
{
// Use parallel processing for CPU-intensive tasks
var tasks = events.Select(async eventData =>
{
try
{
return await ProcessEventAsync(eventData);
}
catch (Exception ex)
{
log.LogError(ex, $"Failed to process event: {eventData}");
return null;
}
});
var results = await Task.WhenAll(tasks);
var successCount = results.Count(r => r != null);
log.LogInformation($"Processed {successCount}/{events.Length} events successfully");
}
private static async Task<ProcessResult> ProcessEventAsync(string eventData)
{
// Optimize JSON parsing
using var document = JsonDocument.Parse(eventData);
var root = document.RootElement;
// Process with minimal allocations
var result = new ProcessResult
{
Id = root.GetProperty("id").GetString(),
ProcessedAt = DateTime.UtcNow
};
return result;
}
}
Monitoring and Observability
Application Insights Integration
public class MonitoredKafkaProcessor
{
private readonly ILogger<MonitoredKafkaProcessor> _logger;
private readonly TelemetryClient _telemetryClient;
public MonitoredKafkaProcessor(
ILogger<MonitoredKafkaProcessor> logger,
TelemetryClient telemetryClient)
{
_logger = logger;
_telemetryClient = telemetryClient;
}
[Function("MonitoredProcessor")]
public async Task ProcessWithMonitoring(
[KafkaTrigger("BrokerList", "monitored-topic",
ConsumerGroup = "monitoring-group", IsBatched = true)]
string[] events,
ILogger log)
{
using var operation = _telemetryClient.StartOperation<DependencyTelemetry>("ProcessKafkaBatch");
operation.Telemetry.Properties["BatchSize"] = events.Length.ToString();
var stopwatch = Stopwatch.StartNew();
var successCount = 0;
var errorCount = 0;
try
{
foreach (var eventData in events)
{
try
{
await ProcessSingleEvent(eventData);
successCount++;
}
catch (Exception ex)
{
errorCount++;
_telemetryClient.TrackException(ex, new Dictionary<string, string>
{
["EventData"] = eventData,
["BatchSize"] = events.Length.ToString()
});
}
}
}
finally
{
stopwatch.Stop();
// Track custom metrics
_telemetryClient.TrackMetric("Kafka.BatchProcessingTime", stopwatch.ElapsedMilliseconds);
_telemetryClient.TrackMetric("Kafka.SuccessfulEvents", successCount);
_telemetryClient.TrackMetric("Kafka.FailedEvents", errorCount);
_telemetryClient.TrackMetric("Kafka.BatchSize", events.Length);
operation.Telemetry.Success = errorCount == 0;
operation.Telemetry.ResultCode = errorCount == 0 ? "200" : "500";
}
}
}
Testing Strategies
Unit Testing Kafka Functions
[TestClass]
public class KafkaFunctionTests
{
private Mock<ILogger> _loggerMock;
private KafkaProcessor _processor;
[TestInitialize]
public void Setup()
{
_loggerMock = new Mock<ILogger>();
_processor = new KafkaProcessor(_loggerMock.Object);
}
[TestMethod]
public async Task ProcessEvent_ValidEvent_ProcessesSuccessfully()
{
// Arrange
var testEvent = JsonSerializer.Serialize(new EventModel
{
Id = "test-123",
Type = "user.created",
Data = new { UserId = "user-123", Email = "test@example.com" }
});
// Act & Assert
await _processor.ProcessEvent(testEvent, _loggerMock.Object);
// Verify logging
_loggerMock.Verify(
x => x.Log(
LogLevel.Information,
It.IsAny<EventId>(),
It.Is<It.IsAnyType>((v, t) => v.ToString().Contains("Successfully processed")),
It.IsAny<Exception>(),
It.IsAny<Func<It.IsAnyType, Exception, string>>()),
Times.Once);
}
[TestMethod]
public async Task ProcessBatch_MixedValidInvalidEvents_ProcessesValidEventsOnly()
{
// Arrange
var validEvent = JsonSerializer.Serialize(new EventModel { Id = "valid" });
var invalidEvent = "invalid-json";
var events = new[] { validEvent, invalidEvent };
// Act
await _processor.ProcessBatch(events, _loggerMock.Object);
// Assert
_loggerMock.Verify(
x => x.Log(
LogLevel.Error,
It.IsAny<EventId>(),
It.Is<It.IsAnyType>((v, t) => v.ToString().Contains("Failed to process")),
It.IsAny<Exception>(),
It.IsAny<Func<It.IsAnyType, Exception, string>>()),
Times.Once);
}
}
Integration Testing
[TestClass]
public class KafkaIntegrationTests
{
private TestServer _testServer;
private HttpClient _client;
[TestInitialize]
public void Setup()
{
var builder = new WebHostBuilder()
.UseStartup<TestStartup>()
.ConfigureServices(services =>
{
services.AddSingleton<IKafkaProducer, TestKafkaProducer>();
});
_testServer = new TestServer(builder);
_client = _testServer.CreateClient();
}
[TestMethod]
public async Task HttpToKafka_ValidRequest_ProducesMessage()
{
// Arrange
var testData = new { Message = "Test message", UserId = "123" };
var content = new StringContent(
JsonSerializer.Serialize(testData),
Encoding.UTF8,
"application/json");
// Act
var response = await _client.PostAsync("/api/produce", content);
// Assert
Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);
// Verify message was produced (using test double)
var producer = _testServer.Services.GetRequiredService<IKafkaProducer>() as TestKafkaProducer;
Assert.AreEqual(1, producer.ProducedMessages.Count);
}
}
Real-World Use Cases
Example: Salesforce to Dynamics 365 Integration
public class CRMIntegrationPipeline
{
[Function("SalesforceEventProcessor")]
[KafkaOutput("BrokerList", "crm-integration-topic")]
public static string ProcessSalesforceEvent(
[KafkaTrigger("BrokerList", "salesforce-events",
ConsumerGroup = "crm-integration")]
string salesforceEvent,
ILogger log)
{
try
{
// Parse Salesforce webhook event
var sfEvent = JsonSerializer.Deserialize<SalesforceEvent>(salesforceEvent);
// Transform to internal format
var integrationEvent = new CRMIntegrationEvent
{
Id = Guid.NewGuid().ToString(),
SourceSystem = "Salesforce",
TargetSystem = "Dynamics365",
EntityType = MapEntityType(sfEvent.ObjectType),
Operation = MapOperation(sfEvent.EventType),
Data = TransformSalesforceData(sfEvent.Data),
Timestamp = DateTime.UtcNow,
CorrelationId = sfEvent.Id
};
log.LogInformation($"Transformed Salesforce event: {sfEvent.Id} -> {integrationEvent.Id}");
return JsonSerializer.Serialize(integrationEvent);
}
catch (Exception ex)
{
log.LogError(ex, $"Failed to process Salesforce event: {salesforceEvent}");
throw;
}
}
[Function("DynamicsIntegrationProcessor")]
public static async Task ProcessDynamicsIntegration(
[KafkaTrigger("BrokerList", "crm-integration-topic",
ConsumerGroup = "dynamics-integration")]
string integrationEvent,
ILogger log)
{
try
{
var crmEvent = JsonSerializer.Deserialize<CRMIntegrationEvent>(integrationEvent);
// Call Dynamics 365 API
using var httpClient = new HttpClient();
// Add authentication headers
httpClient.DefaultRequestHeaders.Authorization =
new AuthenticationHeaderValue("Bearer", await GetDynamicsToken());
var dynamicsEndpoint = GetDynamicsEndpoint(crmEvent.EntityType, crmEvent.Operation);
var response = await httpClient.PostAsJsonAsync(dynamicsEndpoint, crmEvent.Data);
if (response.IsSuccessStatusCode)
{
log.LogInformation($"Successfully synced to Dynamics: {crmEvent.CorrelationId}");
}
else
{
throw new DynamicsIntegrationException(
$"Dynamics API call failed: {response.StatusCode}");
}
}
catch (Exception ex)
{
log.LogError(ex, $"Failed to process Dynamics integration: {integrationEvent}");
throw;
}
}
}
Conclusion
Building scalable Kafka streaming data pipelines with Azure Functions provides a powerful, managed solution for event-driven architectures. Key takeaways:
- Automatic Scaling: Functions handle scaling and rebalancing automatically
- Load Distribution: Built-in partition management and consumer group balancing
- Resilience: Retry mechanisms and error handling patterns
- Cost Optimization: Multiple functions per worker for resource efficiency
- Production Ready: Comprehensive monitoring and observability options
Best Practices Summary
- Always use try-catch blocks at the highest level of your functions
- Design for idempotency - expect duplicate messages
- Implement proper error handling with dead letter queues
- Use batch processing for better throughput when possible
- Monitor and tune configuration based on your specific workload
- Choose appropriate hosting plans (Premium or Dedicated)
- Test thoroughly with realistic data volumes and error scenarios
When to Use Azure Functions for Kafka
Ideal Scenarios:
- Event-driven microservices architectures
- Real-time data processing pipelines
- System integration and data synchronization
- Serverless stream processing
- Auto-scaling event consumers
Consider Alternatives When:
- Need manual commit control (until feature is available)
- Extremely high throughput requirements (millions of messages/second)
- Complex stream processing operations (consider Azure Stream Analytics)
- Real-time analytics with low latency requirements
The Azure Functions Kafka extension provides an excellent balance of managed infrastructure, automatic scaling, and developer productivity for most streaming data scenarios.
Resources
Have you implemented Kafka streaming pipelines with Azure Functions? Share your experiences and challenges in the comments below!