@@ -119,75 +119,78 @@ impl WidgetDriver {
119
119
room : Room ,
120
120
permissions_provider : impl PermissionsProvider ,
121
121
) -> Result < ( ) , ( ) > {
122
+ let ( mut client_api, mut actions) = ClientApi :: new ( ) ;
123
+
122
124
// Create a channel so that we can conveniently send all events to it.
123
125
let ( events_tx, mut events_rx) = unbounded_channel ( ) ;
124
126
125
- // `from.map(|m| Ok(Event::MessageFromWidget(msg)).forward(events_tx)`,
126
- // but apparently `UnboundedSender<T>` does not implement `Sink<T>`.
127
+ // Forward all of the incoming messages from the widget to the `events_tx`.
127
128
let tx = events_tx. clone ( ) ;
128
129
tokio:: spawn ( async move {
129
130
while let Ok ( msg) = self . from_widget_rx . recv ( ) . await {
130
131
let _ = tx. send ( Event :: MessageFromWidget ( msg) ) ;
131
132
}
132
133
} ) ;
133
134
134
- // Our "state" (everything we need to process events).
135
- let ( mut client_api, matrix) = ( ClientApi :: new ( ) , MatrixDriver :: new ( room) ) ;
136
- let mut event_forwarding_guard: Option < DropGuard > = None ;
135
+ // Forward all of the incoming events to the `ClientApi` implementation.
136
+ tokio:: spawn ( async move {
137
+ while let Some ( event) = events_rx. recv ( ) . await {
138
+ client_api. process ( event) ;
139
+ }
140
+ } ) ;
137
141
138
- // Process events by passing them to the `ClientApi` implementation.
139
- while let Some ( event) = events_rx. recv ( ) . await {
140
- for action in client_api. process ( event) {
141
- match action {
142
- Action :: SendToWidget ( msg) => {
143
- self . to_widget_tx . send ( msg) . await . map_err ( |_| ( ) ) ?
144
- }
145
- Action :: AcquirePermissions ( cmd) => {
146
- let obtained = permissions_provider. acquire_permissions ( cmd. clone ( ) ) . await ;
147
- let event = Event :: PermissionsAcquired ( cmd. ok ( obtained) ) ;
148
- events_tx. send ( event) . map_err ( |_| ( ) ) ?;
149
- }
150
- Action :: GetOpenId ( cmd) => {
151
- let result = cmd. result ( matrix. get_open_id ( ) . await ) ;
152
- events_tx. send ( Event :: OpenIdReceived ( result) ) . map_err ( |_| ( ) ) ?;
153
- }
154
- Action :: ReadMatrixEvent ( cmd) => {
155
- let matrix_events = matrix. read ( cmd. event_type . clone ( ) , cmd. limit ) . await ;
156
- let event = Event :: MatrixEventRead ( cmd. result ( matrix_events) ) ;
157
- events_tx. send ( event) . map_err ( |_| ( ) ) ?;
158
- }
159
- Action :: SendMatrixEvent ( cmd) => {
160
- let SendEventCommand { event_type, state_key, content } = cmd. clone ( ) ;
161
- let matrix_event_id = matrix. send ( event_type, state_key, content) . await ;
162
- let event = Event :: MatrixEventSent ( cmd. result ( matrix_event_id) ) ;
163
- events_tx. send ( event) . map_err ( |_| ( ) ) ?;
164
- }
165
- Action :: Subscribe => {
166
- // Only subscribe if we are not already subscribed.
167
- if event_forwarding_guard. is_none ( ) {
168
- let ( stop_forwarding, guard) = {
169
- let token = CancellationToken :: new ( ) ;
170
- ( token. child_token ( ) , token. drop_guard ( ) )
171
- } ;
172
-
173
- event_forwarding_guard = Some ( guard) ;
174
- let ( mut matrix, events_tx) = ( matrix. events ( ) , events_tx. clone ( ) ) ;
175
- tokio:: spawn ( async move {
176
- loop {
177
- tokio:: select! {
178
- _ = stop_forwarding. cancelled( ) => { return }
179
- Some ( event) = matrix. recv( ) => {
180
- let _ = events_tx. send( Event :: MatrixEventReceived ( event) ) ;
181
- }
142
+ // Process events that we receive **from** the client api implementation,
143
+ // i.e. the commands (actions) that the client sends to us.
144
+ let matrix = MatrixDriver :: new ( room) ;
145
+ let mut event_forwarding_guard: Option < DropGuard > = None ;
146
+ while let Some ( action) = actions. recv ( ) . await {
147
+ match action {
148
+ Action :: SendToWidget ( msg) => self . to_widget_tx . send ( msg) . await . map_err ( |_| ( ) ) ?,
149
+ Action :: AcquirePermissions ( cmd) => {
150
+ let obtained = permissions_provider. acquire_permissions ( cmd. clone ( ) ) . await ;
151
+ let event = Event :: PermissionsAcquired ( cmd. ok ( obtained) ) ;
152
+ events_tx. send ( event) . map_err ( |_| ( ) ) ?;
153
+ }
154
+ Action :: GetOpenId ( cmd) => {
155
+ let result = cmd. result ( matrix. get_open_id ( ) . await ) ;
156
+ events_tx. send ( Event :: OpenIdReceived ( result) ) . map_err ( |_| ( ) ) ?;
157
+ }
158
+ Action :: ReadMatrixEvent ( cmd) => {
159
+ let matrix_events = matrix. read ( cmd. event_type . clone ( ) , cmd. limit ) . await ;
160
+ let event = Event :: MatrixEventRead ( cmd. result ( matrix_events) ) ;
161
+ events_tx. send ( event) . map_err ( |_| ( ) ) ?;
162
+ }
163
+ Action :: SendMatrixEvent ( cmd) => {
164
+ let SendEventCommand { event_type, state_key, content } = cmd. clone ( ) ;
165
+ let matrix_event_id = matrix. send ( event_type, state_key, content) . await ;
166
+ let event = Event :: MatrixEventSent ( cmd. result ( matrix_event_id) ) ;
167
+ events_tx. send ( event) . map_err ( |_| ( ) ) ?;
168
+ }
169
+ Action :: Subscribe => {
170
+ // Only subscribe if we are not already subscribed.
171
+ if event_forwarding_guard. is_none ( ) {
172
+ let ( stop_forwarding, guard) = {
173
+ let token = CancellationToken :: new ( ) ;
174
+ ( token. child_token ( ) , token. drop_guard ( ) )
175
+ } ;
176
+
177
+ event_forwarding_guard = Some ( guard) ;
178
+ let ( mut matrix, events_tx) = ( matrix. events ( ) , events_tx. clone ( ) ) ;
179
+ tokio:: spawn ( async move {
180
+ loop {
181
+ tokio:: select! {
182
+ _ = stop_forwarding. cancelled( ) => { return }
183
+ Some ( event) = matrix. recv( ) => {
184
+ let _ = events_tx. send( Event :: MatrixEventReceived ( event) ) ;
182
185
}
183
186
}
184
- } ) ;
185
- }
186
- }
187
- Action :: Unsubscribe => {
188
- event_forwarding_guard = None ;
187
+ }
188
+ } ) ;
189
189
}
190
190
}
191
+ Action :: Unsubscribe => {
192
+ event_forwarding_guard = None ;
193
+ }
191
194
}
192
195
}
193
196
0 commit comments