Stream Query Results



Streaming overview

  • Immediate processing:
    Neither the client nor the server holds the full response in memory.
    Instead, as soon as the server has a single result, it sends it to the client.
    Thus, your application can start processing results before the server sends them all.

  • No tracking:
    The stream results are not tracked by the session.
    Changes made to the resulting entities will not be sent to the server when SaveChanges is called.

  • A snapshot of the data:
    The stream results are a snapshot of the data at the time when the query is computed by the server.
    Documents that start matching the query after the server has processed it are not streamed to the client.

  • Query limitations:

    • By design, a streaming query does not wait for indexing.
      Calling WaitForNonStaleResults is therefore not supported and results in an exception.

    • Using Include to load a related document into the session is not supported in a streaming query.
      See "How to stream related documents" below for an alternative.
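
The "immediate processing" point can be pictured with a server-free analogy. The sketch below is plain C# and does not use the RavenDB client: SendResults is a hypothetical stand-in for the server, Run is a hypothetical stand-in for the client, and the log only illustrates the order of events, under the assumption that each result is handed over as soon as it is produced.

```csharp
using System;
using System.Collections.Generic;

public static class StreamingAnalogy
{
    // Hypothetical stand-in for the server: produces one result at a time
    // and records the moment each result is "sent".
    public static IEnumerable<string> SendResults(List<string> log)
    {
        for (int i = 1; i <= 3; i++)
        {
            log.Add($"sent employees/{i}-A");
            yield return $"employees/{i}-A";
        }
    }

    // Hypothetical stand-in for the client: handles each result on arrival,
    // without first collecting the full result set.
    public static List<string> Run()
    {
        var log = new List<string>();
        foreach (string id in SendResults(log))
        {
            log.Add($"processed {id}");
        }
        return log;
    }

    public static void Main()
    {
        // "sent" and "processed" entries alternate, one pair per result
        Console.WriteLine(string.Join(Environment.NewLine, Run()));
    }
}
```

Each "sent" entry is followed directly by its "processed" entry, which mirrors how a streamed result can be handled before the remaining results exist on the client.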

Stream by query

Stream a dynamic query

// Define a query on a collection
IRavenQueryable<Employee> query = session
    .Query<Employee>()
    .Where(x => x.FirstName == "Robert");

// Call 'Stream' to execute the query
// Optionally, pass an 'out param' for getting the query stats
IEnumerator<StreamResult<Employee>> streamResults = 
    session.Advanced.Stream(query, out StreamQueryStatistics streamQueryStats);

// Read from the stream
while (streamResults.MoveNext())
{
    // Process the received result
    StreamResult<Employee> currentResult = streamResults.Current;
    
    // Get the document from the result
    // This entity will Not be tracked by the session
    Employee employee = currentResult.Document;
    
    // The currentResult item also provides the following:
    var employeeId  = currentResult.Id;
    var documentMetadata = currentResult.Metadata;
    var documentChangeVector = currentResult.ChangeVector;

    // The stats provide info about the query, e.g. the total number of results
    int totalResults = streamQueryStats.TotalResults;
    // Get the Auto-Index that was used/created with this dynamic query
    string indexUsed = streamQueryStats.IndexName;
}

// Async version: define a query on a collection
IRavenQueryable<Employee> query = asyncSession
    .Query<Employee>()
    .Where(x => x.FirstName == "Robert");

// Call 'StreamAsync' to execute the query
// Optionally, pass an 'out param' for getting the query stats
await using (IAsyncEnumerator<StreamResult<Employee>> streamResults = 
             await asyncSession.Advanced.StreamAsync(query, out StreamQueryStatistics streamQueryStats))
{
    // Read from the stream
    while (await streamResults.MoveNextAsync())
    {
        // Process the received result
        StreamResult<Employee> currentResult = streamResults.Current;
        
        // Get the document from the result
        // This entity will Not be tracked by the session
        Employee employee = currentResult.Document;
        
        // The currentResult item also provides the following:
        var employeeId  = currentResult.Id;
        var documentMetadata = currentResult.Metadata;
        var documentChangeVector = currentResult.ChangeVector;
        
        // The stats provide info about the query, e.g. the total number of results
        int totalResults = streamQueryStats.TotalResults;
        // Get the Auto-Index that was used/created with this dynamic query
        string indexUsed = streamQueryStats.IndexName;
    }
}

// DocumentQuery version: define a document query on a collection
IDocumentQuery<Employee> query = session
    .Advanced
    .DocumentQuery<Employee>()
    .WhereEquals(x => x.FirstName, "Robert");

// Call 'Stream' to execute the query
// Optionally, add an out param for getting the query stats
IEnumerator<StreamResult<Employee>> streamResults = 
    session.Advanced.Stream(query, out StreamQueryStatistics streamQueryStats);

// Read from the stream
while (streamResults.MoveNext())
{
    // Process the received result
    StreamResult<Employee> currentResult = streamResults.Current;
    
    // Get the document from the result
    // This entity will Not be tracked by the session
    Employee employee = currentResult.Document;
        
    // The currentResult item also provides the following:
    var employeeId  = currentResult.Id;
    var documentMetadata = currentResult.Metadata;
    var documentChangeVector = currentResult.ChangeVector;
    
    // The stats provide info about the query, e.g. the total number of results
    int totalResults = streamQueryStats.TotalResults;
    // Get the Auto-Index that was used/created with this dynamic query
    string indexUsed = streamQueryStats.IndexName;
}

// Async DocumentQuery version: define a document query on a collection
IAsyncDocumentQuery<Employee> query = asyncSession
    .Advanced
    .AsyncDocumentQuery<Employee>()
    .WhereEquals(x => x.FirstName, "Robert");

// Call 'StreamAsync' to execute the query
// Optionally, add an out param for getting the query stats
await using (IAsyncEnumerator<StreamResult<Employee>> streamResults =
             await asyncSession.Advanced.StreamAsync(query, out StreamQueryStatistics streamQueryStats))
{
    // Read from the stream
    while (await streamResults.MoveNextAsync())
    {
        // Process the received result
        StreamResult<Employee> currentResult = streamResults.Current;
        
        // Get the document from the result
        // This entity will Not be tracked by the session
        Employee employee = currentResult.Document;
        
        // The currentResult item also provides the following:
        var employeeId  = currentResult.Id;
        var documentMetadata = currentResult.Metadata;
        var documentChangeVector = currentResult.ChangeVector;
        
        // The stats provide info about the query, e.g. the total number of results
        int totalResults = streamQueryStats.TotalResults;
        // Get the Auto-Index that was used/created with this dynamic query
        string indexUsed = streamQueryStats.IndexName;
    }
}

Stream a dynamic raw query

// Define a raw query using RQL
IRawDocumentQuery<Employee> query = session
    .Advanced
    .RawQuery<Employee>("from Employees where FirstName = 'Robert'");

// Call 'Stream' to execute the query
IEnumerator<StreamResult<Employee>> streamResults = session.Advanced.Stream(query);

while (streamResults.MoveNext())
{
    StreamResult<Employee> currentResult = streamResults.Current;
    Employee employee = currentResult.Document;
}

// Async version: define a raw query using RQL
IAsyncRawDocumentQuery<Employee> query = asyncSession
    .Advanced
    .AsyncRawQuery<Employee>("from Employees where FirstName = 'Robert'");

// Call 'StreamAsync' to execute the query
await using (IAsyncEnumerator<StreamResult<Employee>> streamResults =
             await asyncSession.Advanced.StreamAsync(query))
{
    while (await streamResults.MoveNextAsync())
    {
        StreamResult<Employee> currentResult = streamResults.Current;
        Employee employee = currentResult.Document;
    }
}

Stream a projected query

// Define a query with projected results
// Each query result is not an Employee document but an entity of type 'NameProjection'.
IRavenQueryable<NameProjection> query = session
    .Query<Employee>()
    .ProjectInto<NameProjection>();

// Call 'Stream' to execute the query
IEnumerator<StreamResult<NameProjection>> streamResults = session.Advanced.Stream(query);

while (streamResults.MoveNext())
{
    StreamResult<NameProjection> currentResult = streamResults.Current;
    NameProjection employeeName = currentResult.Document;
}

// Async version: define a query with projected results
// Each query result is not an Employee document but an entity of type 'NameProjection'.
IRavenQueryable<NameProjection> query = asyncSession
    .Query<Employee>()
    .ProjectInto<NameProjection>();

// Call 'StreamAsync' to execute the query
await using (IAsyncEnumerator<StreamResult<NameProjection>> streamResults =
             await asyncSession.Advanced.StreamAsync(query))
{
    while (await streamResults.MoveNextAsync())
    {
        StreamResult<NameProjection> currentResult = streamResults.Current;
        NameProjection employeeName = currentResult.Document;
    }
}

// Each query result will be of this class type
public class NameProjection
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
}

Stream an index query

// Define a query on an index
IQueryable<Employee> query = session.Query<Employee, Employees_ByFirstName>()
    .Where(employee => employee.FirstName == "Robert");

// Call 'Stream' to execute the query
IEnumerator<StreamResult<Employee>> streamResults = session.Advanced.Stream(query);

while (streamResults.MoveNext())
{
    StreamResult<Employee> currentResult = streamResults.Current;
    Employee employee = currentResult.Document;
}

// Async version: define a query on an index
IQueryable<Employee> query = asyncSession.Query<Employee, Employees_ByFirstName>()
    .Where(employee => employee.FirstName == "Robert");

// Call 'StreamAsync' to execute the query
await using (IAsyncEnumerator<StreamResult<Employee>> streamResults =
             await asyncSession.Advanced.StreamAsync(query))
{
    while (await streamResults.MoveNextAsync())
    {
        StreamResult<Employee> currentResult = streamResults.Current;
        Employee employee = currentResult.Document;
    }
}

// The index:
public class Employees_ByFirstName : AbstractIndexCreationTask<Employee>
{
    public Employees_ByFirstName()
    {
        Map = employees => from employee in employees
            select new
            {
                FirstName = employee.FirstName
            };
    }
}


Why streamed queries do not support 'include':

  • A document can reference related documents.
  • An Include clause in a non-streamed query loads these related documents to the session
    so that they can be accessed without an additional query to the server.
  • Those included documents are sent to the client at the end of the query results.
    This does not mesh well with streaming, which is designed to allow transferring massive amounts of data,
    possibly over a significant amount of time.

How to stream related documents:

  • Instead of using include, define the query so that it will return a projection.
  • The projected query results will not be just the documents from the queried collection.
    Instead, each result will be an entity containing the related document entities in addition to the original queried document.
  • On the client side, you need to define a class that matches the projected query result.

Example:

  • The example below uses RawQuery.
    However, the same logic can be applied to a Query, DocumentQuery, or when querying an index.
  • Note:
    The projected class in the example contains the full related documents.
    However, you can project just the needed properties from the related documents.

// Define a query with a 'select' clause to project the results.

// The related Company & Employee documents are 'loaded',
// and returned in the projection together with the Order document itself.

// Each query result is not an Order document but an entity of type 'AllDocsProjection'.

IRawDocumentQuery<AllDocsProjection> query = session
    .Advanced
    .RawQuery<AllDocsProjection>(@"from Orders as o 
                                   where o.ShipTo.City = 'London'
                                   load o.Company as c, o.Employee as e
                                   select {
                                       Order: o,
                                       Company: c,
                                       Employee: e
                                   }");

// Call 'Stream' to execute the query
IEnumerator<StreamResult<AllDocsProjection>> streamResults = session.Advanced.Stream(query);

while (streamResults.MoveNext())
{
    StreamResult<AllDocsProjection> currentResult = streamResults.Current;
    AllDocsProjection projection = currentResult.Document;
    
    Order theOrderDoc = projection.Order;
    Company theRelatedCompanyDoc = projection.Company;
    Employee theRelatedEmployeeDoc = projection.Employee;
}

// Async version: define a query with a 'select' clause to project the results.

// The related Company & Employee documents are 'loaded',
// and returned in the projection together with the Order document itself.

// Each query result is not an Order document but an entity of type 'AllDocsProjection'.

IAsyncRawDocumentQuery<AllDocsProjection> query = asyncSession
    .Advanced
    .AsyncRawQuery<AllDocsProjection>(@"from Orders as o 
                                   where o.ShipTo.City = 'London'
                                   load o.Company as c, o.Employee as e
                                   select {
                                       Order: o,
                                       Company: c,
                                       Employee: e
                                   }");

// Call 'StreamAsync' to execute the query
await using (IAsyncEnumerator<StreamResult<AllDocsProjection>> streamResults =
             await asyncSession.Advanced.StreamAsync(query))
{
    while (await streamResults.MoveNextAsync())
    {
        StreamResult<AllDocsProjection> currentResult = streamResults.Current;
        AllDocsProjection projection = currentResult.Document;
        
        Order theOrderDoc = projection.Order;
        Company theRelatedCompanyDoc = projection.Company;
        Employee theRelatedEmployeeDoc = projection.Employee;
    }
}

// Each query result will be of this class type
public class AllDocsProjection
{
    public Order Order { get; set; }
    public Employee Employee { get; set; }
    public Company Company { get; set; }
}
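
The note about projecting just the needed properties could look like the sketch below. It is an illustration under stated assumptions, not a definitive implementation: OrderSummaryProjection, NarrowProjectionSketch, and the handle callback are hypothetical names, the Company documents are assumed to have a Name property, and the session is assumed to be open against a database that holds the Orders, Companies, and Employees documents.

```csharp
using System;
using System.Collections.Generic;
using Raven.Client.Documents.Commands;
using Raven.Client.Documents.Session;

// Hypothetical projection class holding only the needed properties
public class OrderSummaryProjection
{
    public string OrderId { get; set; }
    public string CompanyName { get; set; }
    public string EmployeeLastName { get; set; }
}

public static class NarrowProjectionSketch
{
    // 'session' is assumed to be an open session;
    // 'handle' is a hypothetical callback invoked for each streamed result.
    public static void StreamSummaries(
        IDocumentSession session, Action<OrderSummaryProjection> handle)
    {
        // Load the related documents, but project only selected properties
        // (assumption: Company documents expose a 'Name' property)
        IRawDocumentQuery<OrderSummaryProjection> query = session
            .Advanced
            .RawQuery<OrderSummaryProjection>(@"from Orders as o
                                                where o.ShipTo.City = 'London'
                                                load o.Company as c, o.Employee as e
                                                select {
                                                    OrderId: id(o),
                                                    CompanyName: c.Name,
                                                    EmployeeLastName: e.LastName
                                                }");

        // Dispose the enumerator when the stream has been fully read
        using (IEnumerator<StreamResult<OrderSummaryProjection>> streamResults =
               session.Advanced.Stream(query))
        {
            while (streamResults.MoveNext())
            {
                // Each result is passed on as it arrives; nothing is buffered here
                handle(streamResults.Current.Document);
            }
        }
    }
}
```

Projecting a few properties instead of whole documents keeps each streamed result small, which matters when the stream carries a large number of results.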

By query syntax

// Stream by query:
IEnumerator<StreamResult<T>> Stream<T>(IQueryable<T> query);
IEnumerator<StreamResult<T>> Stream<T>(IQueryable<T> query, out StreamQueryStatistics streamQueryStats);

IEnumerator<StreamResult<T>> Stream<T>(IDocumentQuery<T> query);
IEnumerator<StreamResult<T>> Stream<T>(IDocumentQuery<T> query, out StreamQueryStatistics streamQueryStats);

IEnumerator<StreamResult<T>> Stream<T>(IRawDocumentQuery<T> query);
IEnumerator<StreamResult<T>> Stream<T>(IRawDocumentQuery<T> query, out StreamQueryStatistics streamQueryStats);

Parameters           | Type                                             | Description
query                | IQueryable, IDocumentQuery, or IRawDocumentQuery | The query for which to stream results
out streamQueryStats | StreamQueryStatistics                            | Information about the performed query

Return value                 | Description
IEnumerator<StreamResult<T>> | Enumerator with the resulting entities

Stream by prefix

Stream results by prefix


  • Streamed data can also be filtered by an ID prefix and by other filtering options (see the syntax below).
  • Note: No auto-index is created when streaming results by a prefix.

string idPrefix = "Orders/";
string matches = "*25-A|77?-A";

// Filter the streamed results by passing a 'prefix' and an optional 'matches' string
IEnumerator<StreamResult<Order>> streamResults = session.Advanced.Stream<Order>(idPrefix, matches);

while (streamResults.MoveNext())
{
    // Documents that will be returned are only those matching the following:
    // * Document ID starts with "Orders/"
    // * The rest of the ID (after prefix) must match the 'matches' string
    // e.g. "Orders/325-A" or "Orders/772-A", etc.
    
    StreamResult<Order> currentResult = streamResults.Current;
    Order order = currentResult.Document;
}

// Async version:
string idPrefix = "Orders/";
string matches = "*25-A|77?-A";

// Filter the streamed results by passing a 'prefix' and an optional 'matches' string
await using (IAsyncEnumerator<StreamResult<Order>> streamResults =
             await asyncSession.Advanced.StreamAsync<Order>(idPrefix, matches))
{
    while (await streamResults.MoveNextAsync())
    {
        // Documents that will be returned are only those matching the following:
        // * Document ID starts with "Orders/"
        // * The rest of the ID (after prefix) must match the 'matches' string
        // e.g. "Orders/325-A" or "Orders/772-A", etc.

        StreamResult<Order> currentResult = streamResults.Current;
        Order order = currentResult.Document;
    }
}

By prefix syntax

// Stream by prefix:
IEnumerator<StreamResult<T>> Stream<T>(string startsWith, string matches = null,
    int start = 0, int pageSize = int.MaxValue, string startAfter = null);

Parameters | Type   | Description
startsWith | string | Stream documents with this ID prefix
matches    | string | Filter the ID part that comes after the specified prefix. Use '?' for any single character and '*' for any characters. Use '|' to separate rules.
start      | int    | Number of documents to skip
pageSize   | int    | Maximum number of documents to retrieve
startAfter | string | Skip fetching documents until this ID is found. Only return documents after this ID (default: null).

Return value                 | Description
IEnumerator<StreamResult<T>> | Enumerator with the resulting entities
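
To make the parameter descriptions above concrete, here is a small in-memory model of them in plain C#. It does not call RavenDB and is not the server's implementation: PrefixFilterModel and its methods are hypothetical, the ordinal prefix comparison is a simplification, and the order in which the filters are applied (startAfter, then matches, then start and pageSize) is an assumption made for the sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public static class PrefixFilterModel
{
    // Translate one 'matches' rule into an anchored regex:
    // '?' stands for any single character, '*' for any run of characters.
    private static Regex RuleToRegex(string rule)
    {
        string pattern = Regex.Escape(rule).Replace("\\?", ".").Replace("\\*", ".*");
        return new Regex("^" + pattern + "$");
    }

    // Hypothetical model of the by-prefix parameters over a list of IDs
    public static List<string> Filter(
        IEnumerable<string> ids, string startsWith, string matches = null,
        int start = 0, int pageSize = int.MaxValue, string startAfter = null)
    {
        // Keep only the IDs that begin with the prefix
        IEnumerable<string> result =
            ids.Where(id => id.StartsWith(startsWith, StringComparison.Ordinal));

        // Skip everything up to and including the 'startAfter' ID
        if (startAfter != null)
            result = result.SkipWhile(id => id != startAfter).Skip(1);

        // The part after the prefix must satisfy at least one '|'-separated rule
        if (matches != null)
        {
            List<Regex> rules = matches.Split('|').Select(RuleToRegex).ToList();
            result = result.Where(id =>
                rules.Any(rule => rule.IsMatch(id.Substring(startsWith.Length))));
        }

        // Apply paging last
        return result.Skip(start).Take(pageSize).ToList();
    }

    public static void Main()
    {
        var ids = new[]
        {
            "Orders/100-A", "Orders/325-A", "Orders/771-A",
            "Orders/772-A", "Orders/772-B", "Products/25-A"
        };

        // prints "Orders/325-A, Orders/771-A, Orders/772-A"
        Console.WriteLine(string.Join(", ", Filter(ids, "Orders/", "*25-A|77?-A")));
    }
}
```

In this model, "Orders/100-A" and "Orders/772-B" are dropped because the part after the prefix matches neither rule, and "Products/25-A" is dropped because it lacks the prefix.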