Skip to content

Commit ae4f597

Browse files
committed
feat: implement message selectors
1 parent 755d839 commit ae4f597

File tree

4 files changed

+44
-16
lines changed

4 files changed

+44
-16
lines changed

riot/riot.mli

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,12 @@ val wait_pids : Pid.t list -> unit
268268
exception Receive_timeout
269269
exception Syscall_timeout
270270

271-
val receive : ?after:int64 -> ?ref:unit Ref.t -> unit -> Message.t
271+
val receive :
272+
?selector:(Message.t -> [ `select of 'msg | `skip ]) ->
273+
?after:int64 ->
274+
?ref:unit Ref.t ->
275+
unit ->
276+
'msg
272277
(** [receive ()] will return the first message in the process mailbox.
273278
274279
This function will suspend a process that has an empty mailbox, and the
@@ -283,6 +288,11 @@ val receive : ?after:int64 -> ?ref:unit Ref.t -> unit -> Message.t
283288
284289
### Selective Receive
285290
291+
If a `selector` was passed, the `selector` function will be used to select
292+
if a message will be picked or if it will be skipped.
293+
294+
### Receive with Refs
295+
286296
If a `ref` was passed, then `[receive ~ref ()]` will skip all messages
287297
created before the creation of this `Ref.t` value, and will only return
288298
newer messages.

riot/runtime/core/proc_effect.ml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ type _ Effect.t +=
55
| Receive : {
66
ref : 'a Ref.t option;
77
timeout : Timeout.t;
8+
selector : Message.t -> [ `select of 'msg | `skip ];
89
}
9-
-> Message.t Effect.t
10+
-> 'msg Effect.t
1011
[@@unboxed]
1112

1213
type _ Effect.t += Yield : unit Effect.t [@@unboxed]

riot/runtime/import.ml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ let syscall ?timeout name interest source cb =
1717
Effect.perform (Proc_effect.Syscall { name; interest; source; timeout });
1818
cb ()
1919

20-
let receive ?after ?ref () =
20+
let default_selector msg = `select msg
21+
22+
let receive ?(selector = default_selector) ?after ?ref () =
2123
let timeout =
2224
match after with None -> `infinity | Some after -> `after after
2325
in
24-
Effect.perform (Proc_effect.Receive { ref; timeout })
26+
Effect.perform (Proc_effect.Receive { ref; timeout; selector })
2527

2628
let yield () = Effect.perform Proc_effect.Yield
2729
let random () = (_get_sch ()).rnd

riot/runtime/scheduler/scheduler.ml

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ module Scheduler = struct
9595
add_to_run_queue sch proc)
9696
pool.schedulers
9797

98-
let handle_receive k pool sch (proc : Process.t) (ref : 'a Ref.t option)
99-
timeout =
98+
let handle_receive k pool sch (proc : Process.t) ~(ref : 'a Ref.t option)
99+
~timeout ~selector =
100100
Trace.handle_receive_span @@ fun () ->
101101
let open Proc_state in
102102
(* When a timeout is specified, we want to create it in the timer
@@ -179,21 +179,35 @@ module Scheduler = struct
179179
but we already have removed the monitor, then we will simply ignore this message. *)
180180
| ( _,
181181
Some
182-
Message.
183-
{
184-
msg = Process.Messages.Monitor (Process_down mon_pid) as msg;
185-
_;
186-
} ) ->
182+
(Message.
183+
{
184+
msg =
185+
Process.Messages.Monitor (Process_down mon_pid) as msg;
186+
_;
187+
} as envelope) ) ->
187188
Process.clear_receive_timeout proc;
188-
if Process.is_monitored_by_pid proc mon_pid then k (Continue msg)
189+
if Process.is_monitored_by_pid proc mon_pid then (
190+
match selector msg with
191+
| `select msg ->
192+
Process.clear_receive_timeout proc;
193+
k (Continue msg)
194+
| `skip ->
195+
Process.add_to_save_queue proc envelope;
196+
go (fuel - 1))
189197
else go (fuel - 1)
190198
(* lastly, if we have a ref and the mesasge is newer than the ref, and
191199
when we don't have a ref, we just pop the message and continue with it
192200
*)
193-
| _, Some Message.{ msg; _ } ->
194-
Process.clear_receive_timeout proc;
195-
k (Continue msg)
201+
| _, Some msg -> (
202+
match selector Message.(msg.msg) with
203+
| `select msg ->
204+
Process.clear_receive_timeout proc;
205+
k (Continue msg)
206+
| `skip ->
207+
Process.add_to_save_queue proc msg;
208+
go (fuel - 1))
196209
in
210+
197211
go fuel
198212

199213
let handle_syscall k pool (sch : t) (proc : Process.t) name interest source
@@ -263,9 +277,10 @@ module Scheduler = struct
263277
let perform : type a b. (a, b) step_callback =
264278
fun k eff ->
265279
match eff with
280+
| Receive { ref; timeout; selector } ->
281+
handle_receive k pool sch proc ~ref ~timeout ~selector
266282
| Syscall { name; interest; source; timeout } ->
267283
handle_syscall k pool sch proc name interest source timeout
268-
| Receive { ref; timeout } -> handle_receive k pool sch proc ref timeout
269284
| Yield ->
270285
Log.trace (fun f ->
271286
f "Process %a: yielding" Pid.pp (Process.pid proc));

0 commit comments

Comments
 (0)