Skip to content

storage: createReadStream: accept start/end offsets #327

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,14 @@ File.prototype.copy = function(destination, callback) {
* hash wasn't returned from the API. CRC32c will provide better performance
* with less reliability. You may also choose to skip validation completely,
* however this is **not recommended**.
* @param {number} options.start - A byte offset to begin the file's download
* from. NOTE: Byte ranges are inclusive; that is, `options.start = 0` and
* `options.end = 999` represent the first 1000 bytes in a file or object.
* NOTE: when specifying a byte range, data integrity is not available.
* @param {number} options.end - A byte offset to stop reading the file at.

This comment was marked as spam.

This comment was marked as spam.

* NOTE: Byte ranges are inclusive; that is, `options.start = 0` and
* `options.end = 999` represent the first 1000 bytes in a file or object.
* NOTE: when specifying a byte range, data integrity is not available.
*
* @example
* //-
Expand All @@ -238,12 +246,25 @@ File.prototype.copy = function(destination, callback) {
* image.createReadStream()
* .pipe(fs.createWriteStream('/Users/stephen/Photos/image.png'))
* .on('error', function(err) {});
*
* //-
* // To limit the downloaded data to only a byte range, pass an options object.
* //-
* var logFile = myBucket.file('access_log');
* logFile.createReadStream({
* start: 10000,
* end: 20000
* })
* .pipe(fs.createWriteStream('/Users/stephen/logfile.txt'))
* .on('error', function(err) {});
*/
File.prototype.createReadStream = function(options) {
options = options || {};

var that = this;
var throughStream = through();
var rangeRequest =
util.is(options.start, 'number') || util.is(options.end, 'number');
var throughStream = streamEvents(through());

var validations = ['crc32c', 'md5'];
var validation;
Expand All @@ -262,6 +283,10 @@ File.prototype.createReadStream = function(options) {
validation = 'all';
}

if (rangeRequest) {
validation = false;
}

var crc32c = validation === 'crc32c' || validation === 'all';
var md5 = validation === 'md5' || validation === 'all';

Expand All @@ -288,6 +313,12 @@ File.prototype.createReadStream = function(options) {
uri: uri
};

if (rangeRequest) {
reqOpts.headers = {
Range: 'bytes=' + [options.start || '', options.end || ''].join('-')
};
}

that.bucket.storage.makeAuthorizedRequest_(reqOpts, {
onAuthorized: function(err, authorizedReqOpts) {
if (err) {
Expand Down Expand Up @@ -318,6 +349,13 @@ File.prototype.createReadStream = function(options) {
})

.on('complete', function(res) {
if (rangeRequest) {
// Range requests can't receive data integrity checks.
throughStream.emit('complete', res);
throughStream.end();
return;
}

var failed = false;
var crcFail = true;
var md5Fail = true;
Expand Down Expand Up @@ -356,7 +394,7 @@ File.prototype.createReadStream = function(options) {

throughStream.emit('error', error);
} else {
throughStream.emit('complete');
throughStream.emit('complete', res);

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

}

throughStream.end();
Expand Down
24 changes: 24 additions & 0 deletions regression/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,30 @@ describe('storage', function() {
});
});

it('should read a byte range from a file', function(done) {
  bucket.upload(files.big.path, function(err, file) {
    assert.ifError(err);

    // Request roughly the middle third of the uploaded file.
    var fileSize = file.metadata.size;
    var byteRange = {
      start: Math.floor(fileSize * 1/3),
      end: Math.floor(fileSize * 2/3)
    };

    // Byte ranges are inclusive, so the number of bytes expected is
    // end - start + 1. (The previous `byteRange.start + 1` only matched
    // by coincidence when fileSize % 3 was 0 or 1, and was off by one
    // when fileSize % 3 == 2.)
    var expectedContentSize = byteRange.end - byteRange.start + 1;

    var sizeStreamed = 0;
    file.createReadStream(byteRange)
      .on('data', function(chunk) {
        sizeStreamed += chunk.length;
      })
      .on('error', done)
      .on('complete', function() {
        assert.equal(sizeStreamed, expectedContentSize);
        file.delete(done);
      });
  });
});

describe('stream write', function() {
it('should stream write, then remove file (3mb)', function(done) {
var file = bucket.file('LargeFile');
Expand Down
77 changes: 65 additions & 12 deletions test/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ nodeutil.inherits(FakeDuplexify, stream.Duplex);
var makeWritableStream_Override;
var fakeUtil = extend({}, util, {
makeWritableStream: function() {
var args = [].slice.call(arguments);
var args = util.toArray(arguments);
(makeWritableStream_Override || util.makeWritableStream).apply(null, args);
}
});
Expand All @@ -62,7 +62,7 @@ var request_Cached = request;
var request_Override;

function fakeRequest() {
var args = [].slice.apply(arguments);
var args = util.toArray(arguments);
var results = (request_Override || request_Cached).apply(null, args);
return results;
}
Expand All @@ -85,14 +85,9 @@ function FakeConfigStore() {
describe('File', function() {
var File;
var FILE_NAME = 'file-name.png';
var options = {
makeAuthorizedRequest_: function(req, callback) {
(callback.onAuthorized || callback)(null, req);
}
};
var bucket = new Bucket(options, 'bucket-name');
var file;
var directoryFile;
var bucket;

before(function() {
mockery.registerMock('configstore', FakeConfigStore);
Expand All @@ -112,14 +107,21 @@ describe('File', function() {
});

beforeEach(function() {
makeWritableStream_Override = null;
request_Override = null;
var options = {
makeAuthorizedRequest_: function(req, callback) {
(callback.onAuthorized || callback)(null, req);
}
};
bucket = new Bucket(options, 'bucket-name');

This comment was marked as spam.


file = new File(bucket, FILE_NAME);
file.makeReq_ = util.noop;

directoryFile = new File(bucket, 'directory/file.jpg');
directoryFile.makeReq_ = util.noop;

makeWritableStream_Override = null;
request_Override = null;

This comment was marked as spam.

});

describe('initialization', function() {
Expand Down Expand Up @@ -387,7 +389,9 @@ describe('File', function() {

file.createReadStream({ validation: 'crc32c' })
.on('error', done)
.on('complete', done);
.on('complete', function () {
done();
});
});

it('should emit an error if crc32c validation fails', function(done) {
Expand All @@ -405,7 +409,9 @@ describe('File', function() {

file.createReadStream({ validation: 'md5' })
.on('error', done)
.on('complete', done);
.on('complete', function () {
done();
});
});

it('should emit an error if md5 validation fails', function(done) {
Expand All @@ -430,6 +436,53 @@ describe('File', function() {
});
});
});

it('should accept a start range', function(done) {
  var start = 100;

  // Intercept the outgoing request and verify an open-ended Range
  // header ("bytes=100-") was attached for the start offset.
  request_Override = function(requestOptions) {
    setImmediate(function() {
      var expectedHeader = 'bytes=' + start + '-';
      assert.equal(requestOptions.headers.Range, expectedHeader);
      done();
    });
    return duplexify();
  };

  file.metadata = metadata;
  file.createReadStream({ start: start });
});

it('should accept an end range', function(done) {
  var end = 100;

  // Intercept the outgoing request and verify a suffix Range header
  // ("bytes=-100") was attached for the end offset.
  request_Override = function(requestOptions) {
    setImmediate(function() {
      var expectedHeader = 'bytes=-' + end;
      assert.equal(requestOptions.headers.Range, expectedHeader);
      done();
    });
    return duplexify();
  };

  file.metadata = metadata;
  file.createReadStream({ end: end });
});

it('should accept both a start and end range', function(done) {
  var start = 100;
  var end = 101;

  // Intercept the outgoing request and verify a bounded Range header
  // ("bytes=100-101") was built from both offsets.
  request_Override = function(requestOptions) {
    setImmediate(function() {
      var expectedHeader = 'bytes=' + start + '-' + end;
      assert.equal(requestOptions.headers.Range, expectedHeader);
      done();
    });
    return duplexify();
  };

  file.metadata = metadata;
  file.createReadStream({ start: start, end: end });
});
});

describe('createWriteStream', function() {
Expand Down