Stream Query Results
- RavenDB supports streaming data from the server to the client.
  Streaming is useful when processing a large number of results.
- The streamed data can be the result of a dynamic query, a static-index query, or documents filtered by an ID prefix.
- To stream results, use the Stream method from the Advanced session operations.
Streaming overview
- Immediate processing:
  Neither the client nor the server holds the full response in memory.
  Instead, as soon as the server has a single result, it sends it to the client.
  Thus, your application can start processing results before the server has sent them all.
- No tracking:
  The streamed results are not tracked by the session.
  Changes made to the resulting entities will not be sent to the server when SaveChanges is called.
- A snapshot of the data:
  The streamed results are a snapshot of the data at the time the server computes the query.
  Documents that come to match the query after it has been processed are not streamed to the client.
- Query limitations:
  - A streaming query does not wait for indexing, by design.
    Calling WaitForNonStaleResults is therefore not supported and results in an exception.
  - Using Include to load a related document into the session is not supported in a streaming query.
    See the "Stream related documents" section below to learn how to stream related documents.
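
The "immediate processing" behavior described above resembles consuming a lazy C# iterator. The following is a minimal sketch of that idea only, not RavenDB code: `ProduceResults` is a hypothetical stand-in for the server, and the log simply records the order of events to show that each item is processed before the next one is produced.

```csharp
using System;
using System.Collections.Generic;

public static class LazyStreamingSketch
{
    // Hypothetical stand-in for a server that emits results one at a time.
    public static IEnumerable<int> ProduceResults(int count, List<string> log)
    {
        for (int i = 1; i <= count; i++)
        {
            log.Add($"produced {i}");
            yield return i; // Hand over a single result; nothing is buffered
        }
    }

    // Consumes each result as soon as it is available and returns the event log.
    public static List<string> Run(int count)
    {
        var log = new List<string>();
        foreach (int item in ProduceResults(count, log))
        {
            log.Add($"processed {item}");
        }
        return log;
    }
}
```

Calling `LazyStreamingSketch.Run(2)` returns the entries "produced 1", "processed 1", "produced 2", "processed 2" in that order: the producer and the consumer are interleaved, and the full result set is never held in memory at once.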
Stream by query
Stream a dynamic query
// Define a query on a collection
IRavenQueryable<Employee> query = session
    .Query<Employee>()
    .Where(x => x.FirstName == "Robert");

// Call 'Stream' to execute the query
// Optionally, pass an 'out' parameter to get the query stats
IEnumerator<StreamResult<Employee>> streamResults =
    session.Advanced.Stream(query, out StreamQueryStatistics streamQueryStats);

// Read from the stream
while (streamResults.MoveNext())
{
    // Process the received result
    StreamResult<Employee> currentResult = streamResults.Current;

    // Get the document from the result
    // This entity will not be tracked by the session
    Employee employee = currentResult.Document;

    // The currentResult item also provides the following:
    var employeeId = currentResult.Id;
    var documentMetadata = currentResult.Metadata;
    var documentChangeVector = currentResult.ChangeVector;

    // Can get info from the stats, e.g. the total number of results
    int totalResults = streamQueryStats.TotalResults;
    // Get the auto-index that was used/created with this dynamic query
    string indexUsed = streamQueryStats.IndexName;
}
// Async variant: define a query on a collection
IRavenQueryable<Employee> query = asyncSession
    .Query<Employee>()
    .Where(x => x.FirstName == "Robert");

// Call 'StreamAsync' to execute the query
// Optionally, pass an 'out' parameter to get the query stats
await using (IAsyncEnumerator<StreamResult<Employee>> streamResults =
    await asyncSession.Advanced.StreamAsync(query, out StreamQueryStatistics streamQueryStats))
{
    // Read from the stream
    while (await streamResults.MoveNextAsync())
    {
        // Process the received result
        StreamResult<Employee> currentResult = streamResults.Current;

        // Get the document from the result
        // This entity will not be tracked by the session
        Employee employee = currentResult.Document;

        // The currentResult item also provides the following:
        var employeeId = currentResult.Id;
        var documentMetadata = currentResult.Metadata;
        var documentChangeVector = currentResult.ChangeVector;

        // Can get info from the stats, e.g. the total number of results
        int totalResults = streamQueryStats.TotalResults;
        // Get the auto-index that was used/created with this dynamic query
        string indexUsed = streamQueryStats.IndexName;
    }
}
// Define a document query on a collection
IDocumentQuery<Employee> query = session
    .Advanced
    .DocumentQuery<Employee>()
    .WhereEquals(x => x.FirstName, "Robert");

// Call 'Stream' to execute the query
// Optionally, pass an 'out' parameter to get the query stats
IEnumerator<StreamResult<Employee>> streamResults =
    session.Advanced.Stream(query, out StreamQueryStatistics streamQueryStats);

// Read from the stream
while (streamResults.MoveNext())
{
    // Process the received result
    StreamResult<Employee> currentResult = streamResults.Current;

    // Get the document from the result
    // This entity will not be tracked by the session
    Employee employee = currentResult.Document;

    // The currentResult item also provides the following:
    var employeeId = currentResult.Id;
    var documentMetadata = currentResult.Metadata;
    var documentChangeVector = currentResult.ChangeVector;

    // Can get info from the stats, e.g. the total number of results
    int totalResults = streamQueryStats.TotalResults;
    // Get the auto-index that was used/created with this dynamic query
    string indexUsed = streamQueryStats.IndexName;
}
// Async variant: define a document query on a collection
IAsyncDocumentQuery<Employee> query = asyncSession
    .Advanced
    .AsyncDocumentQuery<Employee>()
    .WhereEquals(x => x.FirstName, "Robert");

// Call 'StreamAsync' to execute the query
// Optionally, pass an 'out' parameter to get the query stats
await using (IAsyncEnumerator<StreamResult<Employee>> streamResults =
    await asyncSession.Advanced.StreamAsync(query, out StreamQueryStatistics streamQueryStats))
{
    // Read from the stream
    while (await streamResults.MoveNextAsync())
    {
        // Process the received result
        StreamResult<Employee> currentResult = streamResults.Current;

        // Get the document from the result
        // This entity will not be tracked by the session
        Employee employee = currentResult.Document;

        // The currentResult item also provides the following:
        var employeeId = currentResult.Id;
        var documentMetadata = currentResult.Metadata;
        var documentChangeVector = currentResult.ChangeVector;

        // Can get info from the stats, e.g. the total number of results
        int totalResults = streamQueryStats.TotalResults;
        // Get the auto-index that was used/created with this dynamic query
        string indexUsed = streamQueryStats.IndexName;
    }
}
Stream a dynamic raw query
// Define a raw query using RQL
IRawDocumentQuery<Employee> query = session
    .Advanced
    .RawQuery<Employee>("from Employees where FirstName = 'Robert'");

// Call 'Stream' to execute the query
IEnumerator<StreamResult<Employee>> streamResults = session.Advanced.Stream(query);

while (streamResults.MoveNext())
{
    StreamResult<Employee> currentResult = streamResults.Current;
    Employee employee = currentResult.Document;
}
// Async variant: define a raw query using RQL
IAsyncRawDocumentQuery<Employee> query = asyncSession
    .Advanced
    .AsyncRawQuery<Employee>("from Employees where FirstName = 'Robert'");

// Call 'StreamAsync' to execute the query
await using (IAsyncEnumerator<StreamResult<Employee>> streamResults =
    await asyncSession.Advanced.StreamAsync(query))
{
    while (await streamResults.MoveNextAsync())
    {
        StreamResult<Employee> currentResult = streamResults.Current;
        Employee employee = currentResult.Document;
    }
}
Stream a projected query
// Define a query with projected results
// Each query result is not an Employee document but an entity of type 'NameProjection'.
IRavenQueryable<NameProjection> query = session
    .Query<Employee>()
    .ProjectInto<NameProjection>();

// Call 'Stream' to execute the query
IEnumerator<StreamResult<NameProjection>> streamResults = session.Advanced.Stream(query);

while (streamResults.MoveNext())
{
    StreamResult<NameProjection> currentResult = streamResults.Current;
    NameProjection employeeName = currentResult.Document;
}
// Async variant: define a query with projected results
// Each query result is not an Employee document but an entity of type 'NameProjection'.
IRavenQueryable<NameProjection> query = asyncSession
    .Query<Employee>()
    .ProjectInto<NameProjection>();

// Call 'StreamAsync' to execute the query
await using (IAsyncEnumerator<StreamResult<NameProjection>> streamResults =
    await asyncSession.Advanced.StreamAsync(query))
{
    while (await streamResults.MoveNextAsync())
    {
        StreamResult<NameProjection> currentResult = streamResults.Current;
        NameProjection employeeName = currentResult.Document;
    }
}
// Each query result will be of this class type
public class NameProjection
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
}
Stream an index query
// Define a query on an index
IQueryable<Employee> query = session.Query<Employee, Employees_ByFirstName>()
    .Where(employee => employee.FirstName == "Robert");

// Call 'Stream' to execute the query
IEnumerator<StreamResult<Employee>> streamResults = session.Advanced.Stream(query);

while (streamResults.MoveNext())
{
    StreamResult<Employee> currentResult = streamResults.Current;
    Employee employee = currentResult.Document;
}
// Async variant: define a query on an index
IQueryable<Employee> query = asyncSession.Query<Employee, Employees_ByFirstName>()
    .Where(employee => employee.FirstName == "Robert");

// Call 'StreamAsync' to execute the query
await using (IAsyncEnumerator<StreamResult<Employee>> streamResults =
    await asyncSession.Advanced.StreamAsync(query))
{
    while (await streamResults.MoveNextAsync())
    {
        StreamResult<Employee> currentResult = streamResults.Current;
        Employee employee = currentResult.Document;
    }
}
// The index:
public class Employees_ByFirstName : AbstractIndexCreationTask<Employee>
{
    public Employees_ByFirstName()
    {
        Map = employees => from employee in employees
                           select new
                           {
                               FirstName = employee.FirstName
                           };
    }
}
Stream related documents
Why 'include' is not supported when streaming query results:
- A document can reference related documents.
- An Include clause in a non-streamed query loads these related documents into the session
  so that they can be accessed without an additional query to the server.
- Those included documents are sent to the client at the end of the query results.
  This does not mesh well with streaming, which is designed to allow transferring massive amounts of data,
  possibly over a significant amount of time.
How to stream related documents:
- Instead of using include, define the query so that it returns a projection.
- The projected query results will not be just the documents from the queried collection.
  Instead, each result will be an entity containing the related document entities in addition to the original queried document.
- On the client side, you need to define a class that matches the projected query result.
Example:
- The example below uses RawQuery.
  However, the same logic can be applied to a Query, a DocumentQuery, or when querying an index.
- Note:
  The projected class in the example contains the full related documents.
  However, you can project just the needed properties from the related documents.
// Define a query with a 'select' clause to project the results.
// The related Company & Employee documents are 'loaded',
// and returned in the projection together with the Order document itself.
// Each query result is not an Order document but an entity of type 'AllDocsProjection'.
IRawDocumentQuery<AllDocsProjection> query = session
    .Advanced
    .RawQuery<AllDocsProjection>(@"from Orders as o
                                   where o.ShipTo.City = 'London'
                                   load o.Company as c, o.Employee as e
                                   select {
                                       Order: o,
                                       Company: c,
                                       Employee: e
                                   }");

// Call 'Stream' to execute the query
IEnumerator<StreamResult<AllDocsProjection>> streamResults = session.Advanced.Stream(query);

while (streamResults.MoveNext())
{
    StreamResult<AllDocsProjection> currentResult = streamResults.Current;
    AllDocsProjection projection = currentResult.Document;

    Order theOrderDoc = projection.Order;
    Company theRelatedCompanyDoc = projection.Company;
    Employee theRelatedEmployeeDoc = projection.Employee;
}
// Async variant: define a query with a 'select' clause to project the results.
// The related Company & Employee documents are 'loaded',
// and returned in the projection together with the Order document itself.
// Each query result is not an Order document but an entity of type 'AllDocsProjection'.
IAsyncRawDocumentQuery<AllDocsProjection> query = asyncSession
    .Advanced
    .AsyncRawQuery<AllDocsProjection>(@"from Orders as o
                                        where o.ShipTo.City = 'London'
                                        load o.Company as c, o.Employee as e
                                        select {
                                            Order: o,
                                            Company: c,
                                            Employee: e
                                        }");

// Call 'StreamAsync' to execute the query
await using (IAsyncEnumerator<StreamResult<AllDocsProjection>> streamResults =
    await asyncSession.Advanced.StreamAsync(query))
{
    while (await streamResults.MoveNextAsync())
    {
        StreamResult<AllDocsProjection> currentResult = streamResults.Current;
        AllDocsProjection projection = currentResult.Document;

        Order theOrderDoc = projection.Order;
        Company theRelatedCompanyDoc = projection.Company;
        Employee theRelatedEmployeeDoc = projection.Employee;
    }
}
// Each query result will be of this class type
public class AllDocsProjection
{
    public Order Order { get; set; }
    public Employee Employee { get; set; }
    public Company Company { get; set; }
}
By query syntax
// Stream by query:
IEnumerator<StreamResult<T>> Stream<T>(IQueryable<T> query);
IEnumerator<StreamResult<T>> Stream<T>(IQueryable<T> query, out StreamQueryStatistics streamQueryStats);
IEnumerator<StreamResult<T>> Stream<T>(IDocumentQuery<T> query);
IEnumerator<StreamResult<T>> Stream<T>(IDocumentQuery<T> query, out StreamQueryStatistics streamQueryStats);
IEnumerator<StreamResult<T>> Stream<T>(IRawDocumentQuery<T> query);
IEnumerator<StreamResult<T>> Stream<T>(IRawDocumentQuery<T> query, out StreamQueryStatistics streamQueryStats);
Parameters | Type | Description |
---|---|---|
query | IQueryable, IDocumentQuery, or IRawDocumentQuery | The query for which to stream results |
out streamQueryStats | StreamQueryStatistics | Information about the performed query |

Return Value | |
---|---|
IEnumerator<StreamResult<T>> | Enumerator with resulting entities |
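
Because Stream returns an IEnumerator rather than an IEnumerable, the results cannot be used directly in a foreach loop or a LINQ pipeline. The following is a minimal sketch of one way to bridge that gap. `AsSinglePass` is a hypothetical helper written for this illustration and is not part of the RavenDB client; it relies only on standard .NET types.

```csharp
using System;
using System.Collections.Generic;

public static class EnumeratorExtensions
{
    // Hypothetical helper (not a RavenDB API):
    // exposes an IEnumerator<T> as a lazy, single-pass IEnumerable<T>.
    public static IEnumerable<T> AsSinglePass<T>(this IEnumerator<T> enumerator)
    {
        if (enumerator == null)
            throw new ArgumentNullException(nameof(enumerator));

        return Iterate(enumerator);
    }

    private static IEnumerable<T> Iterate<T>(IEnumerator<T> enumerator)
    {
        using (enumerator) // Dispose the enumerator when iteration ends
        {
            while (enumerator.MoveNext())
            {
                // Items are yielded one at a time; nothing is buffered
                yield return enumerator.Current;
            }
        }
    }
}
```

With such a helper, a loop could be written as `foreach (var result in session.Advanced.Stream(query).AsSinglePass())`. The sequence should be enumerated only once, since the underlying enumerator is consumed and disposed by the first pass.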
Stream by prefix
Stream results by prefix
- Streamed data can also be filtered by an ID prefix and by other filtering options (see the syntax below).
- Note: No auto-index is created when streaming results by a prefix.
string idPrefix = "Orders/";
string matches = "*25-A|77?-A";

// Filter streamed results by passing a 'prefix' and an optional 'matches' string
IEnumerator<StreamResult<Order>> streamResults = session.Advanced.Stream<Order>(idPrefix, matches);

while (streamResults.MoveNext())
{
    // Documents that will be returned are only those matching the following:
    // * Document ID starts with "Orders/"
    // * The rest of the ID (after the prefix) must match the 'matches' string
    //   e.g. "Orders/325-A" or "Orders/772-A", etc.
    StreamResult<Order> currentResult = streamResults.Current;
    Order order = currentResult.Document;
}
// Async variant
string idPrefix = "Orders/";
string matches = "*25-A|77?-A";

// Filter streamed results by passing a 'prefix' and an optional 'matches' string
await using (IAsyncEnumerator<StreamResult<Order>> streamResults =
    await asyncSession.Advanced.StreamAsync<Order>(idPrefix, matches))
{
    while (await streamResults.MoveNextAsync())
    {
        // Documents that will be returned are only those matching the following:
        // * Document ID starts with "Orders/"
        // * The rest of the ID (after the prefix) must match the 'matches' string
        //   e.g. "Orders/325-A" or "Orders/772-A", etc.
        StreamResult<Order> currentResult = streamResults.Current;
        Order order = currentResult.Document;
    }
}
By prefix syntax
// Stream by prefix:
IEnumerator<StreamResult<T>> Stream<T>(string startsWith, string matches = null,
int start = 0, int pageSize = int.MaxValue, string startAfter = null);
Parameters | Type | Description |
---|---|---|
startsWith | string | Stream documents with this ID prefix |
matches | string | Filter the ID part that comes after the specified prefix. Use '?' for any single character and '*' for any characters. Use '|' to separate rules. |
start | int | Number of documents to skip |
pageSize | int | Maximum number of documents to retrieve |
startAfter | string | Skip fetching documents until this ID is found. Only return documents after this ID (default: null). |

Return Value | |
---|---|
IEnumerator<StreamResult<T>> | Enumerator with resulting entities |
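
To make the wildcard rules of the matches parameter more concrete, here is a minimal client-side sketch that emulates them with a regular expression. It is an illustration only: the real filtering is done by the server, `MatchesPatternSketch` and its methods are hypothetical names, and the sketch assumes that '?' stands for exactly one character, that '*' stands for any run of characters (including none), and that comparisons are case-sensitive.

```csharp
using System;
using System.Linq;
using System.Text.RegularExpressions;

public static class MatchesPatternSketch
{
    // Converts a 'matches' string into an anchored regular expression.
    // Assumption: '?' -> one character, '*' -> any run of characters,
    // '|' -> separator between alternative rules.
    public static Regex ToRegex(string matches)
    {
        var rules = matches
            .Split('|')
            .Select(rule => Regex.Escape(rule)   // Escape regex metacharacters
                .Replace(@"\?", ".")             // Restore '?' as a single-character wildcard
                .Replace(@"\*", ".*"));          // Restore '*' as a multi-character wildcard

        return new Regex("^(?:" + string.Join("|", rules) + ")$");
    }

    // Checks the part of the ID that comes after the prefix against the rules.
    public static bool IsMatch(string id, string prefix, string matches)
    {
        if (!id.StartsWith(prefix, StringComparison.Ordinal))
            return false;

        return ToRegex(matches).IsMatch(id.Substring(prefix.Length));
    }
}
```

Under these assumptions, `IsMatch("Orders/325-A", "Orders/", "*25-A|77?-A")` and `IsMatch("Orders/772-A", "Orders/", "*25-A|77?-A")` both return true, while `IsMatch("Orders/100-A", "Orders/", "*25-A|77?-A")` returns false.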