Stream Query Results



Streaming overview

  • Immediate processing:
    Neither the client nor the server holds the full response in memory.
    Instead, as soon as the server has a single result, it sends it to the client.
    Thus, your application can start processing results before the server sends them all.

  • No tracking:
    The stream results are Not tracked by the session.
    Changes made to the resulting entities will not be sent to the server when saveChanges is called.

  • A snapshot of the data:
    The stream results are a snapshot of the data at the time when the query is computed by the server.
    Results that match the query after it was already processed are Not streamed to the client.

  • Query limitations:

    • By design, a streaming query does not wait for indexing.
      Therefore, calling waitForNonStaleResults is Not supported and will throw an exception.

    • Using include to load related documents into the session is Not supported in a streaming query.
      Learn how to stream related documents below.
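The "immediate processing" behavior can be illustrated locally. The sketch below does not talk to RavenDB: the hypothetical `fakeResultStream()` uses Node's `Readable.from` to stand in for the stream returned by `session.advanced.stream()`. It assumes the real result stream behaves like a standard object-mode Node.js Readable, which is async-iterable with `for await`. Each item is handled as soon as it arrives, while later items are still being produced.

```javascript
const { Readable } = require("stream");

// Small promise-based delay helper.
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

// Hypothetical stand-in for a RavenDB result stream: yields result items one at a
// time with a pause between them, and sets `state.done` once all are produced.
function fakeResultStream(count, state) {
    async function* produce() {
        for (let i = 1; i <= count; i++) {
            await delay(10);
            yield { id: `employees/${i}-A`, document: { FirstName: "Robert" } };
        }
        state.done = true;
    }
    return Readable.from(produce());
}

// Handle each result as soon as it arrives, without waiting for the whole result set.
async function processAsItArrives(resultStream, state) {
    const seenIds = [];
    let startedBeforeAllProduced = false;
    for await (const resultItem of resultStream) {
        if (seenIds.length === 0) {
            // True when the first item is handled while later items are still pending.
            startedBeforeAllProduced = !state.done;
        }
        seenIds.push(resultItem.id);
    }
    return { seenIds, startedBeforeAllProduced };
}

const producerState = { done: false };
const run = processAsItArrives(fakeResultStream(3, producerState), producerState);
```

With a real session you would pass the stream obtained from `await session.advanced.stream(query)` instead of the simulated one.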

Stream by query

Stream a dynamic query

// Define a query on a collection
const query = session.query({ collection: "employees" })
    .whereEquals('FirstName', 'Robert');

// Call stream() to execute the query; it returns a Promise that resolves to a Node.js readable stream.
// Params: pass the query and an optional callback for receiving the query stats.
let streamQueryStats;
const queryStream = await session.advanced.stream(query, s => streamQueryStats = s);

// Two options to get query stats:
// * Pass a callback to stream() with an 'out param' that will be filled with query stats.
//   This param can then be accessed in the 'end' event.
// * Or: Use an event listener, listen to the 'stats' event, as described below.

// Handle stream events with callback functions:        

// Process the item received:
queryStream.on("data", resultItem => {
    // Get the employee entity from the result item.
    // Note: This entity will Not be tracked by the session.
    const employee = resultItem.document;

    // The resultItem also provides the following:
    const employeeId = resultItem.id;
    const documentMetadata = resultItem.metadata;
    const documentChangeVector = resultItem.changeVector;
});

// Alternatively, get the query stats by listening to the 'stats' event:
queryStream.once("stats", queryStats => {
    // Get number of total results
    const totalResults = queryStats.totalResults;
    // Get the Auto-Index that was used/created with this dynamic query
    const indexUsed = queryStats.indexName;
});

// Stream emits an 'end' event when there is no more data to read:
queryStream.on("end", () => {            
    // Get info from 'streamQueryStats', the stats object
    const totalResults = streamQueryStats.totalResults;
    const indexUsed = streamQueryStats.indexName;
});

queryStream.on("error", err => {
    // Handle errors
});
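The event handlers above run asynchronously, so code placed after them does not wait for the stream to finish. A common Node.js pattern is to wrap the events in a Promise. The helper `consumeResultStream` below is hypothetical (it is not part of the RavenDB client) and relies only on the 'data', 'stats', 'end', and 'error' events shown above. To keep it runnable without a server, the usage feeds it an object-mode `PassThrough` stream that imitates those events with made-up data.

```javascript
const { PassThrough } = require("stream");

// Hypothetical helper: consume a result stream and resolve once it ends.
// onItem is called for every result item as it arrives.
function consumeResultStream(resultStream, onItem) {
    return new Promise((resolve, reject) => {
        let stats = null;
        let count = 0;
        resultStream.once("stats", s => { stats = s; });
        resultStream.on("data", resultItem => {
            count++;
            onItem(resultItem);
        });
        resultStream.once("end", () => resolve({ count, stats }));
        resultStream.once("error", reject);
    });
}

// Usage with a simulated stream (stand-in for: await session.advanced.stream(query)).
const simulated = new PassThrough({ objectMode: true });
const names = [];
const finished = consumeResultStream(simulated, item => names.push(item.document.FirstName));

simulated.emit("stats", { totalResults: 2, indexName: "Auto/Employees/ByFirstName" });
simulated.write({ id: "employees/1-A", document: { FirstName: "Robert" } });
simulated.write({ id: "employees/2-A", document: { FirstName: "Robert" } });
simulated.end();
```

With the real client, `await consumeResultStream(queryStream, ...)` lets the surrounding code continue only after the last result has been handled.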

Stream a dynamic raw query

// Define a raw query using RQL
const rawQuery = session.advanced
    .rawQuery("from Employees where FirstName = 'Robert'");

// Call stream() to execute the query
const queryStream = await session.advanced.stream(rawQuery);

// Handle stats & stream events as described in the dynamic query example above.

Stream a projected query

// Define a query with projected results
// Each query result is not an Employee document but an entity containing selected fields only.
const projectedQuery = session.query({collection: 'employees'})
    .selectFields(['FirstName', 'LastName']);
       
// Call stream() to execute the query
const queryStream = await session.advanced.stream(projectedQuery);

queryStream.on("data", resultItem => {
    // entity contains only the projected fields
    const employeeName = resultItem.document;
});

// Handle stats & stream events as described in the dynamic query example above.

Stream an index query

// Define a query on an index
const query = session.query({ indexName: "Employees/ByFirstName" })
    .whereEquals("FirstName", "Robert");

// Call stream() to execute the query
const queryStream = await session.advanced.stream(query);

// Get info about the index used from the stats:
queryStream.once("stats", queryStats => {
    const indexUsed = queryStats.indexName;
    const isIndexStale = queryStats.stale;
    const lastTimeIndexWasUpdated = queryStats.indexTimestamp;
});

// Handle stats & stream events as described in the dynamic query example above.

// The index:
const { AbstractJavaScriptIndexCreationTask } = require("ravendb");

class Employees_ByFirstName extends AbstractJavaScriptIndexCreationTask {

    constructor () {
        super();

        this.map("Employees", employee => {
            return {
                FirstName: employee.FirstName
            }
        });
    }
}


Why streamed query results do not support 'include':

  • A document can reference related documents.
  • An include clause in a non-streamed query loads these related documents into the session,
    so they can be accessed without an additional request to the server.
  • Those included documents are sent to the client at the end of the query results.
    This does not mesh well with streaming, which is designed to allow transferring massive amounts of data,
    possibly over a significant amount of time.

How to stream related documents:

  • Instead of using include, define the query so that it will return a projection.
  • The projected query results will not be just the documents from the queried collection.
    Instead, each result will be an entity containing the related document entities in addition to the original queried document.
  • On the client side, read the related documents from the projected fields of each result item.
    In JavaScript, defining a class that matches the projected query result is optional.

Example:

  • The example below uses rawQuery.
    However, the same logic can be applied to a Query, DocumentQuery, or when querying an index.
  • Note:
    The projection in the example contains the full related documents.
    However, you can project just the needed properties from the related documents.

// Define a query with a 'select' clause to project the results.

// The related Company & Employee documents are 'loaded',
// and returned in the projection together with the Order document itself.

// Each query result is not an Order document
// but an entity containing the document & the related documents. 
const rawQuery = session.advanced
    .rawQuery(`from Orders as o
               where o.ShipTo.City = 'London'
               load o.Company as c, o.Employee as e
               select {
                   order: o,
                   company: c,
                   employee: e
               }`);

// Call stream() to execute the query
const queryStream = await session.advanced.stream(rawQuery);

queryStream.on("data", resultItem => {
    const theOrderDocument = resultItem.document.order;
    const theCompanyDocument = resultItem.document.company;
    const theEmployeeDocument = resultItem.document.employee;
});

// Handle stats & stream events as described in the dynamic query example above.
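Because projected results arrive one at a time, aggregates over them can be computed incrementally without keeping every order in memory. The sketch below assumes each result item's `document` has the `{ order, company, employee }` shape produced by the `select` clause above, that the company document has a `Name` field, and that the result stream is async-iterable like a standard Node.js Readable. The sample items are made up, and `Readable.from` stands in for the real stream.

```javascript
const { Readable } = require("stream");

// Count orders per company while streaming; only the running totals are kept in memory.
async function countOrdersByCompany(resultStream) {
    const counts = new Map();
    for await (const resultItem of resultStream) {
        // Each item has the projected shape { order, company, employee }.
        const { company } = resultItem.document;
        const name = company && company.Name ? company.Name : "(unknown)";
        counts.set(name, (counts.get(name) || 0) + 1);
    }
    return counts;
}

// Simulated projected results, standing in for the stream returned by
// session.advanced.stream(rawQuery). The data is made up for illustration.
const simulatedResults = Readable.from([
    { document: { order: { ShipTo: { City: "London" } }, company: { Name: "Company A" }, employee: {} } },
    { document: { order: { ShipTo: { City: "London" } }, company: { Name: "Company B" }, employee: {} } },
    { document: { order: { ShipTo: { City: "London" } }, company: { Name: "Company A" }, employee: {} } }
]);

const countsPromise = countOrdersByCompany(simulatedResults);
```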

By query syntax

await session.advanced.stream(query, [statsCallback]);
Parameter      Type                                  Description
query          IDocumentQuery or IRawDocumentQuery   The query for which to stream results
statsCallback  (streamStats) => void                 • An optional callback function with an output parameter.
                                                     • The parameter passed to the callback is filled with the
                                                       StreamQueryStatistics object when the query returns.

Return Value                    Description
Promise<DocumentResultStream>   A Promise resolving to a readable stream with the query results

StreamQueryStatistics:

Property        Type      Description
totalResults    number    Total number of results
resultEtag      number    An Etag specific to the query results
indexName       string    Name of the index that was used for the query
indexTimestamp  object    Time when the index was last updated
stale           boolean   true if the index is stale
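Since stream() resolves to a readable stream, results can also be piped through Node.js stream utilities instead of being handled with individual event listeners. A sketch, assuming the returned stream is a standard object-mode Readable: a `Transform` serializes each result item's `document` as one line of JSON (NDJSON), and `pipeline` from Node's `stream` module propagates errors and cleans up. A simulated source with made-up data and an in-memory sink keep it runnable; in practice the sink could be a file opened with `fs.createWriteStream`.

```javascript
const { Readable, Transform, Writable, pipeline } = require("stream");

// Turn each streamed result item into one line of JSON.
const toNdjson = () => new Transform({
    writableObjectMode: true,
    transform(resultItem, _encoding, callback) {
        callback(null, JSON.stringify(resultItem.document) + "\n");
    }
});

// In-memory sink that collects the output text (stand-in for a file stream).
function collectText(chunks) {
    return new Writable({
        write(chunk, _encoding, callback) {
            chunks.push(chunk.toString());
            callback();
        }
    });
}

// Simulated result stream (stand-in for: await session.advanced.stream(query)).
const source = Readable.from([
    { id: "employees/1-A", document: { FirstName: "Robert", LastName: "King" } },
    { id: "employees/2-A", document: { FirstName: "Robert", LastName: "Smith" } }
]);

const chunks = [];
const exported = new Promise((resolve, reject) => {
    pipeline(source, toNdjson(), collectText(chunks), err =>
        err ? reject(err) : resolve(chunks.join("")));
});
```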

Stream by prefix

Stream results by prefix


  • Streamed data can also be filtered by an ID prefix and by additional filtering options, as shown below.
  • Note: No auto-index is created when streaming results by a prefix.

const idPrefix = "Order";

// Filter streamed results by passing an ID prefix
const queryStream = await session.advanced.stream(idPrefix);

queryStream.on("data", resultItem => {
    // Only documents with ID that starts with 'Order' 
    const resultDocument = resultItem.document;
});

queryStream.on("end", () => {
    // Stream ended, no more data
});

queryStream.on("error", err => {
    // Handle errors
});

const idPrefix = "Orders/";
const options = {
    matches: "*25-A|77?-A"
}

// Filter streamed results by ID prefix and by options
const queryStream = await session.advanced.stream(idPrefix, options);

queryStream.on("data", resultItem => {
    // Documents that will be returned are only those matching the following:
    // * Document ID starts with "Orders/"
    // * The rest of the ID (after prefix) must match the 'matches' string
    // e.g. "Orders/325-A" or "Orders/772-A", etc.
    
    const resultDocument = resultItem.document;
});

queryStream.on("end", () => {
    // Stream ended, no more data
});

queryStream.on("error", err => {
    // Handle errors
});

By prefix syntax

await session.advanced.stream(idPrefix);
await session.advanced.stream(idPrefix, options);
Parameter   Type                  Description
idPrefix    string                Stream documents with this ID prefix
options     StartingWithOptions   More filtering options, see description below

Return Value                    Description
Promise<DocumentResultStream>   A Promise resolving to a readable stream with the query results

StartingWithOptions:

Option      Type     Description
matches     string   Filter the ID part that comes after the specified prefix.
                     Use '?' for any single character, '*' for any characters.
                     Use '|' to separate rules.
start       number   Number of documents to skip
pageSize    number   Maximum number of documents to retrieve
exclude     string   Rules for the ID part after the prefix that should Not be matched.
                     Uses the same '?', '*', and '|' syntax as 'matches'.
startAfter  string   Skip fetching documents until this ID is found.
                     Only return documents after this ID (default: null).
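To preview which document IDs a 'matches' or 'exclude' pattern would select, the rules above ('?' for one character, '*' for any characters, '|' between rules, applied to the ID part after the prefix) can be approximated locally. This is a hypothetical client-side sketch, not the server's implementation: `patternToRegExp` and `idPassesFilter` are illustrative names, and details such as case sensitivity on the server may differ.

```javascript
// Convert a 'matches'/'exclude' pattern into an anchored RegExp:
// '?' -> any single character, '*' -> any characters, '|' separates rules.
function patternToRegExp(pattern) {
    const alternatives = pattern.split("|").map(rule =>
        rule
            .split("")
            .map(ch => {
                if (ch === "?") return ".";
                if (ch === "*") return ".*";
                // Escape every other character so it is matched literally.
                return ch.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
            })
            .join("")
    );
    return new RegExp(`^(?:${alternatives.join("|")})$`);
}

// Apply the prefix, then 'matches' and 'exclude' to the rest of the ID.
function idPassesFilter(id, idPrefix, { matches, exclude } = {}) {
    if (!id.startsWith(idPrefix)) return false;
    const rest = id.slice(idPrefix.length);
    if (matches && !patternToRegExp(matches).test(rest)) return false;
    if (exclude && patternToRegExp(exclude).test(rest)) return false;
    return true;
}
```

With the pattern from the example above, `idPassesFilter("Orders/325-A", "Orders/", { matches: "*25-A|77?-A" })` returns true, while "Orders/7723-A" is rejected because '?' stands for exactly one character.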