
Commit d83d354

Enforce order of operations for synchronous requests
1 parent f644c06 commit d83d354

2 files changed: +202 −20 lines changed

packages/grpc-native-core/src/client_interceptors.js  (+112 −20)
@@ -652,16 +652,22 @@ EndListener.prototype.onReceiveMessage = function(){};
 EndListener.prototype.onReceiveStatus = function(){};
 EndListener.prototype.recvMessageWithContext = function(){};
 
+var OP_DEPENDENCIES = {
+  [grpc.opType.SEND_MESSAGE]: [grpc.opType.SEND_INITIAL_METADATA],
+  [grpc.opType.SEND_CLOSE_FROM_CLIENT]: [grpc.opType.SEND_MESSAGE],
+  [grpc.opType.RECV_MESSAGE]: [grpc.opType.SEND_INITIAL_METADATA]
+};
+
 /**
  * Produces a callback triggered by streaming response messages.
  * @private
  * @param {EventEmitter} emitter
  * @param {grpc.internal~Call} call
- * @param {grpc~Listener} listener
+ * @param {function} get_listener Returns a grpc~Listener.
  * @param {grpc~deserialize} deserialize
  * @return {Function}
  */
-function _getStreamReadCallback(emitter, call, listener, deserialize) {
+function _getStreamReadCallback(emitter, call, get_listener, deserialize) {
   return function (err, response) {
     if (err) {
       // Something has gone wrong. Stop reading and wait for status
@@ -684,6 +690,7 @@ function _getStreamReadCallback(emitter, call, listener, deserialize) {
       emitter._readsDone();
       return;
     }
+    var listener = get_listener();
     var context = {
       call: call,
       listener: listener
@@ -692,6 +699,66 @@ function _getStreamReadCallback(emitter, call, listener, deserialize) {
   };
 }
 
+/**
+ * Tests whether a batch can be started.
+ * @private
+ * @param {number[]} batch_ops The operations in the batch we are checking.
+ * @param {number[]} completed_ops Previously completed operations.
+ * @return {boolean}
+ */
+function _areBatchRequirementsMet(batch_ops, completed_ops) {
+  var dependencies = _.flatMap(batch_ops, function(op) {
+    return OP_DEPENDENCIES[op] || [];
+  });
+  var dependencies_met = _.intersection(dependencies,
+      batch_ops.concat(completed_ops));
+  return _.isEqual(dependencies_met.sort(), dependencies.sort());
+}
+
+/**
+ * Enforces the order of operations for synchronous requests. If a batch's
+ * operations cannot be started because required operations have not started
+ * yet, the batch is deferred until its requirements are met.
+ * @private
+ * @param {grpc.Client~Call} call
+ * @param {object} batch
+ * @param {object} batch_state
+ * @param {number[]} [batch_state.completed_ops] The ops already sent.
+ * @param {object} [batch_state.deferred_batches] Batches to be sent after
+ *     their dependencies are fulfilled.
+ * @param {function} callback
+ * @return {object}
+ */
+function _startBatchIfReady(call, batch, batch_state, callback) {
+  var completed_ops = batch_state.completed_ops;
+  var deferred_batches = batch_state.deferred_batches;
+  var batch_ops = _.map(_.keys(batch), Number);
+  if (_areBatchRequirementsMet(batch_ops, completed_ops)) {
+    // Dependencies are met, start the batch and any deferred batches whose
+    // dependencies are met as a result.
+    call.startBatch(batch, callback);
+    completed_ops = _.union(completed_ops, batch_ops);
+    deferred_batches = _.flatMap(deferred_batches, function(deferred_batch) {
+      var deferred_batch_ops = _.map(_.keys(deferred_batch.batch), Number);
+      if (_areBatchRequirementsMet(deferred_batch_ops, completed_ops)) {
+        call.startBatch(deferred_batch.batch, deferred_batch.callback);
+        return [];
+      }
+      return [deferred_batch];
+    });
+  } else {
+    // Dependencies are not met, defer the batch
+    deferred_batches = deferred_batches.concat({
+      batch: batch,
+      callback: callback
+    });
+  }
+  return {
+    completed_ops: completed_ops,
+    deferred_batches: deferred_batches
+  };
+}
+
 /**
  * Produces an interceptor which will start gRPC batches for unary calls.
  * @private
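
The two helpers added above carry the whole ordering policy: a batch may start only once every operation it depends on (per OP_DEPENDENCIES) has either already started or travels in the same batch. A minimal sketch of that check in isolation, assuming lodash is available as `_` and using hypothetical numeric constants in place of the real grpc.opType values:

var _ = require('lodash');

// Hypothetical numeric codes standing in for grpc.opType values.
var SEND_INITIAL_METADATA = 0;
var SEND_MESSAGE = 1;
var SEND_CLOSE_FROM_CLIENT = 2;

var OP_DEPENDENCIES = {
  [SEND_MESSAGE]: [SEND_INITIAL_METADATA],
  [SEND_CLOSE_FROM_CLIENT]: [SEND_MESSAGE]
};

// Same check as _areBatchRequirementsMet above: every dependency of every op
// in the batch must appear either in the batch itself or among completed_ops.
function areBatchRequirementsMet(batch_ops, completed_ops) {
  var dependencies = _.flatMap(batch_ops, function(op) {
    return OP_DEPENDENCIES[op] || [];
  });
  var dependencies_met = _.intersection(dependencies,
      batch_ops.concat(completed_ops));
  return _.isEqual(dependencies_met.sort(), dependencies.sort());
}

console.log(areBatchRequirementsMet([SEND_MESSAGE], []));
// false: SEND_INITIAL_METADATA has not been sent yet
console.log(areBatchRequirementsMet([SEND_MESSAGE], [SEND_INITIAL_METADATA]));
// true: the dependency already completed
console.log(areBatchRequirementsMet([SEND_INITIAL_METADATA, SEND_MESSAGE], []));
// true: the dependency travels in the same batch

Sorting both sides before the _.isEqual comparison makes the check insensitive to the order in which _.intersection returns the matched dependencies.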
@@ -708,19 +775,25 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) {
     var call = common.getCall(channel, method_definition.path, options);
     var first_listener;
     var final_requester = {};
+    var batch_state = {
+      completed_ops: [],
+      deferred_batches: []
+    };
     final_requester.start = function (metadata, listener) {
       var batch = {
         [grpc.opType.SEND_INITIAL_METADATA]:
           metadata._getCoreRepresentation(),
       };
       first_listener = listener;
-      call.startBatch(batch, function () { });
+      batch_state = _startBatchIfReady(call, batch, batch_state,
+          function() {});
     };
     final_requester.sendMessage = function (message) {
       var batch = {
         [grpc.opType.SEND_MESSAGE]: serialize(message),
       };
-      call.startBatch(batch, function () { });
+      batch_state = _startBatchIfReady(call, batch, batch_state,
+          function() {});
     };
     final_requester.halfClose = function () {
       var batch = {
@@ -729,7 +802,7 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) {
         [grpc.opType.RECV_MESSAGE]: true,
         [grpc.opType.RECV_STATUS_ON_CLIENT]: true
       };
-      call.startBatch(batch, function (err, response) {
+      var callback = function (err, response) {
         response.status.metadata = Metadata._fromCoreRepresentation(
           response.status.metadata);
         var status = response.status;
@@ -757,7 +830,8 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) {
         first_listener.onReceiveMetadata(response.metadata);
         first_listener.onReceiveMessage(deserialized);
         first_listener.onReceiveStatus(status);
-      });
+      };
+      batch_state = _startBatchIfReady(call, batch, batch_state, callback);
     };
     final_requester.cancel = function () {
       call.cancel();
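
With the unary requester now threading batch_state through every operation, a batch issued too early is parked in deferred_batches and flushed as soon as its prerequisite starts. A rough illustration of that deferral, assuming it runs inside client_interceptors.js (where the module-private _startBatchIfReady, lodash as `_`, and the grpc binding are in scope) and using a hypothetical fake_call stub in place of a real core call:

// fake_call is a stub, not a real grpc.internal Call; it only records which
// batches actually reach startBatch and in what order.
var started = [];
var fake_call = {
  startBatch: function(batch, callback) {
    started.push(_.map(_.keys(batch), Number));
  }
};
var batch_state = {completed_ops: [], deferred_batches: []};

// A SEND_MESSAGE batch arrives before SEND_INITIAL_METADATA (for example when
// an async interceptor delays start): it is deferred rather than started.
batch_state = _startBatchIfReady(fake_call,
    {[grpc.opType.SEND_MESSAGE]: 'serialized message'}, batch_state,
    function() {});
// started is still []

// Starting the metadata batch satisfies the dependency, so the deferred
// SEND_MESSAGE batch is started immediately afterwards.
batch_state = _startBatchIfReady(fake_call,
    {[grpc.opType.SEND_INITIAL_METADATA]: {}}, batch_state, function() {});
// started now holds the SEND_INITIAL_METADATA batch followed by the deferred
// SEND_MESSAGE batch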
@@ -895,25 +969,34 @@ function _getServerStreamingInterceptor(method_definition, channel, emitter) {
       method_definition.responseDeserialize);
   var serialize = method_definition.requestSerialize;
   return function (options) {
-    var first_listener;
+    var batch_state = {
+      completed_ops: [],
+      deferred_batches: []
+    };
     var call = common.getCall(channel, method_definition.path, options);
     var final_requester = {};
+    var first_listener;
+    var get_listener = function() {
+      return first_listener;
+    };
     final_requester.start = function(metadata, listener) {
       first_listener = listener;
       metadata = metadata.clone();
       var metadata_batch = {
         [grpc.opType.SEND_INITIAL_METADATA]: metadata._getCoreRepresentation(),
-        [grpc.opType.RECV_INITIAL_METADATA]: true,
+        [grpc.opType.RECV_INITIAL_METADATA]: true
       };
-      call.startBatch(metadata_batch, function(err, response) {
+      var callback = function(err, response) {
         if (err) {
           // The call has stopped for some reason. A non-OK status will arrive
           // in the other batch.
           return;
         }
         first_listener.onReceiveMetadata(
           Metadata._fromCoreRepresentation(response.metadata));
-      });
+      };
+      batch_state = _startBatchIfReady(call, metadata_batch, batch_state,
+          callback);
       var status_batch = {
         [grpc.opType.RECV_STATUS_ON_CLIENT]: true
       };
@@ -935,26 +1018,28 @@ function _getServerStreamingInterceptor(method_definition, channel, emitter) {
       var send_batch = {
         [grpc.opType.SEND_MESSAGE]: message
       };
-      call.startBatch(send_batch, function(err, response) {
+      var callback = function(err, response) {
         if (err) {
           // The call has stopped for some reason. A non-OK status will arrive
           // in the other batch.
           return;
         }
-      });
+      };
+      batch_state = _startBatchIfReady(call, send_batch, batch_state, callback);
     };
     final_requester.halfClose = function() {
       var batch = {
         [grpc.opType.SEND_CLOSE_FROM_CLIENT]: true
       };
-      call.startBatch(batch, function() {});
+      batch_state = _startBatchIfReady(call, batch, batch_state, function() {});
     };
     final_requester.recvMessageWithContext = function(context) {
       var recv_batch = {
         [grpc.opType.RECV_MESSAGE]: true
       };
-      call.startBatch(recv_batch, _getStreamReadCallback(emitter, call,
-        first_listener, deserialize));
+      var callback = _getStreamReadCallback(emitter, call,
+          get_listener, deserialize);
+      batch_state = _startBatchIfReady(call, recv_batch, batch_state, callback);
     };
     final_requester.cancel = function() {
       call.cancel();
@@ -981,6 +1066,9 @@ function _getBidiStreamingInterceptor(method_definition, channel, emitter) {
       method_definition.responseDeserialize);
   return function (options) {
     var first_listener;
+    var get_listener = function() {
+      return first_listener;
+    };
     var call = common.getCall(channel, method_definition.path, options);
     var final_requester = {};
     final_requester.start = function (metadata, listener) {
@@ -1057,7 +1145,7 @@ function _getBidiStreamingInterceptor(method_definition, channel, emitter) {
         [grpc.opType.RECV_MESSAGE]: true
       };
       call.startBatch(recv_batch, _getStreamReadCallback(emitter, call,
-        first_listener, deserialize));
+        get_listener, deserialize));
     };
     final_requester.cancel = function() {
       call.cancel();
@@ -1144,11 +1232,13 @@ function _getServerStreamingListener(method_definition, emitter) {
     onReceiveMessage: function(message, next, context) {
       if (emitter.push(message) && message !== null) {
         var call = context.call;
-        var listener = context.listener;
+        var get_listener = function() {
+          return context.listener;
+        };
         var read_batch = {};
         read_batch[grpc.opType.RECV_MESSAGE] = true;
         call.startBatch(read_batch, _getStreamReadCallback(emitter, call,
-          listener, deserialize));
+          get_listener, deserialize));
       } else {
         emitter.reading = false;
       }
@@ -1176,11 +1266,13 @@ function _getBidiStreamingListener(method_definition, emitter) {
     onReceiveMessage: function(message, next, context) {
       if (emitter.push(message) && message !== null) {
         var call = context.call;
-        var listener = context.listener;
+        var get_listener = function() {
+          return context.listener;
+        };
         var read_batch = {};
         read_batch[grpc.opType.RECV_MESSAGE] = true;
         call.startBatch(read_batch, _getStreamReadCallback(emitter, call,
-          listener, deserialize));
+          get_listener, deserialize));
       } else {
         emitter.reading = false;
       }

packages/grpc-native-core/test/client_interceptors_test.js  (+90 −0)
@@ -1702,4 +1702,94 @@ describe('Client interceptors', function() {
       bidi_stream.end();
     });
   });
+
+  describe('order of operations enforced for async interceptors', function() {
+    it('with unary call', function(done) {
+      var expected_calls = [
+        'close_b',
+        'message_b',
+        'start_b',
+        'done'
+      ];
+      var registry = new CallRegistry(done, expected_calls, true);
+      var message = {value: 'foo'};
+      var interceptor_a = function(options, nextCall) {
+        return new InterceptingCall(nextCall(options), {
+          start: function(metadata, listener, next) {
+            setTimeout(function() { next(metadata, listener); }, 50);
+          },
+          sendMessage: function(message, next) {
+            setTimeout(function () { next(message); }, 10);
+          }
+        });
+      };
+      var interceptor_b = function(options, nextCall) {
+        return new InterceptingCall(nextCall(options), {
+          start: function(metadata, listener, next) {
+            registry.addCall('start_b');
+            next(metadata, listener);
+          },
+          sendMessage: function(message, next) {
+            registry.addCall('message_b');
+            next(message);
+          },
+          halfClose: function(next) {
+            registry.addCall('close_b');
+            next();
+          }
+        });
+      };
+      var options = {
+        interceptors: [interceptor_a, interceptor_b]
+      };
+      client.echo(message, options, function(err, response) {
+        assert.strictEqual(err, null);
+        registry.addCall('done');
+      });
+    });
+    it('with serverStreaming call', function(done) {
+      var expected_calls = [
+        'close_b',
+        'message_b',
+        'start_b',
+        'done'
+      ];
+      var registry = new CallRegistry(done, expected_calls, true);
+      var message = {value: 'foo'};
+      var interceptor_a = function(options, nextCall) {
+        return new InterceptingCall(nextCall(options), {
+          start: function(metadata, listener, next) {
+            setTimeout(function() { next(metadata, listener); }, 50);
+          },
+          sendMessage: function(message, next) {
+            setTimeout(function () { next(message); }, 10);
+          }
+        });
+      };
+      var interceptor_b = function(options, nextCall) {
+        return new InterceptingCall(nextCall(options), {
+          start: function(metadata, listener, next) {
+            registry.addCall('start_b');
+            next(metadata, listener);
+          },
+          sendMessage: function(message, next) {
+            registry.addCall('message_b');
+            next(message);
+          },
+          halfClose: function(next) {
+            registry.addCall('close_b');
+            next();
+          }
+        });
+      };
+      var options = {
+        interceptors: [interceptor_a, interceptor_b]
+      };
+      var stream = client.echoServerStream(message, options);
+      stream.on('data', function(response) {
+        assert.strictEqual(response.value, 'foo');
+        registry.addCall('done');
+      });
+    });
+  });
 });
