Monitor Pattern - Part 5 - Azure Durable Functions

2022, Sep 05

The Monitor pattern is implemented as a long-running operation that polls an external endpoint until its state changes. It is the reverse of the async HTTP API pattern, which exposes an endpoint so that an external client can monitor a long-running operation.

This is part of a series in which I explain the components of Durable Functions [1] and the different application patterns they support.

Check the PlayGoKids repository for this article's demo.

Understanding the Flow

Monitor

In this flow, the orchestration starts a long-running asynchronous process and then polls periodically to see whether the operation is complete. The polling is driven by a durable timer inside a loop.

This example is a slight modification of Microsoft's sample, adapted to reflect a real-world implementation. It is a durable function that monitors an image-processing task, which can take some time to complete.

Monitor_HttpStart is the HTTP trigger that starts the orchestration.

Starter Function (aka DurableClient)

[FunctionName($"{nameof(Monitor)}_HttpStart")]
public async Task<HttpResponseMessage> HttpStart(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
    [DurableClient] IDurableOrchestrationClient starter,
    ILogger log)
{
    var imageDto = await req.Content.ReadAsAsync<ImageDto>();

    var instanceId = await starter.StartNewAsync(nameof(Constants.RunOrchestrator), imageDto);

    log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

    return starter.CreateCheckStatusResponse(req, instanceId);
}

The ImageDto holds image data:

public class ImageDto
{
    public string FileName { get; set; }
    public byte[] File { get; set; }
}

This is an HTTP-triggered function that can be invoked with the following curl request:

curl --location --request POST 'http://localhost:7207/api/Monitor_HttpStart' \
--header 'Content-Type: application/json' \
--data-raw '{
    "fileName": "test.txt",
    "file": [12,34,56]
}'

Orchestrator Function

The orchestrator is responsible for calling all the activities. In the Monitor pattern, note the while loop, which keeps polling for as long as the deadline has not passed, as tracked by isProcessWithinTime.

Unless jobStatus is Processed, the orchestrator sleeps for pollingInterval seconds and is resumed automatically when the durable timer created with context.CreateTimer fires:

public class Orchestrator
{
    [FunctionName(nameof(Constants.RunOrchestrator))]
    public async Task RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
    {
        var imageDto = context.GetInput<ImageDto>();
        var pollingInterval = GetPollingInterval();
        var expiryTime = GetExpiryTime(context);

        await context.CallActivityAsync(nameof(Constants.RunProcessImageActivity), imageDto);

        var isProcessWithinTime = context.CurrentUtcDateTime < expiryTime;

        while (isProcessWithinTime)
        {
            var jobStatus = await context.CallActivityAsync<ImageStatus>(nameof(Constants.RunGetStatusImageActivity), imageDto.FileName);

            if (jobStatus == ImageStatus.Processed)
            {
                // Perform an action when a condition is met.
                await context.CallActivityAsync(nameof(Constants.RunSendAlertActivity), imageDto.FileName);
                break;
            }

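            // Orchestration sleeps until this time.
            var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
            await context.CreateTimer(nextCheck, CancellationToken.None);

            // Re-evaluate the deadline after each sleep; otherwise the loop never times out.
            isProcessWithinTime = context.CurrentUtcDateTime < expiryTime;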
        }

        if (!isProcessWithinTime)
        {
            // Operation has timed out
            log.LogWarning($"The image process operation for {imageDto.FileName} has timed out.");
        }
    }

    private DateTime GetExpiryTime(IDurableOrchestrationContext context)
    {
        return context.CurrentUtcDateTime.AddMinutes(5); //Define the expiry time
    }

    private int GetPollingInterval()
    {
        return 15; //Define the polling interval
    }
}

If the operation times out, a warning is logged.
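To get a feel for how the expiry time and polling interval interact, here is a minimal sketch that simulates the loop without any Azure dependencies. The MonitorLoopSketch class and its Poll method are hypothetical names introduced only for illustration; a plain DateTime stands in for context.CurrentUtcDateTime, and advancing it stands in for awaiting context.CreateTimer.

```csharp
using System;

// Hypothetical, dependency-free simulation of the monitor loop (not part of the demo).
public static class MonitorLoopSketch
{
    // Returns true if the job was seen as processed before the deadline.
    public static bool Poll(Func<bool> isProcessed, DateTime start,
        TimeSpan timeout, TimeSpan interval, out int checks)
    {
        var now = start;                // stands in for context.CurrentUtcDateTime
        var expiryTime = start + timeout;
        checks = 0;

        // The deadline is compared on every iteration.
        while (now < expiryTime)
        {
            checks++;
            if (isProcessed())
            {
                return true;
            }

            now += interval;            // stands in for awaiting context.CreateTimer
        }

        return false;                   // timed out
    }

    public static void Main()
    {
        // A job that never completes, using the 5-minute expiry and 15-second interval above.
        var done = Poll(() => false, DateTime.UtcNow,
            TimeSpan.FromMinutes(5), TimeSpan.FromSeconds(15), out var checks);

        Console.WriteLine($"{done} after {checks} checks"); // prints "False after 20 checks"
    }
}
```

With these values the status is checked at most 20 times before the timeout. In the real orchestrator each activity call also takes time, so the actual number of checks can be lower.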

Activity Functions

The activities in this demo are essentially placeholders for real-world actions:

public class Activity
{
    private readonly IImageProcessor _imageProcessor;
    private readonly ILogger<Activity> _logger;

    public Activity(IImageProcessor imageProcessor, ILogger<Activity> logger)
    {
        _imageProcessor = imageProcessor;
        _logger = logger;
    }

    [FunctionName(nameof(Constants.RunProcessImageActivity))]
    public void RunProcessImageActivity([ActivityTrigger] ImageDto imageDto)
    {
        _logger.LogInformation($"{nameof(RunProcessImageActivity)} for {imageDto.FileName}");

        _imageProcessor.Process(imageDto.FileName, imageDto.File);
    }

    [FunctionName(nameof(Constants.RunGetStatusImageActivity))]
    public ImageStatus RunGetStatusImageActivity([ActivityTrigger] string fileName)
    {
        _logger.LogInformation($"{nameof(RunGetStatusImageActivity)} for {fileName}");

        return _imageProcessor.GetStatus(fileName);
    }

    [FunctionName(nameof(Constants.RunSendAlertActivity))]
    public void RunSendAlertActivity([ActivityTrigger] string fileName)
    {
        _logger.LogInformation($"{nameof(RunSendAlertActivity)} for {fileName}");

        //TODO: send alert
    }
}

ImageProcessor is just a stub standing in for an implementation that would call an external service:

public class ImageProcessor : IImageProcessor
{
    public void Process(string fileName, byte[] file)
    {
        //TODO: process image
    }

    public ImageStatus GetStatus(string fileName)
    {
        //TODO: check status dynamically
        return ImageStatus.Processed;
    }
}
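The Activity class receives IImageProcessor through its constructor, so the type has to be registered with the Functions host. The article does not show that part, so the following is only a sketch, assuming the in-process model with the Microsoft.Azure.Functions.Extensions package. The MonitorDemo namespace, the Startup class name, and every ImageStatus member other than Processed are assumptions; the interface is inferred from how Activity and ImageProcessor use it.

```csharp
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;

// Namespace and class name are illustrative, not taken from the repository.
[assembly: FunctionsStartup(typeof(MonitorDemo.Startup))]

namespace MonitorDemo
{
    // Only Processed appears in the article; the other members are assumptions.
    public enum ImageStatus
    {
        Pending,
        Processing,
        Processed
    }

    // Contract inferred from the calls made in Activity and ImageProcessor.
    public interface IImageProcessor
    {
        void Process(string fileName, byte[] file);
        ImageStatus GetStatus(string fileName);
    }

    public class Startup : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            // Lets the host inject IImageProcessor into the Activity constructor.
            // Assumes ImageProcessor is declared in (or imported into) this namespace.
            builder.Services.AddSingleton<IImageProcessor, ImageProcessor>();
        }
    }
}
```

A singleton lifetime is used here only because the stub holds no state; a real implementation that wraps an HTTP client or other resource may call for a different lifetime.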

This is an example of how to use the Monitor pattern, applied to monitor the processing of an image.

In the next article of the series, we will look at the Human interaction pattern.

Full Series

Major components

Function chaining

Fan out/fan in

Async HTTP APIs

Monitor

Human interaction

Aggregator (stateful entities)


  1. Azure Durable Functions link