Events and Snapshots


  • Akka.Persistence provides two primary methods to persist actor state: Event sourcing and Snapshots.

  • With event sourcing, each state change is stored as a separate event, creating a sequence of events that represents the actor’s history. Snapshots, on the other hand, capture the actor’s state at specific points in time.

  • Upon actor restart, both events and snapshots can be replayed to restore the actor's internal state,
    with snapshots allowing for quicker recovery by avoiding the need to replay all past events.

  • The stored events can be queried via Akka's query interface.
    Learn more about that in Queries.

  • To learn how to configure the events journal and the snapshot-store via the Akka.Persistence.RavenDB plugin, see Integrating with Akka.NET persistence.

  • In this page:


Storing events

Events:
Persistent actors can write messages, called events, into the configured RavenDB database,
which serves as the events journal.

The Events collection:
Each event is stored as a document in the Events collection in append-only mode.

The Event document:
Each event document includes the following fields, among others:

  • id - The event document id, composed of <Events/persistentId/sequenceNr-with-leading-zeros>
  • payload - The actual message content or event data.
  • persistentId - The unique identifier of the actor that persisted this event.
  • sequenceNr - The sequence number for the event, indicating its position in the sequence of events for a particular actor.
    Serves as a unique, gap-less identifier that helps maintain the correct order and consistency of the actor's state.

Replaying events:
Maintaining the event documents in chronological order (based on the sequenceNr field)
enables retrieval and replay in the correct sequence when an actor restarts.

Storing snapshots

Snapshots:

  • Snapshots capture the current state of an actor at a specific point in time,
    representing all the data the actor has accumulated or processed up to that moment.

  • Persistent actors can store these snapshots in the configured RavenDB database,
    which serves as the snapshot-store.

  • After a snapshot is successfully persisted, events can be deleted from the events journal to free up space.

The Snapshots collection:
Each snapshot is stored as a document in the Snapshots collection in append-only mode.

The Snapshot document:
Each snapshot document includes the following fields, among others:

  • id - The snapshot document id, composed of <Snapshots/persistentId/sequenceNr-with-leading-zeros>
  • payload - The actor's state at the time the snapshot was taken.
  • persistentId - The unique identifier of the actor that created the snapshot.
  • sequenceNr - The sequence number indicating the position of the snapshot in the sequence of events.
    Serves as a unique, gap-less identifier that helps maintain the correct order and consistency of the actor's state.

Replaying snapshots:

  • When an actor restarts, instead of replaying the entire event history from the events journal,
    which can be inefficient as this journal grows, the actor's state can be restored from a snapshot
    and then replay only the events that occurred after that snapshot.

  • Replaying snapshots significantly accelerates recovery, reduces network transmission,
    and lowers both actor event replay time and CPU usage.

Storing guidelines

  • The RavenDB plugin designates the Events and Snapshots collections for storing Akka’s data.
    While it’s technically possible to store documents from other sources in these collections,
    you shouldn't do so.

  • The Events and Snapshots collections should be reserved exclusively for Akka’s storage needs.
    It is recommended to place these collections in a separate, designated database.

Global consistency

The consistency requirement:

  • Consistency refers to the property that ensures data is uniform and accurate across all database replicas at a given point in time. In a distributed system, Akka.NET Persistence relies on consistency to accurately restore an actor’s state from its events during recovery, regardless of which node is contacted.

  • Events must be applied (replayed) in the exact order they were generated,
    so consistency is crucial to ensure that no events are missed or processed out of order.

Cluster-wide transactions:

  • RavenDB is a distributed database, allowing writes, reads, and queries to target different nodes across the cluster.

  • To prioritize consistency over availability, the RavenDB plugin uses a cluster-wide transaction for storing events and snapshot documents. This ensures that persisted data is consistently applied across all database instances in the cluster, preventing conflicts and guaranteeing that restoring to the latest state reflects the correct event sequence, as required by Akka.

  • Note that cluster consensus is required for a cluster-wide transaction to execute. This means that a majority of nodes in the database group must be up and connected in order to persist new events & snapshots.

