Skip to content

Commit e438ac2

Browse files
authored
Add op_id throughout op API (#2734)
Removes the magic number hack to switch between flatbuffers and the minimal dispatcher. Adds machinery to pass the op_id through the shared_queue.
1 parent 5350abb commit e438ac2

22 files changed

+354
-251
lines changed

cli/dispatch_minimal.rs

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,33 +8,26 @@ use crate::state::ThreadSafeState;
88
use deno::Buf;
99
use deno::CoreOp;
1010
use deno::Op;
11+
use deno::OpId;
1112
use deno::PinnedBuf;
1213
use futures::Future;
1314

14-
const DISPATCH_MINIMAL_TOKEN: i32 = 0xCAFE;
15-
const OP_READ: i32 = 1;
16-
const OP_WRITE: i32 = 2;
15+
const OP_READ: OpId = 1;
16+
const OP_WRITE: OpId = 2;
1717

1818
#[derive(Copy, Clone, Debug, PartialEq)]
1919
// This corresponds to RecordMinimal on the TS side.
2020
pub struct Record {
2121
pub promise_id: i32,
22-
pub op_id: i32,
2322
pub arg: i32,
2423
pub result: i32,
2524
}
2625

2726
impl Into<Buf> for Record {
2827
fn into(self) -> Buf {
29-
let vec = vec![
30-
DISPATCH_MINIMAL_TOKEN,
31-
self.promise_id,
32-
self.op_id,
33-
self.arg,
34-
self.result,
35-
];
28+
let vec = vec![self.promise_id, self.arg, self.result];
3629
let buf32 = vec.into_boxed_slice();
37-
let ptr = Box::into_raw(buf32) as *mut [u8; 5 * 4];
30+
let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4];
3831
unsafe { Box::from_raw(ptr) }
3932
}
4033
}
@@ -48,32 +41,25 @@ pub fn parse_min_record(bytes: &[u8]) -> Option<Record> {
4841
let p32 = p as *const i32;
4942
let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) };
5043

51-
if s.len() < 5 {
44+
if s.len() != 3 {
5245
return None;
5346
}
5447
let ptr = s.as_ptr();
55-
let ints = unsafe { std::slice::from_raw_parts(ptr, 5) };
56-
if ints[0] != DISPATCH_MINIMAL_TOKEN {
57-
return None;
58-
}
48+
let ints = unsafe { std::slice::from_raw_parts(ptr, 3) };
5949
Some(Record {
60-
promise_id: ints[1],
61-
op_id: ints[2],
62-
arg: ints[3],
63-
result: ints[4],
50+
promise_id: ints[0],
51+
arg: ints[1],
52+
result: ints[2],
6453
})
6554
}
6655

