Skip to content

Commit c3dd731

Browse files
author
Burcu Dogan
committed
Merge pull request #118 from stephenplusplus/storage-stream
storage: use streams.
2 parents 96778dc + 61dc024 commit c3dd731

File tree

4 files changed

+175
-219
lines changed

4 files changed

+175
-219
lines changed

lib/storage/index.js

+80-131
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020

2121
'use strict';
2222

23-
var events = require('events');
24-
var fs = require('fs');
23+
var duplexify = require('duplexify');
2524
var nodeutil = require('util');
2625
var stream = require('stream');
2726
var uuid = require('node-uuid');
@@ -72,85 +71,6 @@ BufferStream.prototype._read = function() {
7271
this.push(null);
7372
};
7473

75-
/**
76-
* A readable stream that streams the contents of a file.
77-
*
78-
* @private
79-
*
80-
* @constructor
81-
* @mixes {stream#Readable}
82-
*
83-
* @param {module:storage~Bucket} bucket - Bucket the source file belongs to.
84-
* @param {string} name - Name of the file to read from.
85-
*
86-
* @example
87-
* ```js
88-
* var myBucket = new Bucket({
89-
* bucketName: 'my-bucket'
90-
* });
91-
* var readStream = new ReadStream(myBucket, 'file/to/fetch.pdf');
92-
* ```
93-
*/
94-
function ReadStream(bucket, name) {
95-
events.EventEmitter.call(this);
96-
97-
this.bucket = bucket;
98-
this.name = name;
99-
this.remoteStream = null;
100-
101-
this.open();
102-
}
103-
104-
nodeutil.inherits(ReadStream, events.EventEmitter);
105-
106-
/**
107-
* Open a connection to retrieve a file.
108-
*/
109-
ReadStream.prototype.open = function() {
110-
var that = this;
111-
this.bucket.stat(this.name, function(err, metadata) {
112-
if (err) {
113-
that.emit('error', err);
114-
return;
115-
}
116-
that.bucket.conn.createAuthorizedReq(
117-
{ uri: metadata.mediaLink }, function(err, req) {
118-
if (err) {
119-
that.emit('error', err);
120-
return;
121-
}
122-
that.remoteStream = that.bucket.conn.requester(req);
123-
that.remoteStream.on('complete', that.emit.bind(that, 'complete'));
124-
that.emit('readable');
125-
});
126-
});
127-
};
128-
129-
/**
130-
* Pipe the output to the destination stream with the provided options.
131-
*
132-
* @param {stream} dest - Destination stream to write to.
133-
* @param {object} opts - Piping options.
134-
* @return {stream}
135-
*/
136-
ReadStream.prototype.pipe = function(dest, opts) {
137-
var that = this;
138-
if (!that.remoteStream) {
139-
return that.once('readable', function() {
140-
that.pipe(dest, opts);
141-
});
142-
}
143-
// Register an on-data listener instead of piping, so we can avoid writing if
144-
// the request ends up with a non-200 response.
145-
that.remoteStream.on('data', function(data) {
146-
if (!that.errored) {
147-
that.emit('data', data);
148-
dest.write(data);
149-
}
150-
});
151-
return dest;
152-
};
153-
15474
/**
15575
* Google Cloud Storage allows you to store data on Google infrastructure. See
15676
* the guide on {@link https://developers.google.com/storage} to create a
@@ -312,87 +232,117 @@ Bucket.prototype.remove = function(name, callback) {
312232

313233
/**
314234
* Create a readable stream to read contents of the provided remote file. It
315-
* can be piped to a write stream, or listened to for 'data' and `complete`
316-
* events to read a file's contents.
235+
* can be piped to a write stream, or listened to for 'data' events to read a
236+
* file's contents.
317237
*
318238
* @param {string} name - Name of the remote file.
319239
* @return {ReadStream}
320240
*
321241
* @example
322242
* ```js
323243
* // Create a readable stream and write the file contents to "/path/to/file"
324-
* bucket.createReadStream('filename')
325-
* .pipe(fs.createWriteStream('/path/to/file'));
244+
* var fs = require('fs');
245+
*
246+
* bucket.createReadStream('remote-file-name')
247+
* .pipe(fs.createWriteStream('local-file-path'))
248+
* .on('error', function(err) {});
326249
* ```
327250
*/
328251
Bucket.prototype.createReadStream = function(name) {
329-
return new ReadStream(this, name);
252+
var that = this;
253+
var dup = duplexify();
254+
this.stat(name, function(err, metadata) {
255+
if (err) {
256+
dup.emit('error', err);
257+
return;
258+
}
259+
that.conn.createAuthorizedReq(
260+
{ uri: metadata.mediaLink }, function(err, req) {
261+
if (err) {
262+
dup.emit('error', err);
263+
return;
264+
}
265+
dup.setReadable(that.conn.requester(req));
266+
});
267+
});
268+
return dup;
330269
};
331270

