What is Akka?
Akka is an actor-based framework for building highly concurrent, distributed, and resilient message-driven applications. The original Akka runs on the JVM and is written in Scala; Akka.NET is its sister implementation, designed for developers using the .NET platform.
In an Akka system, each actor has its own distinct role and responsibility, contributing to the overall functionality and resilience of the application. Actors are lightweight, independent entities that encapsulate state and behavior, processing messages asynchronously. This design allows for clear separation of concerns, where each actor focuses on a specific task or service. For example, some actors may handle user requests, others might manage database interactions, while others coordinate complex workflows. By delegating responsibilities to individual actors, Akka systems achieve high levels of concurrency, scalability, and fault tolerance, as actors can be easily distributed across multiple nodes and can recover independently from failures.
Akka Persistence extends the core capabilities of Akka by introducing durable state management for actors. It allows actors to persist their internal state so that it can be recovered and restored even after a crash or a restart. This is achieved through event sourcing, where state changes are stored as a sequence of events, and optional snapshots that capture the state at a particular point in time for quicker recovery. By using Akka Persistence, developers can build robust applications that maintain state consistency and reliability across distributed systems, making it an ideal choice for applications requiring high availability and resilience.
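To make the idea concrete before bringing in Akka, here is a minimal sketch of event sourcing in plain C#, with no Akka types involved. The `CounterState` class and its `Recover` method are names made up for this illustration; the only point is that the current state equals an optional snapshot plus a replay of the events recorded after it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of event sourcing; none of these names come from Akka.
public static class CounterState
{
    // Rebuilds the counter from an optional snapshot plus the events recorded after it.
    public static long Recover(long? snapshot, IEnumerable<long> eventsAfterSnapshot)
    {
        // Start from the snapshot if one exists, otherwise from zero.
        long state = snapshot ?? 0L;

        // Apply each later event in order; here every event is an increment.
        foreach (long increment in eventsAfterSnapshot)
        {
            state += increment;
        }
        return state;
    }
}

public static class EventSourcingDemo
{
    public static void Main()
    {
        // No snapshot: all five events are replayed.
        Console.WriteLine(CounterState.Recover(null, new long[] { 1, 1, 1, 1, 1 })); // prints 5

        // A snapshot of 10 plus the two events recorded after it.
        Console.WriteLine(CounterState.Recover(10L, new long[] { 1, 1 })); // prints 12
    }
}
```

Akka Persistence follows the same pattern, except that the journal and the snapshot store live in a database and the replay is driven by the framework.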
How to build a persistent Akka application with RavenDB Persistence
In this guide we’ll set up a very simple console application that uses Akka.NET with RavenDB as a persistence layer, so when our app crashes or restarts we will be able to pick up where we left off.
To do this we will create a simple ping pong app that moves a “ball” back and forth between our actors. The in-memory state we will persist is the number of balls received so far. The ping pong example is the basic hello world of actor-based systems.
Let’s start by creating a new .NET console project and moving into its folder:
dotnet new console -n PingPong
cd PingPong
Next, add the `Akka.Persistence.RavenDB.Hosting` package. It includes the base `Akka.Persistence.RavenDB` package, which in turn includes the base Akka NuGet package. We use Hosting since it provides a quick and easy way to set up our app and its persistence without having to write a HOCON config file.
Add the NuGet package to the project using the following command:
dotnet add package Akka.Persistence.RavenDB.Hosting
Open `Program.cs` and add two new actor classes:
`ReceiverActor`: this one is responsible for keeping count of the number of times the ball has reached it, and for serving it back.
`PitcherActor`: this is a “dumb” actor whose entire responsibility is to serve the ball back.
Once our receiver has received a certain number of balls, it completes the `TaskCompletionSource` and our program terminates.
We also add a short delay between pitches, which gives us time to watch the current score and pick the point at which to shut the app down.
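// Namespaces used throughout Program.cs
using Akka.Actor;
using Akka.Hosting;
using Microsoft.Extensions.Hosting;

// The message that is passed back and forth between the actors
public class Ball
{
    public override string ToString() => "ball";
}

// Counts every ball it receives and serves it back to the pitcher
public class ReceiverActor : ReceiveActor
{
    public ReceiverActor(IActorRef actor, long repeat,
        TaskCompletionSource<bool> latch)
    {
        var received = 0L;
        Receive<Ball>(m =>
        {
            received++;
            Console.WriteLine($"| {received:000} ...O|\n");
            if (received < repeat)
            {
                // wait one second, then serve the ball back to the pitcher
                Context.System.Scheduler.ScheduleTellOnceCancelable(
                    TimeSpan.FromSeconds(1), actor, m, Self);
            }
            else
            {
                // target reached: signal the program to shut down
                latch.TrySetResult(true);
            }
        });
    }
}

// Returns every ball straight to whoever sent it
public class PitcherActor : UntypedActor
{
    protected override void OnReceive(object message)
    {
        Console.WriteLine($"|O |");
        if (message is Ball)
            Sender.Tell(message);
    }
}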
Then in our `Main()` we’ll kick-start the game by sending the first ball to the `ReceiverActor`, with the pitcher set as the `Sender`. The receiver counts the ball and serves it to the pitcher, which sends it straight back, and in this way the game has started.
var host = new HostBuilder()
    .ConfigureServices((context, services) =>
    {
        services.AddAkka("PingPongActorSystem",
            (builder, provider) =>
            {
                builder.WithActors((system, registry, resolver) =>
                {
                    var ts = new TaskCompletionSource<bool>();
                    long repeats = 50;
                    var pitcherActor = system.ActorOf<PitcherActor>("pitcher-actor");
                    var props = new Props(typeof(ReceiverActor), null, pitcherActor, repeats, ts);
                    var receiverActor = system.ActorOf(props, "receiver-actor");
                    // serve the first ball
                    receiverActor.Tell(new Ball(), pitcherActor);
                    // wait for ts to complete
                    ts.Task.Wait();
                    system.Terminate();
                });
            });
    });
var app = host.Build();
await app.RunAsync();
Try running the program and watch the count go up.
Currently our state, `received`, is only saved in memory, which means that if our system crashes we will lose all of our progress so far.
This is where `Akka.Persistence.RavenDB` comes in.
Persisting your actor state with RavenDB
We can choose which events we want to persist and save in RavenDB as the game progresses. In the case of a system restart, Akka will fetch all of these events from the RavenDB database and feed them one by one into the persistent actor, eventually reaching the state we left off at.
First we need to set up a RavenDB instance to act as our database where we will persist all of the events.
For production purposes it is recommended to set up an instance in RavenDB Cloud. For this demo app, we will use the public live test instance. Go to the public instance and make sure that a database named PingPong exists there (see here for the guide).
Now let’s tell Akka to use our RavenDB instance for persistence. This is very simple to do with Akka Hosting; the additional code is marked in the following snippet.
var host = new HostBuilder()
    .ConfigureServices((context, services) => {
        services.AddAkka("PingPongActorSystem", (builder, provider) => {
            // <NEW CODE>
            builder.WithRavenDbPersistence(
                urls: new [] { "http://live-test.ravendb.net" },
                databaseName: "PingPong",
                // add the certificate path here when using
                // a cloud instance or a secured on-prem instance
                certificatePath: null,
                mode: PersistenceMode.Both);
            // </NEW CODE>
            builder.WithActors((system, registry, resolver) => {
                var ts = new TaskCompletionSource<bool>();
                long repeats = 50;
                var pitcherActor = system.ActorOf<PitcherActor>("pitcher-actor");
                var props = new Props(typeof(ReceiverActor), null, pitcherActor, repeats, ts);
                var receiverActor = system.ActorOf(props, "receiver-actor");
                // serve the first ball
                receiverActor.Tell(new Ball(), pitcherActor);
                // wait for ts to complete
                ts.Task.Wait();
                system.Terminate();
            });
        });
    });
var app = host.Build();
await app.RunAsync();
Now that we’re all wired up, we need to make a few changes to our code to work with persistence:
Change `ReceiverActor` into a persistent actor by replacing the inherited `ReceiveActor` with `ReceivePersistentActor` and implementing the required `PersistenceId`. The persistent actor also requires us to change every `Receive` case into `Command`.
// <NEW CODE>
// requires "using Akka.Persistence;" at the top of the file
public class ReceiverActor : ReceivePersistentActor
{
    public override string PersistenceId => "ping-pong-receiver-actor";
    // </NEW CODE>
    public ReceiverActor(IActorRef actor, long repeat,
        TaskCompletionSource<bool> latch)
    {
        var received = 0L;
        // <NEW CODE>
        Command<Ball>(m =>
        // </NEW CODE>
        {
            received++;
            Console.WriteLine($"| {received:000} ...O|\n");
            if (received < repeat)
            {
                Context.System.Scheduler.ScheduleTellOnceCancelable(
                    TimeSpan.FromSeconds(1), actor, m, Self);
            }
            else
            {
                latch.TrySetResult(true);
            }
        });
    }
}
We’re not done yet. We still need to tell our actor what it should persist and how it should recover.
Since our in-memory state keeps count of the score, this is the data we want to save in our database. Each event we persist represents one increment of the count.
We simply need to call `Persist()` and Akka will save the event under this actor’s `PersistenceId`.
Note that we are using `Persist(long, ..)`, which means this event’s payload will be saved as a `long` in the database. In general, the payload could be any object we choose; in more complex scenarios you’ll usually save a more meaningful event.
Add this to the code to save every ball pass:
Command<Ball>(m =>
{
    // save an event with a long payload of 1 in the database;
    // once the write succeeds, Akka runs the handler below
    Persist(1L, _ =>
    {
        received++;
        Console.WriteLine($"| {received:000} ...O|\n");
        if (received < repeat)
        {
            Context.System.Scheduler.ScheduleTellOnceCancelable(
                TimeSpan.FromSeconds(1), actor, m, Self);
        }
        else
        {
            latch.TrySetResult(true);
        }
    });
});
Finally, we tell the actor what to do when it recovers. In our case, each event fetched from the database represents one received ball, so all we need to do is increase our receive count for every event we get.
Add this above `Command<Ball>`:
Recover<long>(m =>
{
    // each replayed event stands for one ball that was already received
    received++;
    if (received >= repeat)
    {
        latch.TrySetResult(true);
    }
});
We now have a working journal!
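Putting the pieces from this section together, the receiver might look roughly like the sketch below. It is assembled from the snippets in this guide rather than taken from a separate reference implementation. The event is written as the literal `1L` so that the persisted payload is explicitly a `long` and matches `Recover<long>`; the `using` directives are the namespaces these types are assumed to need when the class lives in its own file.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Persistence;

public class Ball
{
    public override string ToString() => "ball";
}

public class ReceiverActor : ReceivePersistentActor
{
    // Events in the journal are stored under this id.
    public override string PersistenceId => "ping-pong-receiver-actor";

    public ReceiverActor(IActorRef actor, long repeat,
        TaskCompletionSource<bool> latch)
    {
        var received = 0L;

        // Recovery: every stored event stands for one ball already received.
        Recover<long>(m =>
        {
            received++;
            if (received >= repeat)
            {
                latch.TrySetResult(true);
            }
        });

        // Normal operation: persist first, then update state in the callback.
        Command<Ball>(m =>
        {
            Persist(1L, _ =>
            {
                received++;
                Console.WriteLine($"| {received:000} ...O|\n");
                if (received < repeat)
                {
                    // serve the ball back to the pitcher after one second
                    Context.System.Scheduler.ScheduleTellOnceCancelable(
                        TimeSpan.FromSeconds(1), actor, m, Self);
                }
                else
                {
                    latch.TrySetResult(true);
                }
            });
        });
    }
}
```

Updating `received` only inside the `Persist` callback keeps the in-memory count in step with what is actually stored, so a crash between receiving a ball and writing the event cannot leave the counter ahead of the journal.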
Let’s test our app now that it’s persistent.
In our program code, we’ll set `repeats` to 6.
Run the program until it shuts down after the sixth serve.
If we open the RavenDB studio and go to the `Events` collection, we can see the list of events and their payloads:
Now set `repeats` to 10 and run the app again. This time the app will try to reach 10 receives before it shuts down. However, it will recover the first 6 that we already persisted, and only actually run the last 4. This also simulates the recovery of an app after a crash. As we can see in the terminal, play resumes from the last round we reached.
Note that at any point in debugging, if we messed around with the code and our `Events` collection is no longer relevant or faithful to the code, we can delete the entire collection and RavenDB will create a fresh one automatically on the next run. Changing the code and running the application while we still have deprecated data in our database might not always work since Akka will be fed messages in an old format or in an order that is no longer possible in our updated code.
Note that deleting the whole collection is obviously something that you should only do while debugging. For production usage, you’ll need to handle messages in the old format, or ensure (via the snapshot feature we’ll discuss shortly) that no such messages are retained by the time your new code is deployed.
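As one possible way to keep old events readable, a persistent actor can register a `Recover` handler for each payload type it has ever written. The sketch below assumes a hypothetical richer event, `BallReceived`, introduced after the journal already contains bare `long` events. The class names, the timestamp property, and the idea of migrating this particular sample are illustrative assumptions, not something taken from the guide or required by the RavenDB plugin.

```csharp
using System;
using Akka.Persistence;

public class Ball { }

// Hypothetical newer event format; not part of the original sample.
public sealed class BallReceived
{
    public BallReceived(DateTime receivedAtUtc)
    {
        ReceivedAtUtc = receivedAtUtc;
    }

    public DateTime ReceivedAtUtc { get; }
}

// Hypothetical actor that can replay both the old and the new event format.
public class VersionTolerantReceiver : ReceivePersistentActor
{
    private long _received;

    public override string PersistenceId => "ping-pong-receiver-actor";

    public VersionTolerantReceiver()
    {
        // Old format: events written as a bare long by earlier versions of the code.
        Recover<long>(_ => { _received++; });

        // New format: events written by the current code.
        Recover<BallReceived>(_ => { _received++; });

        // From now on only the new format is written.
        Command<Ball>(_ =>
        {
            Persist(new BallReceived(DateTime.UtcNow), evt => { _received++; });
        });
    }
}
```

Both handlers lead to the same state change here, which is what lets an old journal and new code coexist. In a real migration the old handler would translate the legacy payload into whatever the new state needs.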
Our app can work fine like this, but we can make it better by adding snapshots as well.
Using snapshots to reduce recovery time
Snapshots keep a state that represents a point in time. Consider a process that has a lot of back and forth. Each message that the actor receives will persist an event to RavenDB. Recovery time at that point is linear in the number of events we have. Instead of having to go through all events from the beginning of time, we can persist a snapshot, which allows us to “fast-forward” much of the work.
For example, let’s say we choose to save a snapshot for every 10 events. If we have 12 events of ball passes, instead of having 12 events to go through, we will end up with a snapshot that has a payload of 10 and two additional events. When we start our app again it will first load the snapshot, then the last 2 events.
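The arithmetic in that example can be sketched in a few lines of plain C#. `RecoveryPlan` is a hypothetical helper written for this illustration and is not part of Akka; it assumes a snapshot was saved at every multiple of the interval, which matches the scheme described above.

```csharp
using System;

// Hypothetical helper for illustration; it is not part of Akka.
public static class RecoveryPlan
{
    // Assumes a snapshot was saved at every multiple of `interval`.
    // A SnapshotAt of 0 means no snapshot exists yet.
    public static (long SnapshotAt, long EventsToReplay) For(long eventCount, long interval)
    {
        // Integer division rounds down to the last multiple of the interval.
        long snapshotAt = (eventCount / interval) * interval;
        return (snapshotAt, eventCount - snapshotAt);
    }
}

public static class RecoveryPlanDemo
{
    public static void Main()
    {
        var (snapshotAt, toReplay) = RecoveryPlan.For(eventCount: 12, interval: 10);
        Console.WriteLine($"snapshot at {snapshotAt}, replay {toReplay} events");
        // prints "snapshot at 10, replay 2 events"
    }
}
```

With this scheme the number of events replayed on startup never exceeds the interval minus one, regardless of how long the journal has grown.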
Let’s add snapshot support to our app.
In our actor, we need to tell Akka what to do with the snapshot. Add this to our actor.
Recover<SnapshotOffer>(offer =>
{
    // start from the count stored in the snapshot
    received = (long)offer.Snapshot;
});
Now we can tell Akka to persist a snapshot of our state every time the sequence number reaches a multiple of the snapshot interval. You can see the new behavior in the following snippet:
Command<Ball>(m =>
{
    Persist(1L, _ =>
    {
        received++;
        Console.WriteLine($"| {received:000} ...O|\n");
        // <NEW CODE>
        // save a snapshot of the count on every tenth persisted event
        const int SnapshotInterval = 10;
        if (LastSequenceNr % SnapshotInterval == 0 && LastSequenceNr != 0)
        {
            SaveSnapshot(received);
        }
        // </NEW CODE>
        if (received < repeat)
        {
            Context.System.Scheduler.ScheduleTellOnceCancelable(
                TimeSpan.FromSeconds(1), actor, m, Self);
        }
        else
        {
            latch.TrySetResult(true);
        }
    });
});
All done!
Run the program again. We’ll let our program pass the ball 15 times by setting `repeats` to 15 and then check the studio:
We have a new “Snapshots” collection with 1 saved snapshot. Great!
Note that we added a new snapshot on top of the events that were already created. The first time we run the program, we need to go through all the persisted events. The next time we run it, we’ll start from the snapshot and be able to skip the older events.
Our `Events` collection still has the entire history of the game, as you can see:
You can configure Akka to delete the events that are already included in the snapshot, since they are no longer needed. Or you can keep them around if they are interesting to you over the long term. Check out the Akka.NET documentation for more details on the topic.
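One common way to do this in Akka.NET is to wait for the `SaveSnapshotSuccess` confirmation and then call `DeleteMessages` with the sequence number the snapshot covers. The sketch below is a minimal, hypothetical actor (`CompactingReceiver` is a name chosen for this example) that shows only that mechanism. It assumes the journal plugin in use supports message deletion, and it leaves out the scheduling and shutdown logic from the guide.

```csharp
using System;
using Akka.Persistence;

public class Ball { }

// Hypothetical actor showing journal clean-up after a snapshot.
public class CompactingReceiver : ReceivePersistentActor
{
    private const int SnapshotInterval = 10;
    private long _received;

    public override string PersistenceId => "ping-pong-receiver-actor";

    public CompactingReceiver()
    {
        // Recovery: start from the snapshot, then apply any later events.
        Recover<SnapshotOffer>(offer => { _received = (long)offer.Snapshot; });
        Recover<long>(_ => { _received++; });

        Command<Ball>(_ =>
        {
            Persist(1L, evt =>
            {
                _received++;
                if (LastSequenceNr % SnapshotInterval == 0)
                {
                    SaveSnapshot(_received);
                }
            });
        });

        // The snapshot store confirmed the write, so every event up to and
        // including that sequence number is already reflected in the snapshot.
        Command<SaveSnapshotSuccess>(success =>
        {
            DeleteMessages(success.Metadata.SequenceNr);
        });

        // Akka reports the outcome of the deletion as a message as well.
        Command<DeleteMessagesSuccess>(_ => { });
        Command<DeleteMessagesFailure>(failure =>
        {
            Console.WriteLine($"Could not delete events: {failure.Cause.Message}");
        });
    }
}
```

Deleting only after `SaveSnapshotSuccess` arrives avoids the situation where the events are gone but the snapshot that replaces them was never stored.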
Now when we restart our program, our snapshot will be sent to `Recover<SnapshotOffer>`, where we set our internal `received` state to the snapshot’s payload, and then events 11-15 are loaded. This is a lot quicker than recovering all of our events one by one.
This has been a whirlwind tour of how to use Akka Persistence with RavenDB. If you have any questions, feel free to ask them in our GitHub Discussions group.