6756
#[test]
6857
fn test_parse_min_record() {
69-
let buf = vec![
70-
0xFE, 0xCA, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0,
71-
];
58+
let buf = vec![1, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0];
7259
assert_eq!(
7360
parse_min_record(&buf),
7461
Some(Record {
7562
promise_id: 1,
76-
op_id: 2,
7763
arg: 3,
7864
result: 4,
7965
})
@@ -88,11 +74,12 @@ fn test_parse_min_record() {
8874

8975
pub fn dispatch_minimal(
9076
state: &ThreadSafeState,
77+
op_id: OpId,
9178
mut record: Record,
9279
zero_copy: Option<PinnedBuf>,
9380
) -> CoreOp {
9481
let is_sync = record.promise_id == 0;
95-
let min_op = match record.op_id {
82+
let min_op = match op_id {
9683
OP_READ => ops::read(record.arg, zero_copy),
9784
OP_WRITE => ops::write(record.arg, zero_copy),
9885
_ => unimplemented!(),

cli/ops.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,7 @@ use crate::tokio_write;
2727
use crate::version;
2828
use crate::worker::Worker;
2929
use atty;
30-
use deno::Buf;
31-
use deno::CoreOp;
32-
use deno::ErrBox;
33-
use deno::Loader;
34-
use deno::ModuleSpecifier;
35-
use deno::Op;
36-
use deno::OpResult;
37-
use deno::PinnedBuf;
30+
use deno::*;
3831
use flatbuffers::FlatBufferBuilder;
3932
use futures;
4033
use futures::Async;
@@ -82,16 +75,20 @@ fn empty_buf() -> Buf {
8275
Box::new([])
8376
}
8477

78+
const FLATBUFFER_OP_ID: OpId = 44;
79+
8580
pub fn dispatch_all(
8681
state: &ThreadSafeState,
82+
op_id: OpId,
8783
control: &[u8],
8884
zero_copy: Option<PinnedBuf>,
8985
op_selector: OpSelector,
9086
) -> CoreOp {
9187
let bytes_sent_control = control.len();
9288
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);
93-
let op = if let Some(min_record) = parse_min_record(control) {
94-
dispatch_minimal(state, min_record, zero_copy)
89+
let op = if op_id != FLATBUFFER_OP_ID {
90+
let min_record = parse_min_record(control).unwrap();
91+
dispatch_minimal(state, op_id, min_record, zero_copy)
9592
} else {
9693
dispatch_all_legacy(state, control, zero_copy, op_selector)
9794
};

cli/state.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use deno::CoreOp;
2020
use deno::ErrBox;
2121
use deno::Loader;
2222
use deno::ModuleSpecifier;
23+
use deno::OpId;
2324
use deno::PinnedBuf;
2425
use futures::future::Shared;
2526
use futures::Future;
@@ -104,10 +105,11 @@ impl Deref for ThreadSafeState {
104105
impl ThreadSafeState {
105106
pub fn dispatch(
106107
&self,
108+
op_id: OpId,
107109
control: &[u8],
108110
zero_copy: Option<PinnedBuf>,
109111
) -> CoreOp {
110-
ops::dispatch_all(self, control, zero_copy, self.dispatch_selector)
112+
ops::dispatch_all(self, op_id, control, zero_copy, self.dispatch_selector)
111113
}
112114
}
113115

cli/worker.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ impl Worker {
2929
{
3030
let mut i = isolate.lock().unwrap();
3131
let state_ = state.clone();
32-
i.set_dispatch(move |control_buf, zero_copy_buf| {
33-
state_.dispatch(control_buf, zero_copy_buf)
32+
i.set_dispatch(move |op_id, control_buf, zero_copy_buf| {
33+
state_.dispatch(op_id, control_buf, zero_copy_buf)
3434
});
3535
let state_ = state.clone();
3636
i.set_js_error_create(move |v8_exception| {

core/core.d.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
// Deno and therefore do not flow through to the runtime type library.
66

77
declare interface MessageCallback {
8-
(msg: Uint8Array): void;
8+
(opId: number, msg: Uint8Array): void;
99
}
1010

1111
declare interface DenoCore {
1212
dispatch(
13+
opId: number,
1314
control: Uint8Array,
1415
zeroCopy?: ArrayBufferView | null
1516
): Uint8Array | null;

core/examples/http_bench.js

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,19 @@ function createResolvable() {
2929
return Object.assign(promise, methods);
3030
}
3131

32-
const scratch32 = new Int32Array(4);
32+
const scratch32 = new Int32Array(3);
3333
const scratchBytes = new Uint8Array(
3434
scratch32.buffer,
3535
scratch32.byteOffset,
3636
scratch32.byteLength
3737
);
38-
assert(scratchBytes.byteLength === 4 * 4);
38+
assert(scratchBytes.byteLength === 3 * 4);
3939

4040
function send(promiseId, opId, arg, zeroCopy = null) {
4141
scratch32[0] = promiseId;
42-
scratch32[1] = opId;
43-
scratch32[2] = arg;
44-
scratch32[3] = -1;
45-
return Deno.core.dispatch(scratchBytes, zeroCopy);
42+
scratch32[1] = arg;
43+
scratch32[2] = -1;
44+
return Deno.core.dispatch(opId, scratchBytes, zeroCopy);
4645
}
4746

4847
/** Returns Promise<number> */
@@ -55,13 +54,12 @@ function sendAsync(opId, arg, zeroCopy = null) {
5554
}
5655

5756
function recordFromBuf(buf) {
58-
assert(buf.byteLength === 16);
57+
assert(buf.byteLength === 3 * 4);
5958
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
6059
return {
6160
promiseId: buf32[0],
62-
opId: buf32[1],
63-
arg: buf32[2],
64-
result: buf32[3]
61+
arg: buf32[1],
62+
result: buf32[2]
6563
};
6664
}
6765

@@ -72,7 +70,7 @@ function sendSync(opId, arg) {
7270
return record.result;
7371
}
7472

75-
function handleAsyncMsgFromRust(buf) {
73+
function handleAsyncMsgFromRust(opId, buf) {
7674
const record = recordFromBuf(buf);
7775
const { promiseId, result } = record;
7876
const p = promiseMap.get(promiseId);

core/examples/http_bench.rs

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -36,25 +36,23 @@ impl log::Log for Logger {
3636
fn flush(&self) {}
3737
}
3838

39-
const OP_LISTEN: i32 = 1;
40-
const OP_ACCEPT: i32 = 2;
41-
const OP_READ: i32 = 3;
42-
const OP_WRITE: i32 = 4;
43-
const OP_CLOSE: i32 = 5;
39+
const OP_LISTEN: OpId = 1;
40+
const OP_ACCEPT: OpId = 2;
41+
const OP_READ: OpId = 3;
42+
const OP_WRITE: OpId = 4;
43+
const OP_CLOSE: OpId = 5;
4444

4545
#[derive(Clone, Debug, PartialEq)]
4646
pub struct Record {
4747
pub promise_id: i32,
48-
pub op_id: i32,
4948
pub arg: i32,
5049
pub result: i32,
5150
}
5251

5352
impl Into<Buf> for Record {
5453
fn into(self) -> Buf {
55-
let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result]
56-
.into_boxed_slice();
57-
let ptr = Box::into_raw(buf32) as *mut [u8; 16];
54+
let buf32 = vec![self.promise_id, self.arg, self.result].into_boxed_slice();
55+
let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4];
5856
unsafe { Box::from_raw(ptr) }
5957
}
6058
}
@@ -63,28 +61,26 @@ impl From<&[u8]> for Record {
6361
fn from(s: &[u8]) -> Record {
6462
#[allow(clippy::cast_ptr_alignment)]
6563
let ptr = s.as_ptr() as *const i32;
66-
let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
64+
let ints = unsafe { std::slice::from_raw_parts(ptr, 3) };
6765
Record {
6866
promise_id: ints[0],
69-
op_id: ints[1],
70-
arg: ints[2],
71-
result: ints[3],
67+
arg: ints[1],
68+
result: ints[2],
7269
}
7370
}
7471
}
7572

7673
impl From<Buf> for Record {
7774
fn from(buf: Buf) -> Record {
78-
assert_eq!(buf.len(), 4 * 4);
75+
assert_eq!(buf.len(), 3 * 4);
7976
#[allow(clippy::cast_ptr_alignment)]
80-
let ptr = Box::into_raw(buf) as *mut [i32; 4];
77+
let ptr = Box::into_raw(buf) as *mut [i32; 3];
8178
let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) };
82-
assert_eq!(ints.len(), 4);
79+
assert_eq!(ints.len(), 3);
8380
Record {
8481
promise_id: ints[0],
85-
op_id: ints[1],
86-
arg: ints[2],
87-
result: ints[3],
82+
arg: ints[1],
83+
result: ints[2],
8884
}
8985
}
9086
}
@@ -93,7 +89,6 @@ impl From<Buf> for Record {
9389
fn test_record_from() {
9490
let r = Record {
9591
promise_id: 1,
96-
op_id: 2,
9792
arg: 3,
9893
result: 4,
9994
};
@@ -102,7 +97,7 @@ fn test_record_from() {
10297
#[cfg(target_endian = "little")]
10398
assert_eq!(
10499
buf,
105-
vec![1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice()
100+
vec![1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice()
106101
);
107102
let actual = Record::from(buf);
108103
assert_eq!(actual, expected);
@@ -111,10 +106,14 @@ fn test_record_from() {
111106

112107
pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
113108

114-
fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
109+
fn dispatch(
110+
op_id: OpId,
111+
control: &[u8],
112+
zero_copy_buf: Option<PinnedBuf>,
113+
) -> CoreOp {
115114
let record = Record::from(control);
116115
let is_sync = record.promise_id == 0;
117-
let http_bench_op = match record.op_id {
116+
let http_bench_op = match op_id {
118117
OP_LISTEN => {
119118
assert!(is_sync);
120119
op_listen()
@@ -139,7 +138,7 @@ fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
139138
let rid = record.arg;
140139
op_write(rid, zero_copy_buf)
141140
}
142-
_ => panic!("bad op {}", record.op_id),
141+
_ => panic!("bad op {}", op_id),
143142
};
144143
let mut record_a = record.clone();
145144
let mut record_b = record.clone();

0 commit comments

Comments
 (0)