332271
/**
333-
* Write the provided data to the destination with optional metadata.
272+
* Create a Duplex to handle the upload of a file.
334273
*
335-
* @param {string} name - Name of the remote file.
336-
* @param {object} options - Configuration object.
337-
* @param {String|Buffer|ReadableStream=} options.data - Data.
338-
* @param {string=} options.filename - Path of the source file.
339-
* @param {object=} options.metadata - Optional metadata.
340-
* @param {function} callback - The callback function.
274+
* @param {string} name - Name of the remote file to create.
275+
* @param {object=} metadata - Optional metadata.
276+
* @return {stream}
341277
*
342278
* @example
343279
* ```js
344-
* // Upload file.pdf
345-
* bucket.write('filename', {
346-
* filename: '/path/to/file.pdf',
347-
* metadata: {
348-
* // optional metadata
349-
* }
350-
* }, function(err) {});
280+
* // Read from a local file and pipe to your bucket.
281+
* var fs = require('fs');
351282
*
352-
* // Upload a readable stream
353-
* bucket.write('filename', {
354-
* data: fs.createReadStream('/path/to/file.pdf')
355-
* }, function(err) {});
283+
* fs.createReadStream('local-file-path')
284+
* .pipe(bucket.createWriteStream('remote-file-name'))
285+
* .on('error', function(err) {})
286+
* .on('complete', function(fileObject) {});
287+
* ```
288+
*/
289+
Bucket.prototype.createWriteStream = function(name, metadata) {
290+
var dup = duplexify();
291+
this.getWritableStream_(name, (metadata || {}), function(err, writable) {
292+
if (err) {
293+
dup.emit('error', err);
294+
return;
295+
}
296+
writable.on('complete', function(res) {
297+
util.handleResp(null, res, res.body, function(err, data) {
298+
if (err) {
299+
dup.emit('error', err);
300+
return;
301+
}
302+
dup.emit('complete', data);
303+
});
304+
});
305+
dup.setWritable(writable);
306+
dup.pipe(writable);
307+
});
308+
return dup;
309+
};
310+
311+
/**
312+
* Write the provided data to the destination with optional metadata.
313+
*
314+
* @param {string} name - Name of the remote file toc reate.
315+
* @param {object|string|buffer} options - Configuration object or data.
316+
* @param {object=} options.metadata - Optional metadata.
317+
* @param {function=} callback - The callback function.
356318
*
357-
* // Upload "Hello World" as file contents. `data` can be any string or buffer
319+
* @example
320+
* ```js
321+
* // Upload "Hello World" as file contents. `data` can be any string or buffer.
358322
* bucket.write('filename', {
359323
* data: 'Hello World'
360324
* }, function(err) {});
325+
*
326+
* // A shorthand for the above.
327+
* bucket.write('filename', 'Hello World', function(err) {});
361328
* ```
362329
*/
363330
Bucket.prototype.write = function(name, options, callback) {
364331
callback = callback || util.noop;
332+
var data = typeof options === 'object' ? options.data : options;
365333
var metadata = options.metadata || {};
366-
var readStream = options.data;
367-
var isStringOrBuffer =
368-
typeof readStream === 'string' || readStream instanceof Buffer;
369334

370-
if (options.filename) {
371-
readStream = fs.createReadStream(options.filename);
372-
} else if (readStream && isStringOrBuffer) {
373-
readStream = new BufferStream(readStream);
374-
}
375-
376-
if (!readStream) {
335+
if (typeof data === 'undefined') {
377336
// metadata only write
378337
this.makeReq('PATCH', 'o/' + name, null, metadata, callback);
379338
return;
380339
}
381340

382-
this.getRemoteStream_(name, metadata, function(err, remoteStream) {
383-
if (err) {
384-
callback(err);
385-
return;
386-
}
387-
// TODO(jbd): High potential of multiple callback invokes.
388-
readStream.pipe(remoteStream)
389-
.on('error', callback);
390-
remoteStream
341+
if (typeof data === 'string' || data instanceof Buffer) {
342+
new BufferStream(data).pipe(this.createWriteStream(name, metadata))
391343
.on('error', callback)
392-
.on('complete', function(resp) {
393-
util.handleResp(null, resp, resp.body, callback);
394-
});
395-
});
344+
.on('complete', callback.bind(null, null));
345+
}
396346
};
397347

398348
/**
@@ -402,7 +352,7 @@ Bucket.prototype.write = function(name, options, callback) {
402352
* @param {object} metadata - File descriptive metadata.
403353
* @param {function} callback - The callback function.
404354
*/
405-
Bucket.prototype.getRemoteStream_ = function(name, metadata, callback) {
355+
Bucket.prototype.getWritableStream_ = function(name, metadata, callback) {
406356
var boundary = uuid.v4();
407357
var that = this;
408358
metadata.contentType = metadata.contentType || 'text/plain';
@@ -434,8 +384,7 @@ Bucket.prototype.getRemoteStream_ = function(name, metadata, callback) {
434384
remoteStream.write('Content-Type: ' + metadata.contentType + '\n\n');
435385
var oldEndFn = remoteStream.end;
436386
remoteStream.end = function(data, encoding, callback) {
437-
data = data || '';
438-
data += '\n--' + boundary + '--\n';
387+
data = (data || '') + '\n--' + boundary + '--\n';
439388
remoteStream.write(data, encoding, callback);
440389
oldEndFn.apply(this);
441390
};

package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
"dependencies": {
3737
"async": "^0.9.0",
3838
"bytebuffer": "^3.2.0",
39+
"duplexify": "^3.1.2",
3940
"gapitoken": "^0.1.3",
4041
"node-uuid": "^1.4.1",
4142
"protobufjs": "^3.4.0",
@@ -51,7 +52,7 @@
5152
"scripts": {
5253
"lint": "jshint lib/ regression/ test/",
5354
"test": "mocha --recursive --reporter spec",
54-
"regression-test": "mocha regression/ --reporter spec --timeout 10000",
55+
"regression-test": "mocha regression/ --reporter spec --timeout 15000",
5556
"cover": "istanbul cover -x 'regression/*' _mocha -- --timeout 10000 test/* regression/*"
5657
},
5758
"license": "Apache 2"

regression/data/five-mb-file.zip

5 MB
Binary file not shown.

0 commit comments

Comments
 (0)