diff --git a/dev/src/reference/query-util.ts b/dev/src/reference/query-util.ts
index c615bc254..8aad95888 100644
--- a/dev/src/reference/query-util.ts
+++ b/dev/src/reference/query-util.ts
@@ -181,6 +181,7 @@ export class QueryUtil<
     const startTime = Date.now();
     const isExplain = explainOptions !== undefined;
 
+    let numDocumentsReceived = 0;
     let lastReceivedDocument: QueryDocumentSnapshot<
       AppModelType,
       DbModelType
@@ -239,6 +240,7 @@ export class QueryUtil<
           );
         }
 
+        ++numDocumentsReceived;
         callback(undefined, output);
 
         if (proto.done) {
@@ -317,6 +319,12 @@ export class QueryUtil<
            stream.destroy(err);
            streamActive.resolve(/* active= */ false);
          } else if (lastReceivedDocument && retryWithCursor) {
+           if (query instanceof VectorQuery) {
+             throw new Error(
+               'Unimplemented: Vector query does not support cursors yet.'
+             );
+           }
+
            logger(
              'Query._stream',
              tag,
@@ -330,12 +338,30 @@ export class QueryUtil<
            // the query cursor. Note that we do not use backoff here. The
            // call to `requestStream()` will backoff should the restart
            // fail before delivering any results.
+           let newQuery: Query;
+           if (!this._queryOptions.limit) {
+             newQuery = query;
+           } else {
+             const newLimit =
+               this._queryOptions.limit - numDocumentsReceived;
+             if (
+               this._queryOptions.limitType === undefined ||
+               this._queryOptions.limitType === LimitType.First
+             ) {
+               newQuery = query.limit(newLimit);
+             } else {
+               newQuery = query.limitToLast(newLimit);
+             }
+           }
+
            if (this._queryOptions.requireConsistency) {
-             request = query
+             request = newQuery
                .startAfter(lastReceivedDocument)
                .toProto(lastReceivedDocument.readTime);
            } else {
-             request = query.startAfter(lastReceivedDocument).toProto();
+             request = newQuery
+               .startAfter(lastReceivedDocument)
+               .toProto();
            }
 
            // Set lastReceivedDocument to null before each retry attempt to ensure the retry makes progress
diff --git a/dev/test/query.ts b/dev/test/query.ts
index a5eee6841..a45278208 100644
--- a/dev/test/query.ts
+++ b/dev/test/query.ts
@@ -18,6 +18,7 @@ import {describe, it, beforeEach, afterEach} from 'mocha';
 import {expect, use} from 'chai';
 import * as chaiAsPromised from 'chai-as-promised';
 import * as extend from 'extend';
+import * as assert from 'assert';
 
 import {firestore, google} from '../protos/firestore_v1_proto_api';
 import {
@@ -55,7 +56,6 @@ import {GoogleError, Status} from 'google-gax';
 import api = google.firestore.v1;
 import protobuf = google.protobuf;
 import {Filter} from '../src/filter';
-import {Deferred} from '../src/util';
 
 const PROJECT_ID = 'test-project';
 const DATABASE_ROOT = `projects/${PROJECT_ID}/databases/(default)`;
@@ -3577,70 +3577,136 @@ describe('query resumption', () => {
     setTimeoutHandler(setTimeout);
   });
 
-  // Prevent regression of
-  // https://github.com/googleapis/nodejs-firestore/issues/1790
-  it('results should not be double produced on retryable error with back pressure', async () => {
-    // Generate the IDs of the documents that will match the query.
-    const documentIds = Array.from(new Array(500), (_, index) => `doc${index}`);
+  // Return `numDocs` document responses, followed by an error response.
+  function* getDocResponsesFollowedByError(
+    documentIds: string[],
+    numDocs: number,
+    error: Error,
+    startAtEnd?: boolean
+  ): Generator {
+    assert(numDocs <= documentIds.length);
+    const sliced = startAtEnd
+      ? documentIds.slice(-1 * numDocs)
+      : documentIds.slice(0, numDocs);
+    let runQueryResponses = sliced.map(documentId => result(documentId));
+    if (startAtEnd) {
+      runQueryResponses = runQueryResponses.reverse();
+    }
+    for (const runQueryResponse of runQueryResponses) {
+      yield runQueryResponse;
+    }
+    yield error;
+  }
 
-    // Finds the index in `documentIds` of the document referred to in the
-    // "startAt" of the given request.
-    function getStartAtDocumentIndex(
-      request: api.IRunQueryRequest
-    ): number | null {
-      const startAt = request.structuredQuery?.startAt;
-      const startAtValue = startAt?.values?.[0]?.referenceValue;
-      const startAtBefore = startAt?.before;
-      if (typeof startAtValue !== 'string') {
-        return null;
+  // Returns the documents from the given `documentIds` starting at the cursor
+  // determined by `startAt` (or `endAt`) in the request. It will continue to
+  // return documents until either the request `limit` is reached or `numDocs`
+  // (if provided) is reached. If an `error` is provided, it will return the
+  // given error after the docs are returned.
+  function* getDocResponsesForRequest(
+    request: api.IRunQueryRequest,
+    documentIds: string[],
+    options?: {
+      numDocs?: number;
+      error?: Error;
+    }
+  ): Generator {
+    let begin: number | null | undefined;
+    let end: number | null | undefined;
+    let reverseOrder: boolean;
+    if (request.structuredQuery?.startAt) {
+      begin = getStartAtDocumentIndex(request, documentIds);
+      if (begin === null) {
+        throw new Error('the request should specify a valid startAt');
+      }
+      if (request.structuredQuery.limit?.value) {
+        end = begin + request.structuredQuery.limit.value;
+      } else {
+        end = undefined;
       }
-      const docId = startAtValue.split('/').pop()!;
-      const docIdIndex = documentIds.indexOf(docId);
-      if (docIdIndex < 0) {
-        return null;
+      reverseOrder = false;
+    } else if (request.structuredQuery?.endAt) {
+      end = getEndAtDocumentIndex(request, documentIds);
+      if (end === null) {
+        throw new Error('the request should specify a valid endAt');
       }
-      return startAtBefore ? docIdIndex : docIdIndex + 1;
+      if (request.structuredQuery.limit?.value) {
+        begin = end - request.structuredQuery.limit.value;
+      } else {
+        begin = undefined;
+      }
+      reverseOrder = true;
+    } else {
+      throw new Error('the request does not specify a valid startAt or endAt');
     }
 
-    const RETRYABLE_ERROR_DOMAIN = 'RETRYABLE_ERROR_DOMAIN';
+    const runQueryResponses = documentIds
+      .slice(begin, end)
+      .map(documentId => result(documentId));
+
+    let numDocsReturned = 0;
+    for (const runQueryResponse of reverseOrder
+      ? runQueryResponses.reverse()
+      : runQueryResponses) {
+      // If `numDocs` is provided, stop iterating when it is reached.
+      if (options?.numDocs && numDocsReturned === options.numDocs) {
+        break;
+      }
+      numDocsReturned++;
+      yield runQueryResponse;
+    }
 
-    // A mock replacement for Query._isPermanentRpcError which (a) resolves
-    // a promise once invoked and (b) treats a specific error "domain" as
-    // non-retryable.
-    function mockIsPermanentRpcError(err: GoogleError): boolean {
-      mockIsPermanentRpcError.invoked.resolve(true);
-      return err?.domain !== RETRYABLE_ERROR_DOMAIN;
+    // If `error` is provided, emit it after all the docs.
+    if (options?.error) {
+      yield options.error;
     }
-    mockIsPermanentRpcError.invoked = new Deferred();
-
-    // Return the first half of the documents, followed by a retryable error.
-    function* getRequest1Responses(): Generator {
-      const runQueryResponses = documentIds
-        .slice(0, documentIds.length / 2)
-        .map(documentId => result(documentId));
-      for (const runQueryResponse of runQueryResponses) {
-        yield runQueryResponse;
-      }
-      const retryableError = new GoogleError('simulated retryable error');
-      retryableError.domain = RETRYABLE_ERROR_DOMAIN;
-      yield retryableError;
+  }
+
+  // Finds the index in `documentIds` of the document referred to in the
+  // "startAt" of the given request. Returns `null` if it cannot find one.
+  function getStartAtDocumentIndex(
+    request: api.IRunQueryRequest,
+    documentIds: string[]
+  ): number | null {
+    const startAt = request.structuredQuery?.startAt;
+    const startAtValue = startAt?.values?.[0]?.referenceValue;
+    const startAtBefore = startAt?.before;
+    if (typeof startAtValue !== 'string') {
+      return null;
+    }
+    const docId = startAtValue.split('/').pop()!;
+    const docIdIndex = documentIds.indexOf(docId);
+    if (docIdIndex < 0) {
+      return null;
     }
+    return startAtBefore ? docIdIndex : docIdIndex + 1;
+  }
 
-    // Return the remaining documents.
-    function* getRequest2Responses(
-      request: api.IRunQueryRequest
-    ): Generator {
-      const startAtDocumentIndex = getStartAtDocumentIndex(request);
-      if (startAtDocumentIndex === null) {
-        throw new Error('request #2 should specify a valid startAt');
-      }
-      const runQueryResponses = documentIds
-        .slice(startAtDocumentIndex)
-        .map(documentId => result(documentId));
-      for (const runQueryResponse of runQueryResponses) {
-        yield runQueryResponse;
-      }
+  // Finds the index in `documentIds` of the document referred to in the
+  // "endAt" of the given request. Returns `null` if it cannot find one.
+  function getEndAtDocumentIndex(
+    request: api.IRunQueryRequest,
+    documentIds: string[]
+  ): number | null {
+    const endAt = request.structuredQuery?.endAt;
+    const endAtValue = endAt?.values?.[0]?.referenceValue;
+    const endAtBefore = endAt?.before;
+    if (typeof endAtValue !== 'string') {
+      return null;
+    }
+    const docId = endAtValue.split('/').pop()!;
+    const docIdIndex = documentIds.indexOf(docId);
+    if (docIdIndex < 0) {
+      return null;
     }
+    return endAtBefore ? docIdIndex : docIdIndex - 1;
+  }
+
+  // Prevent regression of
+  // https://github.com/googleapis/nodejs-firestore/issues/1790
+  it('results should not be double produced on retryable error with back pressure', async () => {
+    // Generate the IDs of the documents that will match the query.
+    const documentIds = Array.from(new Array(500), (_, index) => `doc${index}`);
 
     // Set up the mocked responses from Watch.
     let requestNum = 0;
@@ -3649,27 +3715,30 @@ describe('query resumption', () => {
         requestNum++;
         switch (requestNum) {
           case 1:
-            return stream(...getRequest1Responses());
+            // Return the first half of the documents, followed by a retryable error.
+            return stream(
+              ...getDocResponsesFollowedByError(
+                documentIds,
+                documentIds.length / 2,
+                new GoogleError('simulated retryable error')
+              )
+            );
           case 2:
-            return stream(...getRequest2Responses(request!));
+            // Return the remaining documents.
+            return stream(...getDocResponsesForRequest(request!, documentIds));
           default:
             throw new Error(`should never get here (requestNum=${requestNum})`);
         }
       },
     };
 
-    // Create an async iterator to get the result set but DO NOT iterate over
-    // it immediately. Instead, allow the responses to pile up and fill the
-    // buffers. Once isPermanentError() is invoked, indicating that the first
-    // request has failed and is about to be retried, collect the results from
-    // the async iterator into an array.
+    // Create an async iterator to get the result set.
     firestore = await createInstance(overrides);
     const query = firestore.collection('collectionId');
-    query._queryUtil._isPermanentRpcError = mockIsPermanentRpcError;
+    query._queryUtil._isPermanentRpcError = () => false;
     const iterator = query
       .stream()
       [Symbol.asyncIterator]() as AsyncIterator;
-    await mockIsPermanentRpcError.invoked.promise;
     const snapshots = await collect(iterator);
 
     // Verify that the async iterator returned the correct documents and,
@@ -3677,4 +3746,202 @@
     const actualDocumentIds = snapshots.map(snapshot => snapshot.id);
     expect(actualDocumentIds).to.eql(documentIds);
   });
+
+  it('resuming queries with a cursor should respect the original query limit', async () => {
+    // Generate the IDs of the documents that will match the query.
+    const documentIds = Array.from(new Array(500), (_, index) => `doc${index}`);
+
+    // Set up the mocked responses from Watch.
+    let requestNum = 0;
+    const overrides: ApiOverride = {
+      runQuery: request => {
+        requestNum++;
+        switch (requestNum) {
+          case 1:
+            return stream(
+              ...getDocResponsesFollowedByError(
+                documentIds,
+                documentIds.length / 2,
+                new GoogleError('simulated retryable error')
+              )
+            );
+          case 2:
+            return stream(...getDocResponsesForRequest(request!, documentIds));
+          default:
+            throw new Error(`should never get here (requestNum=${requestNum})`);
+        }
+      },
+    };
+
+    // Create an async iterator to get the result set.
+    const limit = 300;
+    firestore = await createInstance(overrides);
+    const query = firestore.collection('collectionId').limit(limit);
+    query._queryUtil._isPermanentRpcError = () => false;
+    const iterator = query
+      .stream()
+      [Symbol.asyncIterator]() as AsyncIterator;
+    const snapshots = await collect(iterator);
+
+    // Verify that we got the correct number of results, and the results match
+    // the documents we expect.
+    const actualDocumentIds = snapshots.map(snapshot => snapshot.id);
+    expect(actualDocumentIds.length).to.eql(limit);
+    expect(actualDocumentIds).to.eql(documentIds.slice(0, limit));
+  });
+
+  it('resuming queries with a cursor should respect the original query limitToLast', async () => {
+    // Generate the IDs of the documents that will match the query.
+    const documentIds = Array.from(new Array(500), (_, index) => `doc${index}`);
+
+    // Set up the mocked responses from Watch.
+    let requestNum = 0;
+    const overrides: ApiOverride = {
+      runQuery: request => {
+        requestNum++;
+        switch (requestNum) {
+          case 1:
+            return stream(
+              ...getDocResponsesFollowedByError(
+                documentIds,
+                documentIds.length / 2,
+                new GoogleError('simulated retryable error'),
+                /*startAtEnd*/ true
+              )
+            );
+          case 2:
+            return stream(...getDocResponsesForRequest(request!, documentIds));
+          default:
+            throw new Error(`should never get here (requestNum=${requestNum})`);
+        }
+      },
+    };
+
+    // `stream()` cannot be called for `limitToLast` queries. We can, however,
+    // test using the `.get()` method which does some additional processing.
+    const limit = 300;
+    firestore = await createInstance(overrides);
+    const query = firestore
+      .collection('collectionId')
+      .orderBy(FieldPath.documentId())
+      .limitToLast(limit);
+    query._queryUtil._isPermanentRpcError = () => false;
+    const snapshots = await query.get();
+
+    // Verify that we got the correct number of results, and the results match
+    // the documents we expect.
+    const actualDocumentIds = snapshots.docs.map(snapshot => snapshot.id);
+    expect(actualDocumentIds.length).to.eql(limit);
+    // slice(-limit) returns the last `limit` documents in the array.
+    expect(actualDocumentIds).to.eql(documentIds.slice(-limit));
+  });
+
+  it('resuming queries with multiple failures should respect the original limit', async () => {
+    // Generate the IDs of the documents that will match the query.
+    const documentIds = Array.from(new Array(600), (_, index) => `doc${index}`);
+
+    // Set up the mocked responses from Watch.
+    let requestNum = 0;
+    const overrides: ApiOverride = {
+      runQuery: request => {
+        requestNum++;
+        switch (requestNum) {
+          case 1:
+            // Get the first 60 documents followed by a retryable error.
+            return stream(
+              ...getDocResponsesFollowedByError(
+                documentIds,
+                documentIds.length / 10,
+                new GoogleError('simulated retryable error')
+              )
+            );
+          case 2:
+            // Get another 120 documents followed by a retryable error.
+            return stream(
+              ...getDocResponsesForRequest(request!, documentIds, {
+                numDocs: documentIds.length / 5,
+                error: new GoogleError('simulated retryable error'),
+              })
+            );
+          case 3:
+            // Get the rest of the documents.
+            return stream(...getDocResponsesForRequest(request!, documentIds));
+          default:
+            throw new Error(`should never get here (requestNum=${requestNum})`);
+        }
+      },
+    };
+
+    // Create an async iterator to get the result set.
+    const limit = 300;
+    firestore = await createInstance(overrides);
+    const query = firestore.collection('collectionId').limit(limit);
+    query._queryUtil._isPermanentRpcError = () => false;
+    const iterator = query
+      .stream()
+      [Symbol.asyncIterator]() as AsyncIterator;
+    const snapshots = await collect(iterator);
+
+    // Verify that we got the correct number of results, and the results match
+    // the documents we expect.
+    const actualDocumentIds = snapshots.map(snapshot => snapshot.id);
+    expect(actualDocumentIds.length).to.eql(limit);
+    expect(actualDocumentIds).to.eql(documentIds.slice(0, limit));
+  });
+
+  it('resuming queries with multiple failures should respect the original limitToLast', async () => {
+    // Generate the IDs of the documents that will match the query.
+    const documentIds = Array.from(new Array(600), (_, index) => `doc${index}`);
+
+    // Set up the mocked responses from Watch.
+    let requestNum = 0;
+    const overrides: ApiOverride = {
+      runQuery: request => {
+        requestNum++;
+        switch (requestNum) {
+          case 1:
+            // Get the first 60 documents followed by a retryable error.
+            return stream(
+              ...getDocResponsesFollowedByError(
+                documentIds,
+                documentIds.length / 10,
+                new GoogleError('simulated retryable error'),
+                /*startAtEnd*/ true
+              )
+            );
+          case 2:
+            // Get another 120 documents followed by a retryable error.
+            return stream(
+              ...getDocResponsesForRequest(request!, documentIds, {
+                numDocs: documentIds.length / 5,
+                error: new GoogleError('simulated retryable error'),
+              })
+            );
+          case 3:
+            // Get the rest of the documents.
+            return stream(...getDocResponsesForRequest(request!, documentIds));
+          default:
+            throw new Error(`should never get here (requestNum=${requestNum})`);
+        }
+      },
+    };
+
+    // `stream()` cannot be called for `limitToLast` queries. We can, however,
+    // test using the `.get()` method which does some additional processing.
+    const limit = 300;
+    firestore = await createInstance(overrides);
+    const query = firestore
+      .collection('collectionId')
+      .orderBy(FieldPath.documentId())
+      .limitToLast(limit);
+    query._queryUtil._isPermanentRpcError = () => false;
+    const snapshots = await query.get();
+
+    // Verify that we got the correct number of results, and the results match
+    // the documents we expect.
+    const actualDocumentIds = snapshots.docs.map(snapshot => snapshot.id);
+    expect(actualDocumentIds.length).to.eql(limit);
+    // slice(-limit) returns the last `limit` documents in the array.
+    expect(actualDocumentIds).to.eql(documentIds.slice(-limit));
+  });
 });
diff --git a/dev/test/recursive-delete.ts b/dev/test/recursive-delete.ts
index e96f6c4f6..121d29033 100644
--- a/dev/test/recursive-delete.ts
+++ b/dev/test/recursive-delete.ts
@@ -226,7 +226,7 @@ describe('recursiveDelete() method:', () => {
           'LESS_THAN',
           endAt('root')
         ),
-        limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
+        limit(RECURSIVE_DELETE_MAX_PENDING_OPS - 1)
      );
      return stream();
    }
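
Note (illustrative sketch, not part of the patch): the essence of the query-util.ts change is that a
query resumed from a cursor after a retryable error must ask only for the documents that have not yet
been delivered. A minimal TypeScript sketch of that bookkeeping, using hypothetical names
(`ResumableQuery`, `buildResumedQuery`) rather than the library's actual types:

    // Sketch under assumed types; mirrors the `newQuery` construction in the diff above.
    interface ResumableQuery {
      limit(n: number): ResumableQuery;
      limitToLast(n: number): ResumableQuery;
      startAfter(cursor: unknown): ResumableQuery;
    }

    function buildResumedQuery(
      query: ResumableQuery,
      originalLimit: number | undefined, // undefined means no limit was set
      numDocumentsReceived: number,
      isLimitToLast: boolean,
      lastReceivedDocument: unknown
    ): ResumableQuery {
      let resumed = query;
      if (originalLimit !== undefined) {
        // Request only the documents that were not yet delivered to the caller.
        const remaining = originalLimit - numDocumentsReceived;
        resumed = isLimitToLast
          ? query.limitToLast(remaining)
          : query.limit(remaining);
      }
      // Resume the stream after the last document that was already delivered.
      return resumed.startAfter(lastReceivedDocument);
    }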