Skip to content

Commit 65aa288

Browse files
authored
feat(NODE-2014)!: return executor result from withSession and withTransaction (#3783)
1 parent 787bdbf commit 65aa288

File tree

6 files changed

+122
-65
lines changed

6 files changed

+122
-65
lines changed

src/mongo_client.ts

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
256256
}
257257

258258
/** @public */
259-
export type WithSessionCallback = (session: ClientSession) => Promise<any>;
259+
export type WithSessionCallback<T = unknown> = (session: ClientSession) => Promise<T>;
260260

261261
/** @internal */
262262
export interface MongoClientPrivate {
@@ -605,29 +605,30 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
605605
}
606606

607607
/**
608-
* Runs a given operation with an implicitly created session. The lifetime of the session
609-
* will be handled without the need for user interaction.
608+
* A convenience method for creating and handling the clean up of a ClientSession.
609+
* The session will always be ended when the executor finishes.
610610
*
611-
* NOTE: presently the operation MUST return a Promise (either explicit or implicitly as an async function)
612-
*
613-
* @param options - Optional settings for the command
614-
* @param callback - An callback to execute with an implicitly created session
611+
* @param executor - An executor function that all operations using the provided session must be invoked in
612+
* @param options - optional settings for the session
615613
*/
616-
async withSession(callback: WithSessionCallback): Promise<void>;
617-
async withSession(options: ClientSessionOptions, callback: WithSessionCallback): Promise<void>;
618-
async withSession(
619-
optionsOrOperation: ClientSessionOptions | WithSessionCallback,
620-
callback?: WithSessionCallback
621-
): Promise<void> {
614+
async withSession<T = any>(executor: WithSessionCallback<T>): Promise<T>;
615+
async withSession<T = any>(
616+
options: ClientSessionOptions,
617+
executor: WithSessionCallback<T>
618+
): Promise<T>;
619+
async withSession<T = any>(
620+
optionsOrExecutor: ClientSessionOptions | WithSessionCallback<T>,
621+
executor?: WithSessionCallback<T>
622+
): Promise<T> {
622623
const options = {
623624
// Always define an owner
624625
owner: Symbol(),
625626
// If it's an object inherit the options
626-
...(typeof optionsOrOperation === 'object' ? optionsOrOperation : {})
627+
...(typeof optionsOrExecutor === 'object' ? optionsOrExecutor : {})
627628
};
628629

629630
const withSessionCallback =
630-
typeof optionsOrOperation === 'function' ? optionsOrOperation : callback;
631+
typeof optionsOrExecutor === 'function' ? optionsOrExecutor : executor;
631632

632633
if (withSessionCallback == null) {
633634
throw new MongoInvalidArgumentError('Missing required callback parameter');
@@ -636,7 +637,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
636637
const session = this.startSession(options);
637638

638639
try {
639-
await withSessionCallback(session);
640+
return await withSessionCallback(session);
640641
} finally {
641642
try {
642643
await session.endSession();

src/sessions.ts

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ export interface ClientSessionOptions {
6767
}
6868

6969
/** @public */
70-
export type WithTransactionCallback<T = void> = (session: ClientSession) => Promise<T>;
70+
export type WithTransactionCallback<T = any> = (session: ClientSession) => Promise<T>;
7171

7272
/** @public */
7373
export type ClientSessionEvents = {
@@ -432,18 +432,16 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
432432
}
433433

434434
/**
435-
* Runs a provided callback within a transaction, retrying either the commitTransaction operation
436-
* or entire transaction as needed (and when the error permits) to better ensure that
437-
* the transaction can complete successfully.
435+
* Starts a transaction and runs a provided function, ensuring the commitTransaction is always attempted when all operations run in the function have completed.
438436
*
439437
* **IMPORTANT:** This method requires the user to return a Promise, and `await` all operations.
440-
* Any callbacks that do not return a Promise will result in undefined behavior.
441438
*
442439
* @remarks
443440
* This function:
444-
* - Will return the command response from the final commitTransaction if every operation is successful (can be used as a truthy object)
445-
* - Will return `undefined` if the transaction is explicitly aborted with `await session.abortTransaction()`
446-
* - Will throw if one of the operations throws or `throw` statement is used inside the `withTransaction` callback
441+
* - If all operations successfully complete and the `commitTransaction` operation is successful, then this function will return the result of the provided function.
442+
* - If the transaction is unable to complete or an error is thrown from within the provided function, then this function will throw an error.
443+
* - If the transaction is manually aborted within the provided function it will not throw.
444+
* - May be called multiple times if the driver needs to attempt to retry the operations.
447445
*
448446
* Checkout a descriptive example here:
449447
* @see https://www.mongodb.com/developer/quickstart/node-transactions/
@@ -452,7 +450,7 @@ export class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
452450
* @param options - optional settings for the transaction
453451
* @returns A raw command response or undefined
454452
*/
455-
async withTransaction<T = void>(
453+
async withTransaction<T = any>(
456454
fn: WithTransactionCallback<T>,
457455
options?: TransactionOptions
458456
): Promise<Document | undefined> {
@@ -543,25 +541,29 @@ function attemptTransactionCommit<T>(
543541
session: ClientSession,
544542
startTime: number,
545543
fn: WithTransactionCallback<T>,
546-
options?: TransactionOptions
544+
result: any,
545+
options: TransactionOptions
547546
): Promise<T> {
548-
return session.commitTransaction().catch((err: MongoError) => {
549-
if (
550-
err instanceof MongoError &&
551-
hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
552-
!isMaxTimeMSExpiredError(err)
553-
) {
554-
if (err.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)) {
555-
return attemptTransactionCommit(session, startTime, fn, options);
556-
}
547+
return session.commitTransaction().then(
548+
() => result,
549+
(err: MongoError) => {
550+
if (
551+
err instanceof MongoError &&
552+
hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
553+
!isMaxTimeMSExpiredError(err)
554+
) {
555+
if (err.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)) {
556+
return attemptTransactionCommit(session, startTime, fn, result, options);
557+
}
557558

558-
if (err.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
559-
return attemptTransaction(session, startTime, fn, options);
559+
if (err.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
560+
return attemptTransaction(session, startTime, fn, options);
561+
}
560562
}
561-
}
562563

563-
throw err;
564-
});
564+
throw err;
565+
}
566+
);
565567
}
566568

567569
const USER_EXPLICIT_TXN_END_STATES = new Set<TxnState>([
@@ -574,11 +576,11 @@ function userExplicitlyEndedTransaction(session: ClientSession) {
574576
return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state);
575577
}
576578

577-
function attemptTransaction<TSchema>(
579+
function attemptTransaction<T>(
578580
session: ClientSession,
579581
startTime: number,
580-
fn: WithTransactionCallback<TSchema>,
581-
options?: TransactionOptions
582+
fn: WithTransactionCallback<T>,
583+
options: TransactionOptions = {}
582584
): Promise<any> {
583585
session.startTransaction(options);
584586

@@ -591,18 +593,18 @@ function attemptTransaction<TSchema>(
591593

592594
if (!isPromiseLike(promise)) {
593595
session.abortTransaction().catch(() => null);
594-
throw new MongoInvalidArgumentError(
595-
'Function provided to `withTransaction` must return a Promise'
596+
return Promise.reject(
597+
new MongoInvalidArgumentError('Function provided to `withTransaction` must return a Promise')
596598
);
597599
}
598600

599601
return promise.then(
600-
() => {
602+
result => {
601603
if (userExplicitlyEndedTransaction(session)) {
602-
return;
604+
return result;
603605
}
604606

605-
return attemptTransactionCommit(session, startTime, fn, options);
607+
return attemptTransactionCommit(session, startTime, fn, result, options);
606608
},
607609
err => {
608610
function maybeRetryOrThrow(err: MongoError): Promise<any> {

test/integration/sessions/sessions.test.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ describe('Sessions Spec', function () {
8181

8282
describe('withSession', function () {
8383
let client: MongoClient;
84+
8485
beforeEach(async function () {
8586
client = await this.configuration.newClient().connect();
8687
});
@@ -184,6 +185,13 @@ describe('Sessions Spec', function () {
184185
expect(client.s.sessionPool.sessions).to.have.length(1);
185186
expect(sessionWasEnded).to.be.true;
186187
});
188+
189+
it('resolves with the value the callback returns', async () => {
190+
const result = await client.withSession(async session => {
191+
return client.db('test').collection('foo').find({}, { session }).toArray();
192+
});
193+
expect(result).to.be.an('array');
194+
});
187195
});
188196

189197
context('unacknowledged writes', () => {

test/integration/transactions/transactions.test.ts

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
MongoNetworkError,
99
type ServerSessionPool
1010
} from '../../mongodb';
11+
import { type FailPoint } from '../../tools/utils';
1112

1213
describe('Transactions', function () {
1314
describe('withTransaction', function () {
@@ -90,33 +91,33 @@ describe('Transactions', function () {
9091
await client.close();
9192
});
9293

93-
it('should return undefined when transaction is aborted explicitly', async () => {
94+
it('returns result of executor when transaction is aborted explicitly', async () => {
9495
const session = client.startSession();
9596

9697
const withTransactionResult = await session
9798
.withTransaction(async session => {
9899
await collection.insertOne({ a: 1 }, { session });
99100
await collection.findOne({ a: 1 }, { session });
100101
await session.abortTransaction();
102+
return 'aborted!';
101103
})
102104
.finally(async () => await session.endSession());
103105

104-
expect(withTransactionResult).to.be.undefined;
106+
expect(withTransactionResult).to.equal('aborted!');
105107
});
106108

107-
it('should return raw command when transaction is successfully committed', async () => {
109+
it('returns result of executor when transaction is successfully committed', async () => {
108110
const session = client.startSession();
109111

110112
const withTransactionResult = await session
111113
.withTransaction(async session => {
112114
await collection.insertOne({ a: 1 }, { session });
113115
await collection.findOne({ a: 1 }, { session });
116+
return 'committed!';
114117
})
115118
.finally(async () => await session.endSession());
116119

117-
expect(withTransactionResult).to.exist;
118-
expect(withTransactionResult).to.be.an('object');
119-
expect(withTransactionResult).to.have.property('ok', 1);
120+
expect(withTransactionResult).to.equal('committed!');
120121
});
121122

122123
it('should throw when transaction is aborted due to an error', async () => {
@@ -136,6 +137,48 @@ describe('Transactions', function () {
136137
});
137138
}
138139
);
140+
141+
context('when retried', { requires: { mongodb: '>=4.2.0', topology: '!single' } }, () => {
142+
let client: MongoClient;
143+
let collection: Collection<{ a: number }>;
144+
145+
beforeEach(async function () {
146+
client = this.configuration.newClient();
147+
148+
await client.db('admin').command({
149+
configureFailPoint: 'failCommand',
150+
mode: { times: 2 },
151+
data: {
152+
failCommands: ['commitTransaction'],
153+
errorCode: 24,
154+
errorLabels: ['TransientTransactionError'],
155+
closeConnection: false
156+
}
157+
} as FailPoint);
158+
159+
collection = await client.db('withTransaction').createCollection('withTransactionRetry');
160+
});
161+
162+
afterEach(async () => {
163+
await client?.close();
164+
});
165+
166+
it('returns the value of the final call to the executor', async () => {
167+
const session = client.startSession();
168+
169+
let counter = 0;
170+
const withTransactionResult = await session
171+
.withTransaction(async session => {
172+
await collection.insertOne({ a: 1 }, { session });
173+
counter += 1;
174+
return counter;
175+
})
176+
.finally(async () => await session.endSession());
177+
178+
expect(counter).to.equal(3);
179+
expect(withTransactionResult).to.equal(3);
180+
});
181+
});
139182
});
140183

141184
describe('startTransaction', function () {

test/tools/unified-spec-runner/operations.ts

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -593,18 +593,11 @@ operations.set('withTransaction', async ({ entities, operation, client, testConf
593593
maxCommitTimeMS: operation.arguments!.maxCommitTimeMS
594594
};
595595

596-
let errorFromOperations = null;
597-
const result = await session.withTransaction(async () => {
598-
errorFromOperations = await (async () => {
599-
for (const callbackOperation of operation.arguments!.callback) {
600-
await executeOperationAndCheck(callbackOperation, entities, client, testConfig);
601-
}
602-
})().catch(error => error);
596+
await session.withTransaction(async () => {
597+
for (const callbackOperation of operation.arguments!.callback) {
598+
await executeOperationAndCheck(callbackOperation, entities, client, testConfig);
599+
}
603600
}, options);
604-
605-
if (result == null || errorFromOperations) {
606-
throw errorFromOperations ?? Error('transaction not committed');
607-
}
608601
});
609602

610603
operations.set('countDocuments', async ({ entities, operation }) => {

test/types/sessions.test-d.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,13 @@ expectType<ClientSession>(
1515
})
1616
);
1717
expectError(client.startSession({ defaultTransactionOptions: { readConcern: 1 } }));
18+
19+
let something: any;
20+
expectType<number>(await client.withSession(async () => 2));
21+
expectType<string>(await client.withSession<string>(async () => something));
22+
const untypedFn: any = () => 2;
23+
expectType<any>(await client.withSession(untypedFn));
24+
const unknownFn: () => Promise<unknown> = async () => 2;
25+
expectType<unknown>(await client.withSession(unknownFn));
26+
// Not a promise returning function
27+
expectError(await client.withSession(() => null));

0 commit comments

Comments
 (0)