Atomic-guards usage:

  • As with every document created using a cluster-wide transaction in RavenDB, the server creates an Atomic-Guard for each event or snapshot document that is stored to prevent concurrent modifications.

  • The atomic-guard is particularly beneficial in scenarios where an actor recovers its events and snapshots from a node that failed, came back up, but has not yet received the complete replication information from the other nodes in the database group. In such cases, the actor’s state might not be fully up-to-date.

  • If the actor attempts to write a new event using a sequenceNr that already exists, the Atomic-Guard will prevent this action from succeeding. Upon this failure, the actor will restart itself. If, by that time, the node has received all the missing information, the actor will now recover with a fully updated state.

Sample application

The following is a sample application that stores events and snapshots in a RavenDB database.

static void Main(string[] args)
{
    var host = new HostBuilder().ConfigureServices((context, services) =>
    {
        // Configure the RavenDB plugin using Hosting:
        //============================================
        
        services.AddAkka("SalesActorSystem", (builder, provider) =>
        {
            builder.WithRavenDbPersistence(
                urls: new[] { "http://localhost:8080" },
                databaseName: "AkkaStorage_PhoneSales",
                // Use both akka.persistence.journal and akka.persistence.snapshot-store
                mode: PersistenceMode.Both);

            builder.WithActors((system, registry) =>
            {
                var taskCompletion = new TaskCompletionSource<bool>();
                long expectedProfit = 1_500;
                
                // Create actors:
                // ==============
                
                var salesActor = system.ActorOf(Props.Create(() => 
                    new SalesActor(expectedProfit, taskCompletion)), "sales-actor");
                
                var salesSimulatorActor = system.ActorOf(Props.Create(() => 
                    new SalesSimulatorActor(salesActor)), "sales-simulator-actor");
                
                // Exit app when sales reach the 'expectedProfit'
                taskCompletion.Task.Wait();
                system.Terminate();
            });
        });
    });
    
    var app = host.Build();
    app.Run();
}
public class SalesActor: ReceivePersistentActor
{
    // The unique actor id
    public override string PersistenceId => "sales-actor";
    
    // The state that will be persisted in SNAPSHOTS
    private SalesActorState _state;
    
    public SalesActor(long expectedProfit, TaskCompletionSource<bool> taskCompletion)
    {
        _state = new SalesActorState
        {
            totalSales = 0
        }; 
        
        // Process a sale:
        Command<Sale>(saleInfo =>
        {
            if (_state.totalSales < expectedProfit)
            {
                // Persist an EVENT to RavenDB
                // ===========================
                
                // The handler function is executed after the EVENT was saved successfully
                Persist(saleInfo, _ =>
                {
                    // Update the latest state in the actor
                    _state.totalSales += saleInfo.Price;

                    ConsoleHelper.WriteToConsole(ConsoleColor.Black,
                        $"Sale was persisted. Phone brand: {saleInfo.Brand}. Price: {saleInfo.Price}");

                    // Store a SNAPSHOT every 5 sale events
                    // ====================================
                    
                    if (LastSequenceNr != 0 && LastSequenceNr % 5 == 0)
                    {
                        SaveSnapshot(_state.totalSales);
                    }
                });
            }
            else if (!taskCompletion.Task.IsCompleted)
            {
                Sender.Tell(new StopSimulate());
                
                ConsoleHelper.WriteToConsole(ConsoleColor.DarkMagenta,
                    $"Sale not persisted: " +
                    $"Total sales have already reached the expected profit of {expectedProfit}");
                
                ConsoleHelper.WriteToConsole(ConsoleColor.DarkMagenta,
                    _state.ToString());
                
                taskCompletion.TrySetResult(true);
            }
        });
        
        // Handle a SNAPSHOT success msg
        Command<SaveSnapshotSuccess>(success =>
        {
            ConsoleHelper.WriteToConsole(ConsoleColor.Blue,
                $"Snapshot saved successfully at sequence number {success.Metadata.SequenceNr}");
            
            // Optionally, delete old snapshots or events here if needed
            // DeleteMessages(success.Metadata.SequenceNr);
        });
        
        // Recover an EVENT
        Recover<Sale>(saleInfo =>
        {
            _state.totalSales += saleInfo.Price;
            
            ConsoleHelper.WriteToConsole(ConsoleColor.DarkGreen,
                $"Event was recovered. Price: {saleInfo.Price}");
        });
        
        // Recover a SNAPSHOT
        Recover<SnapshotOffer>(offer =>
        {
            var salesFromSnapshot = (long) offer.Snapshot;
            _state.totalSales = salesFromSnapshot;
            
            ConsoleHelper.WriteToConsole(ConsoleColor.DarkGreen,
                $"Snapshot was recovered. Total sales from snapshot: {salesFromSnapshot}");
        });
    }
}
public class SalesSimulatorActor : ReceiveActor
{
    private readonly IActorRef _salesActor;
    private ICancelable scheduler;

