@@ -75,6 +75,7 @@ function ShareDbMongo(mongo, options) {
75
75
76
76
this . _middleware = new MiddlewareHandler ( ) ;
77
77
78
+ this . _transactionConnections = Object . create ( null ) ;
78
79
this . _sessions = Object . create ( null ) ;
79
80
this . _transactionOpLinks = Object . create ( null ) ;
80
81
} ;
@@ -83,12 +84,12 @@ ShareDbMongo.prototype = Object.create(DB.prototype);
83
84
84
85
ShareDbMongo . prototype . projectsSnapshots = true ;
85
86
86
- ShareDbMongo . prototype . getCollection = function ( collectionName , callback ) {
87
+ ShareDbMongo . prototype . getCollection = function ( collectionName , options , callback ) {
87
88
// Check the collection name
88
89
var err = this . validateCollectionName ( collectionName ) ;
89
90
if ( err ) return callback ( err ) ;
90
91
// Gotcha: calls back sync if connected or async if not
91
- this . getDbs ( function ( err , mongo ) {
92
+ this . getDbs ( options , function ( err , mongo ) {
92
93
if ( err ) return callback ( err ) ;
93
94
var collection = mongo . collection ( collectionName ) ;
94
95
return callback ( null , collection ) ;
@@ -100,7 +101,7 @@ ShareDbMongo.prototype._getCollectionPoll = function(collectionName, callback) {
100
101
var err = this . validateCollectionName ( collectionName ) ;
101
102
if ( err ) return callback ( err ) ;
102
103
// Gotcha: calls back sync if connected or async if not
103
- this . getDbs ( function ( err , mongo , mongoPoll ) {
104
+ this . getDbs ( { } , function ( err , mongo , mongoPoll ) {
104
105
if ( err ) return callback ( err ) ;
105
106
var collection = ( mongoPoll || mongo ) . collection ( collectionName ) ;
106
107
return callback ( null , collection ) ;
@@ -118,7 +119,15 @@ ShareDbMongo.prototype.getCollectionPoll = function(collectionName, callback) {
118
119
this . _getCollectionPoll ( collectionName , callback ) ;
119
120
} ;
120
121
121
- ShareDbMongo . prototype . getDbs = function ( callback ) {
122
+ ShareDbMongo . prototype . getDbs = function ( options , callback ) {
123
+ if ( typeof options === 'function' ) {
124
+ callback = options ;
125
+ options = null ;
126
+ }
127
+ options = options || { } ;
128
+
129
+ if ( options . transaction ) return this . _getTransactionDbs ( options . transaction , callback ) ;
130
+
122
131
if ( this . closed ) {
123
132
var err = ShareDbMongo . alreadyClosedError ( ) ;
124
133
return callback ( err ) ;
@@ -129,6 +138,23 @@ ShareDbMongo.prototype.getDbs = function(callback) {
129
138
} , callback ) ;
130
139
} ;
131
140
141
+ ShareDbMongo . prototype . _getTransactionDbs = function ( transactionId , callback ) {
142
+ return this . _getTransactionClient ( transactionId )
143
+ . then ( function ( client ) {
144
+ callback ( null , client . db ( ) ) ;
145
+ } , callback ) ;
146
+ } ;
147
+
148
+ ShareDbMongo . prototype . _getTransactionClient = function ( transactionId ) {
149
+ if ( ! this . _transactionConnections [ transactionId ] ) {
150
+ // TODO: this bypasses the creation function, which probably isn't desirable
151
+ var client = new mongodb . MongoClient ( this . _mongoClient . s . url ) ;
152
+ this . _transactionConnections [ transactionId ] = client . connect ( ) ;
153
+ }
154
+
155
+ return this . _transactionConnections [ transactionId ] ;
156
+ } ;
157
+
132
158
ShareDbMongo . prototype . _connect = function ( mongo , options ) {
133
159
// Create the mongo connection client connections if needed
134
160
//
@@ -187,7 +213,7 @@ ShareDbMongo.prototype.close = function(callback) {
187
213
} ;
188
214
}
189
215
var self = this ;
190
- this . getDbs ( function ( err ) {
216
+ this . getDbs ( { } , function ( err ) {
191
217
// Ignore "already closed"
192
218
if ( err && err . code === 5101 ) return callback ( ) ;
193
219
if ( err ) return callback ( err ) ;
@@ -206,14 +232,18 @@ ShareDbMongo.prototype.close = function(callback) {
206
232
// **** Commit methods
207
233
208
234
ShareDbMongo . prototype . commit = function ( collectionName , id , op , snapshot , options , callback ) {
235
+ console . log ( 'commit op' , op , snapshot ) ;
209
236
options = options || { } ;
210
237
var self = this ;
211
238
var request = createRequestForMiddleware ( options , collectionName , op ) ;
212
239
// TODO: Wrap callback to abort transaction on error and tidy up session in .finally()
213
240
this . _writeOp ( collectionName , id , op , snapshot , options , function ( err , result ) {
241
+ console . log ( 'op written' , err , op ) ;
214
242
if ( err ) return callback ( err ) ;
215
243
var opId = result . insertedId ;
244
+ console . log ( 'write snapshot...' , snapshot ) ;
216
245
self . _writeSnapshot ( request , id , snapshot , opId , options , function ( err , succeeded ) {
246
+ console . log ( 'snapshot written' , err , succeeded ) ;
217
247
if ( succeeded ) return callback ( err , succeeded ) ;
218
248
// Cleanup unsuccessful op if snapshot write failed. This is not
219
249
// necessary for data correctness, but it gets rid of clutter
@@ -226,27 +256,30 @@ ShareDbMongo.prototype.commit = function(collectionName, id, op, snapshot, optio
226
256
227
257
ShareDbMongo . prototype . commitTransaction = function ( transactionId , callback ) {
228
258
if ( ! this . _sessions [ transactionId ] ) return callback ( ) ;
229
- this . _sessions [ transactionId ] . commitTransaction ( )
230
- . then ( function ( ) {
231
- callback ( ) ;
232
- } , callback ) ;
259
+ this . _sessions [ transactionId ] . then ( function ( session ) {
260
+ return session . commitTransaction ( ) ;
261
+ } , callback ) ;
233
262
} ;
234
263
235
264
ShareDbMongo . prototype . abortTransaction = function ( transactionId , callback ) {
236
265
if ( ! this . _sessions [ transactionId ] ) return callback ( ) ;
237
- this . _sessions [ transactionId ] . abortTransaction ( )
238
- . then ( function ( ) {
239
- callback ( ) ;
240
- } , callback ) ;
266
+ this . _sessions [ transactionId ] . then ( function ( session ) {
267
+ return session . abortTransaction ( ) ;
268
+ } , callback ) ;
241
269
} ;
242
270
243
271
ShareDbMongo . prototype . _ensureTransactionStarted = function ( transactionId ) {
244
- if ( typeof transactionId !== 'string' ) return null ;
272
+ if ( typeof transactionId !== 'string' ) return Promise . resolve ( ) ;
273
+
245
274
if ( ! this . _sessions [ transactionId ] ) {
246
- this . _sessions [ transactionId ] = this . _mongoClient . startSession ( ) ;
247
- this . _sessions [ transactionId ] . startTransaction ( ) ;
248
275
this . _transactionOpLinks [ transactionId ] = [ ] ;
276
+ this . _sessions [ transactionId ] = this . _getTransactionClient ( transactionId ) . then ( function ( client ) {
277
+ var session = client . startSession ( ) ;
278
+ session . startTransaction ( ) ;
279
+ return session ;
280
+ } ) ;
249
281
}
282
+
250
283
return this . _sessions [ transactionId ] ;
251
284
} ;
252
285
@@ -273,25 +306,27 @@ ShareDbMongo.prototype._writeOp = function(collectionName, id, op, snapshot, opt
273
306
return callback ( err ) ;
274
307
}
275
308
var self = this ;
276
- this . getOpCollection ( collectionName , function ( err , opCollection ) {
309
+ this . getOpCollection ( collectionName , options , function ( err , opCollection ) {
277
310
if ( err ) return callback ( err ) ;
278
311
var doc = shallowClone ( op ) ;
279
312
doc . d = id ;
280
313
var transactionLinks = self . _transactionOpLinks [ options . transaction ] || [ ] ;
281
314
doc . o = transactionLinks [ op . v - 1 ] || snapshot . _opLink ;
282
- var session = self . _ensureTransactionStarted ( options . transaction ) ;
283
- opCollection . insertOne ( doc , { session : session } )
284
- . then ( function ( result ) {
285
- if ( options . transaction ) {
286
- self . _transactionOpLinks [ options . transaction ] [ op . v ] = result . insertedId ;
287
- }
288
- callback ( null , result ) ;
289
- } , callback ) ;
315
+ // TODO: Rewrite as async/await (supported since Node.js v7.6.0)
316
+ self . _ensureTransactionStarted ( options . transaction ) . then ( function ( session ) {
317
+ opCollection . insertOne ( doc , { session : session } )
318
+ . then ( function ( result ) {
319
+ if ( options . transaction ) {
320
+ self . _transactionOpLinks [ options . transaction ] [ op . v ] = result . insertedId ;
321
+ }
322
+ callback ( null , result ) ;
323
+ } , callback ) ;
324
+ } , callback ) ;
290
325
} ) ;
291
326
} ;
292
327
293
328
ShareDbMongo . prototype . _deleteOp = function ( collectionName , opId , callback ) {
294
- this . getOpCollection ( collectionName , function ( err , opCollection ) {
329
+ this . getOpCollection ( collectionName , { } , function ( err , opCollection ) {
295
330
if ( err ) return callback ( err ) ;
296
331
opCollection . deleteOne ( { _id : opId } )
297
332
. then ( function ( result ) {
@@ -302,43 +337,47 @@ ShareDbMongo.prototype._deleteOp = function(collectionName, opId, callback) {
302
337
303
338
ShareDbMongo . prototype . _writeSnapshot = function ( request , id , snapshot , opId , options , callback ) {
304
339
var self = this ;
305
- this . getCollection ( request . collectionName , function ( err , collection ) {
340
+ this . getCollection ( request . collectionName , options , function ( err , collection ) {
306
341
if ( err ) return callback ( err ) ;
307
342
request . documentToWrite = castToDoc ( id , snapshot , opId ) ;
308
- var session = self . _ensureTransactionStarted ( options . transaction ) ;
309
- if ( request . documentToWrite . _v === 1 ) {
310
- self . _middleware . trigger ( MiddlewareHandler . Actions . beforeCreate , request , function ( middlewareErr ) {
311
- if ( middlewareErr ) {
312
- return callback ( middlewareErr ) ;
313
- }
314
- collection . insertOne ( request . documentToWrite , { session : session } )
315
- . then (
316
- function ( ) {
317
- callback ( null , true ) ;
318
- } ,
319
- function ( err ) {
343
+ self . _ensureTransactionStarted ( options . transaction ) . then ( function ( session ) {
344
+ console . log ( 'session?' , ! ! session ) ;
345
+ if ( request . documentToWrite . _v === 1 ) {
346
+ self . _middleware . trigger ( MiddlewareHandler . Actions . beforeCreate , request , function ( middlewareErr ) {
347
+ if ( middlewareErr ) {
348
+ return callback ( middlewareErr ) ;
349
+ }
350
+ collection . insertOne ( request . documentToWrite , { session : session } )
351
+ . then (
352
+ function ( ) {
353
+ callback ( null , true ) ;
354
+ } ,
355
+ function ( err ) {
320
356
// Return non-success instead of duplicate key error, since this is
321
357
// expected to occur during simultaneous creates on the same id
322
- if ( err . code === 11000 && / \b _ i d _ \b / . test ( err . message ) ) {
323
- return callback ( null , false ) ;
358
+ if ( err . code === 11000 && / \b _ i d _ \b / . test ( err . message ) ) {
359
+ return callback ( null , false ) ;
360
+ }
361
+ return callback ( err ) ;
324
362
}
325
- return callback ( err ) ;
326
- }
327
- ) ;
328
- } ) ;
329
- } else {
330
- request . query = { _id : id , _v : request . documentToWrite . _v - 1 } ;
331
- self . _middleware . trigger ( MiddlewareHandler . Actions . beforeOverwrite , request , function ( middlewareErr ) {
332
- if ( middlewareErr ) {
333
- return callback ( middlewareErr ) ;
334
- }
335
- collection . replaceOne ( request . query , request . documentToWrite , { session : session } )
336
- . then ( function ( result ) {
337
- var succeeded = ! ! result . modifiedCount ;
338
- callback ( null , succeeded ) ;
339
- } , callback ) ;
340
- } ) ;
341
- }
363
+ ) ;
364
+ } ) ;
365
+ } else {
366
+ request . query = { _id : id , _v : request . documentToWrite . _v - 1 } ;
367
+ console . log ( 'query' , request . query ) ;
368
+ console . log ( 'to write' , request . documentToWrite ) ;
369
+ self . _middleware . trigger ( MiddlewareHandler . Actions . beforeOverwrite , request , function ( middlewareErr ) {
370
+ if ( middlewareErr ) {
371
+ return callback ( middlewareErr ) ;
372
+ }
373
+ collection . replaceOne ( request . query , request . documentToWrite , { session : session } )
374
+ . then ( function ( result ) {
375
+ var succeeded = ! ! result . modifiedCount ;
376
+ callback ( null , succeeded ) ;
377
+ } , callback ) ;
378
+ } ) ;
379
+ }
380
+ } , callback ) ;
342
381
} ) ;
343
382
} ;
344
383
@@ -347,7 +386,7 @@ ShareDbMongo.prototype._writeSnapshot = function(request, id, snapshot, opId, op
347
386
348
387
ShareDbMongo . prototype . getSnapshot = function ( collectionName , id , fields , options , callback ) {
349
388
var self = this ;
350
- this . getCollection ( collectionName , function ( err , collection ) {
389
+ this . getCollection ( collectionName , options , function ( err , collection ) {
351
390
if ( err ) return callback ( err ) ;
352
391
var query = { _id : id } ;
353
392
var projection = getProjection ( fields , options ) ;
@@ -367,7 +406,7 @@ ShareDbMongo.prototype.getSnapshot = function(collectionName, id, fields, option
367
406
368
407
ShareDbMongo . prototype . getSnapshotBulk = function ( collectionName , ids , fields , options , callback ) {
369
408
var self = this ;
370
- this . getCollection ( collectionName , function ( err , collection ) {
409
+ this . getCollection ( collectionName , options , function ( err , collection ) {
371
410
if ( err ) return callback ( err ) ;
372
411
var query = { _id : { $in : ids } } ;
373
412
var projection = getProjection ( fields , options ) ;
@@ -414,9 +453,9 @@ ShareDbMongo.prototype.validateCollectionName = function(collectionName) {
414
453
} ;
415
454
416
455
// Get and return the op collection from mongo, ensuring it has the op index.
417
- ShareDbMongo . prototype . getOpCollection = function ( collectionName , callback ) {
456
+ ShareDbMongo . prototype . getOpCollection = function ( collectionName , options , callback ) {
418
457
var self = this ;
419
- this . getDbs ( function ( err , mongo ) {
458
+ this . getDbs ( options , function ( err , mongo ) {
420
459
if ( err ) return callback ( err ) ;
421
460
var name = self . getOplogCollectionName ( collectionName ) ;
422
461
var collection = mongo . collection ( name ) ;
@@ -548,7 +587,7 @@ ShareDbMongo.prototype.getOpsBulk = function(collectionName, fromMap, toMap, opt
548
587
549
588
ShareDbMongo . prototype . getCommittedOpVersion = function ( collectionName , id , snapshot , op , options , callback ) {
550
589
var self = this ;
551
- this . getOpCollection ( collectionName , function ( err , opCollection ) {
590
+ this . getOpCollection ( collectionName , { } , function ( err , opCollection ) {
552
591
if ( err ) return callback ( err ) ;
553
592
var query = {
554
593
src : op . src ,
@@ -685,7 +724,7 @@ function getOpsQuery(id, from, to) {
685
724
}
686
725
687
726
ShareDbMongo . prototype . _getOps = function ( collectionName , id , from , to , options , callback ) {
688
- this . getOpCollection ( collectionName , function ( err , opCollection ) {
727
+ this . getOpCollection ( collectionName , options , function ( err , opCollection ) {
689
728
if ( err ) return callback ( err ) ;
690
729
var query = getOpsQuery ( id , from , to ) ;
691
730
// Exclude the `d` field, which is only for use internal to livedb-mongo.
@@ -701,7 +740,7 @@ ShareDbMongo.prototype._getOps = function(collectionName, id, from, to, options,
701
740
} ;
702
741
703
742
ShareDbMongo . prototype . _getOpsBulk = function ( collectionName , conditions , options , callback ) {
704
- this . getOpCollection ( collectionName , function ( err , opCollection ) {
743
+ this . getOpCollection ( collectionName , options , function ( err , opCollection ) {
705
744
if ( err ) return callback ( err ) ;
706
745
var query = { $or : conditions } ;
707
746
// Exclude the `m` field, which can be used to store metadata on ops for
@@ -745,7 +784,7 @@ ShareDbMongo.prototype._getOpLink = function(collectionName, id, to, options, ca
745
784
if ( ! this . getOpsWithoutStrictLinking ) return this . _getSnapshotOpLink ( collectionName , id , options , callback ) ;
746
785
747
786
var db = this ;
748
- this . getOpCollection ( collectionName , function ( error , collection ) {
787
+ this . getOpCollection ( collectionName , options , function ( error , collection ) {
749
788
if ( error ) return callback ( error ) ;
750
789
751
790
// If to is null, we want the most recent version, so just return the
@@ -816,7 +855,7 @@ function closeCursor(cursor, callback, error, returnValue) {
816
855
817
856
ShareDbMongo . prototype . _getSnapshotOpLink = function ( collectionName , id , options , callback ) {
818
857
var self = this ;
819
- this . getCollection ( collectionName , function ( err , collection ) {
858
+ this . getCollection ( collectionName , options , function ( err , collection ) {
820
859
if ( err ) return callback ( err ) ;
821
860
var query = { _id : id } ;
822
861
var projection = { _id : 0 , _o : 1 , _v : 1 } ;
@@ -835,7 +874,7 @@ ShareDbMongo.prototype._getSnapshotOpLink = function(collectionName, id, options
835
874
836
875
ShareDbMongo . prototype . _getSnapshotOpLinkBulk = function ( collectionName , ids , options , callback ) {
837
876
var self = this ;
838
- this . getCollection ( collectionName , function ( err , collection ) {
877
+ this . getCollection ( collectionName , options , function ( err , collection ) {
839
878
if ( err ) return callback ( err ) ;
840
879
var query = { _id : { $in : ids } } ;
841
880
var projection = { _o : 1 , _v : 1 } ;
@@ -917,7 +956,7 @@ ShareDbMongo.prototype._query = function(collection, inputQuery, projection, cal
917
956
918
957
ShareDbMongo . prototype . query = function ( collectionName , inputQuery , fields , options , callback ) {
919
958
var self = this ;
920
- this . getCollection ( collectionName , function ( err , collection ) {
959
+ this . getCollection ( collectionName , options , function ( err , collection ) {
921
960
if ( err ) return callback ( err ) ;
922
961
var projection = getProjection ( fields , options ) ;
923
962
self . _query ( collection , inputQuery , projection , function ( err , results , extra ) {
0 commit comments