Aggregator Pattern - Part 7 - Azure Durable Functions
The Aggregator pattern collects event data from different sources over a period of time and aggregates it into a single entity.
This is part of a series in which I explain the components of Durable Functions and the application patterns they enable.
See the PlayGoKids repository for the demo that accompanies this article.
Understanding the Flow
In this example, the aggregator pattern is used to help us count candidate votes as if we were in an election.
Event data arriving on the queue vote-queue starts the aggregation process. A durable entity then aggregates the voting data.
The QueueTrigger function, as its name indicates, is the queue-triggered function that receives each vote.
Starter Function (aka DurableClient)
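[FunctionName("QueueTrigger")]
public async Task Run(
    [QueueTrigger(Constants.Queue)] string candidate,
    [DurableClient] IDurableEntityClient entityClient)
{
    // The entity name and the entity key both come from Constants.Entity in this demo.
    var entityId = new EntityId(Constants.Entity, Constants.Entity);

    // One-way signal: ask the entity to run its "add" operation for this candidate.
    await entityClient.SignalEntityAsync(entityId, "add", candidate);
}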
The entityId is initialized, and the entity client then calls SignalEntityAsync, which sends the add operation to the counter entity.
Submitting a message such as "Sponge Bob" to the queue triggers the function, which increments that candidate's count in the durable entity.
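For local testing, a vote can be pushed onto the queue from a small console program. The sketch below is only an illustration: it assumes the Azure.Storage.Queues package, a running Azurite emulator, and that Constants.Queue resolves to vote-queue, none of which is confirmed by the demo repository. The VoteSender class is a hypothetical name. Base64 encoding is chosen because the Functions queue trigger expects Base64-encoded messages by default.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Storage.Queues;

public static class VoteSender
{
    // Assumption: the Azurite storage emulator is running locally.
    private const string ConnectionString = "UseDevelopmentStorage=true";

    // Assumption: Constants.Queue in the demo resolves to this queue name.
    private const string QueueName = "vote-queue";

    public static async Task Main(string[] args)
    {
        var options = new QueueClientOptions
        {
            // Match the encoding the queue trigger expects by default.
            MessageEncoding = QueueMessageEncoding.Base64
        };

        var queue = new QueueClient(ConnectionString, QueueName, options);
        await queue.CreateIfNotExistsAsync();

        // The message text is the candidate name; default to "Sponge Bob".
        var candidate = args.Length > 0 ? args[0] : "Sponge Bob";
        await queue.SendMessageAsync(candidate);

        Console.WriteLine($"Queued a vote for {candidate}");
    }
}
```

The message text is used as the dictionary key as-is, so any quotation marks included in the message end up as part of the key.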
Durable entity
The VotingCounter class is the entity model, which is what gets stored as the entity state.
[JsonObject(MemberSerialization = MemberSerialization.OptIn)]
public class VotingCounter : IVotingCounter
{
    [JsonProperty("voting")]
    public Dictionary<string, int> Voting { get; set; } = new();

    public void Add(string candidate)
    {
        if (this.Voting.ContainsKey(candidate))
        {
            this.Voting[candidate]++;
        }
        else
        {
            this.Voting.Add(candidate, 1);
        }
    }

    public void Reset(string candidate) => this.Voting[candidate] = 0;

    public Dictionary<string, int> Get() => this.Voting;
}
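It is worth mentioning that the JsonObject attribute could be omitted here. Json.NET defaults to OptOut, which serializes all public properties, but the only public property, Voting, is already marked with JsonProperty, so the serialized state would be the same. The model also implements the interface IVotingCounter.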
public interface IVotingCounter
{
    Dictionary<string, int> Voting { get; set; }

    void Add(string candidate);

    void Reset(string candidate);

    Dictionary<string, int> Get();
}
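To see the aggregation logic in isolation, the following sketch exercises a trimmed copy of the model outside the Functions runtime. VotingCounterSketch and VotingCounterDemo are hypothetical names introduced for illustration. Add is written with TryGetValue here, which behaves the same as the ContainsKey branch in the article's class.

```csharp
using System;
using System.Collections.Generic;

// Trimmed copy of the article's entity model: same Add/Reset/Get behavior,
// without the Json.NET attributes or the Functions runtime.
public class VotingCounterSketch
{
    public Dictionary<string, int> Voting { get; } = new Dictionary<string, int>();

    public void Add(string candidate)
    {
        // TryGetValue leaves current at 0 for a candidate not seen before.
        Voting.TryGetValue(candidate, out var current);
        Voting[candidate] = current + 1;
    }

    public void Reset(string candidate) => Voting[candidate] = 0;

    public Dictionary<string, int> Get() => Voting;
}

public static class VotingCounterDemo
{
    // Feeds a sequence of votes through the counter and returns the tallies.
    public static Dictionary<string, int> Tally(IEnumerable<string> votes)
    {
        var counter = new VotingCounterSketch();
        foreach (var vote in votes)
        {
            counter.Add(vote);
        }
        return counter.Get();
    }

    public static void Main()
    {
        var result = Tally(new[]
        {
            "Sponge Bob", "Mr. Krabs", "Sponge Bob", "Patrick Star", "Sponge Bob"
        });

        foreach (var pair in result)
        {
            Console.WriteLine($"{pair.Key}: {pair.Value}");
        }
    }
}
```

Each call to Add plays the role of one queue message reaching the entity, so the dictionary ends up holding one running total per candidate.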
Context
The context holds the durable entity state. The function Counter defines the operations that manipulate the durable entity VotingCounter.
The operations implemented are add, reset, get, and delete.
In this example, only add is signaled; the results are read directly from the entity state rather than through get.
[FunctionName("Counter")]
public void Counter([EntityTrigger] IDurableEntityContext ctx)
{
    var vote = ctx.GetState<VotingCounter>() ?? new VotingCounter();
    switch (ctx.OperationName.ToLowerInvariant())
    {
        case "add":
            var candidateToAdd = ctx.GetInput<string>();
            vote.Add(candidateToAdd);
            ctx.SetState(vote);
            break;
        case "reset":
            var candidateToReset = ctx.GetInput<string>();
            vote.Reset(candidateToReset);
            ctx.SetState(vote);
            break;
        case "get":
            ctx.Return(vote);
            break;
        case "delete":
            ctx.DeleteState();
            break;
    }
}
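The reset and get operations can be reached in a similar way. The sketch below is not part of the demo: ResetTrigger, GetVotesOrchestrator, and the candidate query parameter are hypothetical names, and it assumes the same Constants and VotingCounter types used above. A client can only signal an entity or read its state, while an orchestration can also call it and wait for the returned value, which is why get is shown from an orchestrator.

```csharp
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;

public class CounterOperations
{
    // Hypothetical function: signals "reset" for the candidate in ?candidate=...
    [FunctionName("ResetTrigger")]
    public async Task<IActionResult> Reset(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequest req,
        [DurableClient] IDurableEntityClient entityClient)
    {
        string candidate = req.Query["candidate"];
        if (string.IsNullOrWhiteSpace(candidate))
        {
            return new BadRequestObjectResult("Pass a candidate name in the query string.");
        }

        var entityId = new EntityId(Constants.Entity, Constants.Entity);

        // Signals are one-way: this completes when the signal is sent,
        // not when the entity has processed it.
        await entityClient.SignalEntityAsync(entityId, "reset", candidate);
        return new AcceptedResult();
    }

    // Hypothetical orchestrator: "get" returns a value, so it needs a two-way call.
    [FunctionName("GetVotesOrchestrator")]
    public Task<VotingCounter> GetVotes(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        var entityId = new EntityId(Constants.Entity, Constants.Entity);
        return context.CallEntityAsync<VotingCounter>(entityId, "get");
    }
}
```

The operation names passed here must match the case labels in the Counter function, since the switch statement dispatches on the lowercased operation name.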
To retrieve the voting results, call the HTTP-triggered function HTTPTrigger, as below:
curl --location --request GET 'http://localhost:7007/api/HTTPTrigger'
This calls the function, which reads the state of the VotingCounter entity:
[FunctionName("HTTPTrigger")]
public async Task<IActionResult> GetEntities(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequestMessage req,
    [DurableClient] IDurableEntityClient entityClient)
{
    var entityId = new EntityId(Constants.Entity, Constants.Entity);
    var entity = await entityClient.ReadEntityStateAsync<VotingCounter>(entityId);
    return new OkObjectResult(entity);
}
The result looks like this, and we have a winner!
{
    "entityExists": true,
    "entityState": {
        "voting": {
            "\"Sponge Bob\"": 3,
            "\"Mr. Krabs\"": 1,
            "\"Patrick Star\"": 1
        }
    }
}
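Picking the winner from these tallies is a small step on top of the response. The sketch below is a hypothetical helper, not part of the demo, and works on a plain dictionary. Inside the function, that dictionary would presumably come from entity.EntityState.Voting after checking entity.EntityExists. It returns every candidate that shares the highest count, so a tie yields more than one name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ElectionResult
{
    // Returns the candidates with the highest count, sorted by name.
    // More than one name means a tie; an empty tally yields an empty list.
    public static IReadOnlyList<string> Winners(IReadOnlyDictionary<string, int> voting)
    {
        if (voting.Count == 0)
        {
            return Array.Empty<string>();
        }

        var top = voting.Values.Max();
        return voting
            .Where(pair => pair.Value == top)
            .Select(pair => pair.Key)
            .OrderBy(name => name, StringComparer.Ordinal)
            .ToList();
    }

    public static void Main()
    {
        // Tallies copied from the response above, with the embedded quotes left out.
        var voting = new Dictionary<string, int>
        {
            ["Sponge Bob"] = 3,
            ["Mr. Krabs"] = 1,
            ["Patrick Star"] = 1
        };

        Console.WriteLine(string.Join(", ", Winners(voting))); // prints "Sponge Bob"
    }
}
```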
This example shows how to use the Aggregator pattern to count votes. It is also the last article of this series. I hope you have learned a bit more about Azure Durable Functions and application patterns on this journey.
Full Series
Aggregator (stateful entities)