    public SalesSimulatorActor(IActorRef salesActor)
    {
        _salesActor = salesActor;

        // Schedule the first sale simulation immediately and then every 2 seconds:
        scheduler = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.Zero, 
            TimeSpan.FromSeconds(2), Self, new StartSimulate(), Self);
        
        Receive<StartSimulate>(HandleStart);
        Receive<StopSimulate>(HandleStop);
    }

    private void HandleStart(StartSimulate message)
    {
        ConsoleHelper.WriteToConsole(ConsoleColor.Black,
            $"About to simulate a sale...");

        Random random = new Random();
        string[] products = { "Apple", "Google", "Nokia", "Xiaomi", "Huawei" };

        var randomBrand = products[random.Next(products.Length)];
        var randomPrice = random.Next(1, 6) * 100; // 100, 200, 300, 400, or 500

        var nextSale = new Sale(randomPrice, randomBrand);
        _salesActor.Tell(nextSale);
    }
    
    private void HandleStop(StopSimulate message)
    {
        scheduler.Cancel();
        ConsoleHelper.WriteToConsole(ConsoleColor.DarkRed,
            "Simulation stopped");
    }
}
// A sale EVENT to be persisted 
public class Sale(long pricePaid, string productBrand)
{
    public long Price { get; set; } = pricePaid;
    public string Brand { get; set; } = productBrand;
}

// MESSAGES for the simulator actor
public class StartSimulate { }
public class StopSimulate { }

// Internal state that will be persisted in a SNAPSHOT 
class SalesActorState
{
    public long totalSales { get; set; }

    public override string ToString()
    {
        return $"[SalesActorState: Total sales are {totalSales}]";
    }
}

public class ConsoleHelper
{
    public static void WriteToConsole(ConsoleColor color, string text)
    {
        Console.ForegroundColor = color;
        Console.WriteLine(text);
        Console.ResetColor();
    }
}

The documents created in the Events and Snapshots collections are visible in the Documents View in the Studio:

The Events collection

The events collection

The Events collection

  1. The Events collection.
  2. The event document ID in the format: <Events/persistentId/sequenceNr-with-leading-zeros>
  3. The unique ID of the actor that persisted these events.
  4. The unique sequence number of the event.
  5. The data that was stored for the event.
The payload

The event payload

The data stored for each event is an instance of the Sale class, containing Price and Brand fields.


The Snapshots collection

The snapshots collection

The Snapshots collection

  1. The Snapshots collection.
  2. The snapshot document ID in the format: <Snapshots/persistentId/sequenceNr-with-leading-zeros>
  3. The unique ID of the actor that persisted this snapshot.
  4. The sequence number of the event after which the snapshot was stored, 5 in this case.
  5. The data stored in this snapshot represents the actor's state immediately after event 5 was stored.
    In this example, it reflects the accumulated sales profit made after the first 5 sale events.