Skip to content

Commit 3cb8187

Browse files
W-A-Jamesnbbeekenbaileympearson
authored andcommitted
feat(NODE-6275): Add CSOT support to GridFS (#4246)
Co-authored-by: Neal Beeken <[email protected]> Co-authored-by: Bailey Pearson <[email protected]>
1 parent c5a9ae5 commit 3cb8187

File tree

11 files changed

+634
-88
lines changed

11 files changed

+634
-88
lines changed

package-lock.json

+5-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@
9797
"mocha": "^10.4.0",
9898
"mocha-sinon": "^2.1.2",
9999
"mongodb-client-encryption": "^6.1.0",
100-
"mongodb-legacy": "^6.1.1",
100+
"mongodb-legacy": "^6.1.2",
101101
"nyc": "^15.1.0",
102102
"prettier": "^3.3.3",
103103
"semver": "^7.6.3",

src/collection.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -501,12 +501,18 @@ export class Collection<TSchema extends Document = Document> {
501501
*/
502502
async findOne(): Promise<WithId<TSchema> | null>;
503503
async findOne(filter: Filter<TSchema>): Promise<WithId<TSchema> | null>;
504-
async findOne(filter: Filter<TSchema>, options: FindOptions): Promise<WithId<TSchema> | null>;
504+
async findOne(
505+
filter: Filter<TSchema>,
506+
options: Omit<FindOptions, 'timeoutMode'>
507+
): Promise<WithId<TSchema> | null>;
505508

506509
// allow an override of the schema.
507510
async findOne<T = TSchema>(): Promise<T | null>;
508511
async findOne<T = TSchema>(filter: Filter<TSchema>): Promise<T | null>;
509-
async findOne<T = TSchema>(filter: Filter<TSchema>, options?: FindOptions): Promise<T | null>;
512+
async findOne<T = TSchema>(
513+
filter: Filter<TSchema>,
514+
options?: Omit<FindOptions, 'timeoutMode'>
515+
): Promise<T | null>;
510516

511517
async findOne(
512518
filter: Filter<TSchema> = {},

src/gridfs/download.ts

+40-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Readable } from 'stream';
22

33
import type { Document, ObjectId } from '../bson';
44
import type { Collection } from '../collection';
5+
import { CursorTimeoutMode } from '../cursor/abstract_cursor';
56
import type { FindCursor } from '../cursor/find_cursor';
67
import {
78
MongoGridFSChunkError,
@@ -12,6 +13,7 @@ import {
1213
import type { FindOptions } from '../operations/find';
1314
import type { ReadPreference } from '../read_preference';
1415
import type { Sort } from '../sort';
16+
import { CSOTTimeoutContext } from '../timeout';
1517
import type { Callback } from '../utils';
1618
import type { GridFSChunk } from './upload';
1719

@@ -28,7 +30,7 @@ export interface GridFSBucketReadStreamOptions {
2830
* to be returned by the stream. `end` is non-inclusive
2931
*/
3032
end?: number;
31-
/** @internal TODO(NODE-5688): make this public */
33+
/** @public */
3234
timeoutMS?: number;
3335
}
3436

@@ -98,8 +100,10 @@ export interface GridFSBucketReadStreamPrivate {
98100
skip?: number;
99101
start: number;
100102
end: number;
103+
timeoutMS?: number;
101104
};
102105
readPreference?: ReadPreference;
106+
timeoutContext?: CSOTTimeoutContext;
103107
}
104108

105109
/**
@@ -148,7 +152,11 @@ export class GridFSBucketReadStream extends Readable {
148152
end: 0,
149153
...options
150154
},
151-
readPreference
155+
readPreference,
156+
timeoutContext:
157+
options?.timeoutMS != null
158+
? new CSOTTimeoutContext({ timeoutMS: options.timeoutMS, serverSelectionTimeoutMS: 0 })
159+
: undefined
152160
};
153161
}
154162

@@ -196,7 +204,8 @@ export class GridFSBucketReadStream extends Readable {
196204
async abort(): Promise<void> {
197205
this.push(null);
198206
this.destroy();
199-
await this.s.cursor?.close();
207+
const remainingTimeMS = this.s.timeoutContext?.getRemainingTimeMSOrThrow();
208+
await this.s.cursor?.close({ timeoutMS: remainingTimeMS });
200209
}
201210
}
202211

@@ -352,7 +361,22 @@ function init(stream: GridFSBucketReadStream): void {
352361
filter['n'] = { $gte: skip };
353362
}
354363
}
355-
stream.s.cursor = stream.s.chunks.find(filter).sort({ n: 1 });
364+
365+
let remainingTimeMS: number | undefined;
366+
try {
367+
remainingTimeMS = stream.s.timeoutContext?.getRemainingTimeMSOrThrow(
368+
`Download timed out after ${stream.s.timeoutContext?.timeoutMS}ms`
369+
);
370+
} catch (error) {
371+
return stream.destroy(error);
372+
}
373+
374+
stream.s.cursor = stream.s.chunks
375+
.find(filter, {
376+
timeoutMode: stream.s.options.timeoutMS != null ? CursorTimeoutMode.LIFETIME : undefined,
377+
timeoutMS: remainingTimeMS
378+
})
379+
.sort({ n: 1 });
356380

357381
if (stream.s.readPreference) {
358382
stream.s.cursor.withReadPreference(stream.s.readPreference);
@@ -371,6 +395,18 @@ function init(stream: GridFSBucketReadStream): void {
371395
return;
372396
};
373397

398+
let remainingTimeMS: number | undefined;
399+
try {
400+
remainingTimeMS = stream.s.timeoutContext?.getRemainingTimeMSOrThrow(
401+
`Download timed out after ${stream.s.timeoutContext?.timeoutMS}ms`
402+
);
403+
} catch (error) {
404+
if (!stream.destroyed) stream.destroy(error);
405+
return;
406+
}
407+
408+
findOneOptions.timeoutMS = remainingTimeMS;
409+
374410
stream.s.files.findOne(stream.s.filter, findOneOptions).then(handleReadResult, error => {
375411
if (stream.destroyed) return;
376412
stream.destroy(error);

src/gridfs/index.ts

+59-15
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ import type { ObjectId } from '../bson';
22
import type { Collection } from '../collection';
33
import type { FindCursor } from '../cursor/find_cursor';
44
import type { Db } from '../db';
5-
import { MongoRuntimeError } from '../error';
5+
import { MongoOperationTimeoutError, MongoRuntimeError } from '../error';
66
import { type Filter, TypedEventEmitter } from '../mongo_types';
77
import type { ReadPreference } from '../read_preference';
88
import type { Sort } from '../sort';
9+
import { CSOTTimeoutContext } from '../timeout';
10+
import { resolveOptions } from '../utils';
911
import { WriteConcern, type WriteConcernOptions } from '../write_concern';
1012
import type { FindOptions } from './../operations/find';
1113
import {
@@ -48,6 +50,7 @@ export interface GridFSBucketPrivate {
4850
chunkSizeBytes: number;
4951
readPreference?: ReadPreference;
5052
writeConcern: WriteConcern | undefined;
53+
timeoutMS?: number;
5154
};
5255
_chunksCollection: Collection<GridFSChunk>;
5356
_filesCollection: Collection<GridFSFile>;
@@ -81,11 +84,11 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
8184
constructor(db: Db, options?: GridFSBucketOptions) {
8285
super();
8386
this.setMaxListeners(0);
84-
const privateOptions = {
87+
const privateOptions = resolveOptions(db, {
8588
...DEFAULT_GRIDFS_BUCKET_OPTIONS,
8689
...options,
8790
writeConcern: WriteConcern.fromOptions(options)
88-
};
91+
});
8992
this.s = {
9093
db,
9194
options: privateOptions,
@@ -109,7 +112,10 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
109112
filename: string,
110113
options?: GridFSBucketWriteStreamOptions
111114
): GridFSBucketWriteStream {
112-
return new GridFSBucketWriteStream(this, filename, options);
115+
return new GridFSBucketWriteStream(this, filename, {
116+
timeoutMS: this.s.options.timeoutMS,
117+
...options
118+
});
113119
}
114120

115121
/**
@@ -122,7 +128,11 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
122128
filename: string,
123129
options?: GridFSBucketWriteStreamOptions
124130
): GridFSBucketWriteStream {
125-
return new GridFSBucketWriteStream(this, filename, { ...options, id });
131+
return new GridFSBucketWriteStream(this, filename, {
132+
timeoutMS: this.s.options.timeoutMS,
133+
...options,
134+
id
135+
});
126136
}
127137

128138
/** Returns a readable stream (GridFSBucketReadStream) for streaming file data from GridFS. */
@@ -135,7 +145,7 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
135145
this.s._filesCollection,
136146
this.s.options.readPreference,
137147
{ _id: id },
138-
options
148+
{ timeoutMS: this.s.options.timeoutMS, ...options }
139149
);
140150
}
141151

@@ -144,11 +154,27 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
144154
*
145155
* @param id - The id of the file doc
146156
*/
147-
async delete(id: ObjectId): Promise<void> {
148-
const { deletedCount } = await this.s._filesCollection.deleteOne({ _id: id });
157+
async delete(id: ObjectId, options?: { timeoutMS: number }): Promise<void> {
158+
const { timeoutMS } = resolveOptions(this.s.db, options);
159+
let timeoutContext: CSOTTimeoutContext | undefined = undefined;
160+
161+
if (timeoutMS) {
162+
timeoutContext = new CSOTTimeoutContext({
163+
timeoutMS,
164+
serverSelectionTimeoutMS: this.s.db.client.options.serverSelectionTimeoutMS
165+
});
166+
}
149167

168+
const { deletedCount } = await this.s._filesCollection.deleteOne(
169+
{ _id: id },
170+
{ timeoutMS: timeoutContext?.remainingTimeMS }
171+
);
172+
173+
const remainingTimeMS = timeoutContext?.remainingTimeMS;
174+
if (remainingTimeMS != null && remainingTimeMS <= 0)
175+
throw new MongoOperationTimeoutError(`Timed out after ${timeoutMS}ms`);
150176
// Delete orphaned chunks before returning FileNotFound
151-
await this.s._chunksCollection.deleteMany({ files_id: id });
177+
await this.s._chunksCollection.deleteMany({ files_id: id }, { timeoutMS: remainingTimeMS });
152178

153179
if (deletedCount === 0) {
154180
// TODO(NODE-3483): Replace with more appropriate error
@@ -188,7 +214,7 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
188214
this.s._filesCollection,
189215
this.s.options.readPreference,
190216
{ filename },
191-
{ ...options, sort, skip }
217+
{ timeoutMS: this.s.options.timeoutMS, ...options, sort, skip }
192218
);
193219
}
194220

@@ -198,18 +224,36 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
198224
* @param id - the id of the file to rename
199225
* @param filename - new name for the file
200226
*/
201-
async rename(id: ObjectId, filename: string): Promise<void> {
227+
async rename(id: ObjectId, filename: string, options?: { timeoutMS: number }): Promise<void> {
202228
const filter = { _id: id };
203229
const update = { $set: { filename } };
204-
const { matchedCount } = await this.s._filesCollection.updateOne(filter, update);
230+
const { matchedCount } = await this.s._filesCollection.updateOne(filter, update, options);
205231
if (matchedCount === 0) {
206232
throw new MongoRuntimeError(`File with id ${id} not found`);
207233
}
208234
}
209235

210236
/** Removes this bucket's files collection, followed by its chunks collection. */
211-
async drop(): Promise<void> {
212-
await this.s._filesCollection.drop();
213-
await this.s._chunksCollection.drop();
237+
async drop(options?: { timeoutMS: number }): Promise<void> {
238+
const { timeoutMS } = resolveOptions(this.s.db, options);
239+
let timeoutContext: CSOTTimeoutContext | undefined = undefined;
240+
241+
if (timeoutMS) {
242+
timeoutContext = new CSOTTimeoutContext({
243+
timeoutMS,
244+
serverSelectionTimeoutMS: this.s.db.client.options.serverSelectionTimeoutMS
245+
});
246+
}
247+
248+
if (timeoutContext) {
249+
await this.s._filesCollection.drop({ timeoutMS: timeoutContext.remainingTimeMS });
250+
const remainingTimeMS = timeoutContext.getRemainingTimeMSOrThrow(
251+
`Timed out after ${timeoutMS}ms`
252+
);
253+
await this.s._chunksCollection.drop({ timeoutMS: remainingTimeMS });
254+
} else {
255+
await this.s._filesCollection.drop();
256+
await this.s._chunksCollection.drop();
257+
}
214258
}
215259
}

0 commit comments

Comments
 (0)