Queries
-
Akka.Persistence.Query comes with several stream-based query interfaces for querying persisted data.
This interface abstracts the underlying database, allowing your application to switch persistence providers without requiring changes to the query code. -
The RavenDB persistence plugin fully supports all of Akka's query interfaces.
Just includeAkka.Persistence.RavenDb.Query
in your application. -
In this page:
Interface types
Each query interface comes in two forms to allow flexible querying based on whether you need
real-time updates (continuous) or a snapshot of the current state (current).
-
Continuous Query (e.g.,
EventsByPersistenceId
):
This type of query continuously streams data as it is persisted.
It starts from a specified offset (or from the beginning if no offset is provided)
and keeps the stream open to deliver new data as it is added. -
Current Query (e.g.,
CurrentEventsByPersistenceId
):
This type of query retrieves only the data available up to the point of the query.
Once all current data is fetched, the stream is completed.
Data that is persisted after the query is completed is Not included in the stream.
Supported interfaces
IPersistenceIdsQuery & ICurrentPersistenceIdsQuery
Use these methods to retrieve the PersistenceIds of ALL actors that have persisted events to the journal store:
PersistenceIds()
The stream does Not complete when it reaches the end of the PersistenceIds list that currently exists in the journal store.
Instead, it continues to push new PersistenceIds as they are added.
CurrentPersistenceIds()
The stream is completed immediately when it reaches the end of the result set.
PersistenceIds that are created after the query is completed are Not included in the stream.
// Obtain the RavenDB read journal
// ===============================
RavenDbReadJournal readJournal = PersistenceQuery
.Get(system) // system is your 'ActorSystem' param
.ReadJournalFor<RavenDbReadJournal>(RavenDbReadJournal.Identifier);
// Issue query 'CurrentPersistenceIds' to the journal
// ==================================================
Source<string, NotUsed> allPersistenceIds = readJournal.CurrentPersistenceIds();
// The materializer handles data flow from the persistence storage through the query pipeline
// ==========================================================================================
ActorMaterializer materializer = system.Materializer();
// Execute the query and consume the results
// =========================================
allPersistenceIds.RunForeach(persistenceId =>
{
Console.WriteLine($"ActorID: {persistenceId}");
}, materializer).Wait();
Syntax:
public Source<string, NotUsed> PersistenceIds()
public Source<string, NotUsed> CurrentPersistenceIds()
IEventsByPersistenceIdQuery & ICurrentEventsByPersistenceIdQuery
-
Use the methods below to retrieve events that have been persisted by a specific actor.
-
The returned event stream is ordered by the sequence numbers of the events.
EventsByPersistenceId()
The stream does Not complete when it reaches the end of the currently stored events.
Instead, it continues to push new events as they are persisted.
CurrentEventsByPersistenceId()
The stream is completed immediately when it reaches the end of the result set.
Events that are stored after the query is completed are Not included in the event stream.
RavenDbReadJournal readJournal = PersistenceQuery
.Get(system)
.ReadJournalFor<RavenDbReadJournal>(RavenDbReadJournal.Identifier);
// Issue query 'CurrentEventsByPersistenceId'
Source<EventEnvelope, NotUsed> eventsSource = readJournal
.CurrentEventsByPersistenceId("sales-actor", 0L, long.MaxValue);
ActorMaterializer materializer = system.Materializer();
eventsSource.RunForeach(envelope =>
{
var saleEvent = (Sale)envelope.Event;
Console.WriteLine($"Sale Event - Brand: {saleEvent.Brand}, Price: {saleEvent.Price}");
}, materializer).Wait();
Syntax:
public Source<EventEnvelope, NotUsed> EventsByPersistenceId(string persistenceId,
long fromSequenceNr,
long toSequenceNr)
public Source<EventEnvelope, NotUsed> CurrentEventsByPersistenceId(string persistenceId,
long fromSequenceNr,
long toSequenceNr)
Parameter | Type | Description |
---|---|---|
persistenceId | string |
The actor's persistence ID for which to retrieve events. |
fromSequenceNr | long |
Retrieve events from this sequenceNr. |
toSequenceNr | long |
Retrieve events up to this sequenceNr. Use 0L and long.MaxValue respectively to retrieve all events. |
IEventsByTagQuery & ICurrentEventsByTagQuery
-
In Akka.Persistence, you can add one or more string tags to events.
-
Use the methods below to retrieve events that have a specific tag.
The query will be applied to all events persisted by all actors.
Results will include events with the specified tag, regardless of the PersistenceId they are associated with. -
You can specify the change-vector of an event document as the offset to determine where in the event stream you want to start querying.
- In RavenDB, a change-vector is a unique identifier that represents the version of a document (an event in this case)
across different nodes in a distributed database. - The change-vector of a document can be obtained from the Properties pane in the Document View in the Studio.
- In RavenDB, a change-vector is a unique identifier that represents the version of a document (an event in this case)
-
The returned event stream is ordered by the change-vector value of the event documents.
EventsByTagQuery()
The stream does Not complete when it reaches the end of the currently stored events.
Instead, it continues to push new events as they are persisted.
CurrentEventsByTagQuery()
The stream is completed immediately when it reaches the end of the result set.
Events that are stored after the query is completed are Not included in the event stream.
RavenDbReadJournal _readJournal = PersistenceQuery.Get(system)
.ReadJournalFor<RavenDbReadJournal>(RavenDbReadJournal.Identifier);
// Define an offset after which to return results.
// See the available offset options in the syntax below..
ChangeVectorOffset cvOffset =
new ChangeVectorOffset("RAFT:1-hJ9jo4rRBEKs/kqNXV107Q TRXN:1169-5LEbeyPG40eQiq6fnnCthA");
// Issue query 'CurrentEventsByTag'
var eventsSource = _readJournal.CurrentEventsByTag("some-tag", cvOffset);
ActorMaterializer materializer = system.Materializer();
eventsSource.RunForeach(envelope =>
{
var saleEvent = (Sale)envelope.Event;
Console.WriteLine($"Sale Event - Brand: {saleEvent.Brand}, Price: {saleEvent.Price}");
}, materializer).Wait();
Syntax:
public Source<EventEnvelope, NotUsed> EventsByTag(string tag, Offset offset)
public Source<EventEnvelope, NotUsed> CurrentEventsByTag(string tag, Offset offset)
Parameter | Type | Description |
---|---|---|
tag | string |
Retrieve only events that contain this tag. |
offset | null |
Retrieve all events from the beginning, no offset is applied. |
offset | Offset.NoOffset |
Retrieve all events from the beginning, no offset is applied. |
offset | Offset.Sequence(0) |
Retrieve all events from the beginning, no offset is applied. |
offset | ChangeVectorOffset |
Provide a change-vector to retrieve events starting after this point. |
Note:
Offset.TimeBasedUuid
is not supported.
Offset.Sequence(x)
where x is > 0 is not supported.
IAllEventsQuery & ICurrentAllEventsQuery
-
Use the methods below to retrieve all events regardless of which PersistenceId they are associated with.
-
The returned event stream is ordered by the change-vector value of the event documents.
AllEvents()
The stream does Not complete when it reaches the end of the currently stored events.
Instead, it continues to push new events as they are persisted.
CurrentAllEvents()
The stream is completed immediately when it reaches the end of the result set.
Events that are stored after the query is completed are Not included in the event stream.
RavenDbReadJournal readJournal = PersistenceQuery.Get(system)
.ReadJournalFor<RavenDbReadJournal>(RavenDbReadJournal.Identifier);
// Issue query 'CurrentAllEvents'
var eventsSource = readJournal.CurrentAllEvents(Offset.NoOffset());
ActorMaterializer materializer = system.Materializer();
eventsSource.RunForeach(envelope =>
{
var saleEvent = (Sale)envelope.Event;
Console.WriteLine($"Sale Event - Brand: {saleEvent.Brand}, Price: {saleEvent.Price}");
}, materializer).Wait();
Syntax:
public Source<EventEnvelope, NotUsed> AllEvents(Offset offset)
public Source<EventEnvelope, NotUsed> CurrentAllEvents(Offset offset)
The available options for the offset
parameter are the same as those listed
for the EventsByTag
& CurrentEventsByTag
methods above.
Inner implementation details
Indexes
To support the above queries and optimize for fast data retrieval,
the RavenDB plugin automatically creates the following internal static-indexes upon instantiation of RavenDbReadJournal
:
ActorsByChangeVector
EventsByTagAndChangeVector
Additional collections
In addition to the Events & Snapshots collections, which contain the persisted data,
the RavenDB plugin creates the following collections to keep track of actors and event metadata:
-
UniqueActors
This collection stores a document for each unique actor that has persisted data.
Each document includes the actor's PersistenceId. -
EventMetadatas
This collection stores a document for each unique actor that has persisted data.
Each document includes the latest sequence number of the most recent event persisted by the actor.
Streaming queries
The RavenDb plugin implements the above queries as streaming queries.
You can monitor each query sent from your client to the RavenDB server in the Traffic Watch view in the Studio.
Navigate to Traffic Watch view
- Navigate to Manage Server > Traffic Watch.
- Select Streams from the HTTP types dropdown.
View queries in Traffic Watch