10
10
11
11
#define SC_BUFFERING_NDEBUG // comment to debug
12
12
13
+ /** Downcast frame_sink to sc_delay_buffer */
14
+ #define DOWNCAST (SINK ) container_of(SINK, struct sc_delay_buffer, frame_sink)
15
+
13
16
static bool
14
17
sc_delayed_frame_init (struct sc_delayed_frame * dframe , const AVFrame * frame ) {
15
18
dframe -> frame = av_frame_alloc ();
@@ -33,49 +36,44 @@ sc_delayed_frame_destroy(struct sc_delayed_frame *dframe) {
33
36
av_frame_free (& dframe -> frame );
34
37
}
35
38
36
- static bool
37
- sc_delay_buffer_offer (struct sc_delay_buffer * db , const AVFrame * frame ) {
38
- return db -> cbs -> on_new_frame (db , frame , db -> cbs_userdata );
39
- }
40
-
41
39
static int
42
40
run_buffering (void * data ) {
43
41
struct sc_delay_buffer * db = data ;
44
42
45
43
assert (db -> delay > 0 );
46
44
47
45
for (;;) {
48
- sc_mutex_lock (& db -> b . mutex );
46
+ sc_mutex_lock (& db -> mutex );
49
47
50
- while (!db -> b . stopped && sc_vecdeque_is_empty (& db -> b . queue )) {
51
- sc_cond_wait (& db -> b . queue_cond , & db -> b . mutex );
48
+ while (!db -> stopped && sc_vecdeque_is_empty (& db -> queue )) {
49
+ sc_cond_wait (& db -> queue_cond , & db -> mutex );
52
50
}
53
51
54
- if (db -> b . stopped ) {
55
- sc_mutex_unlock (& db -> b . mutex );
52
+ if (db -> stopped ) {
53
+ sc_mutex_unlock (& db -> mutex );
56
54
goto stopped ;
57
55
}
58
56
59
- struct sc_delayed_frame dframe = sc_vecdeque_pop (& db -> b . queue );
57
+ struct sc_delayed_frame dframe = sc_vecdeque_pop (& db -> queue );
60
58
61
59
sc_tick max_deadline = sc_tick_now () + db -> delay ;
62
60
// PTS (written by the server) are expressed in microseconds
63
61
sc_tick pts = SC_TICK_TO_US (dframe .frame -> pts );
64
62
65
63
bool timed_out = false;
66
- while (!db -> b . stopped && !timed_out ) {
67
- sc_tick deadline = sc_clock_to_system_time (& db -> b . clock , pts )
64
+ while (!db -> stopped && !timed_out ) {
65
+ sc_tick deadline = sc_clock_to_system_time (& db -> clock , pts )
68
66
+ db -> delay ;
69
67
if (deadline > max_deadline ) {
70
68
deadline = max_deadline ;
71
69
}
72
70
73
71
timed_out =
74
- !sc_cond_timedwait (& db -> b . wait_cond , & db -> b . mutex , deadline );
72
+ !sc_cond_timedwait (& db -> wait_cond , & db -> mutex , deadline );
75
73
}
76
74
77
- bool stopped = db -> b . stopped ;
78
- sc_mutex_unlock (& db -> b . mutex );
75
+ bool stopped = db -> stopped ;
76
+ sc_mutex_unlock (& db -> mutex );
79
77
80
78
if (stopped ) {
81
79
sc_delayed_frame_destroy (& dframe );
@@ -87,24 +85,24 @@ run_buffering(void *data) {
87
85
pts , dframe .push_date , sc_tick_now ());
88
86
#endif
89
87
90
- bool ok = sc_delay_buffer_offer ( db , dframe .frame );
88
+ bool ok = sc_frame_source_sinks_push ( & db -> frame_source , dframe .frame );
91
89
sc_delayed_frame_destroy (& dframe );
92
90
if (!ok ) {
93
91
LOGE ("Delayed frame could not be pushed, stopping" );
94
- sc_mutex_lock (& db -> b . mutex );
95
- // Prevent to push any new packet
96
- db -> b . stopped = true;
97
- sc_mutex_unlock (& db -> b . mutex );
92
+ sc_mutex_lock (& db -> mutex );
93
+ // Prevent to push any new frame
94
+ db -> stopped = true;
95
+ sc_mutex_unlock (& db -> mutex );
98
96
goto stopped ;
99
97
}
100
98
}
101
99
102
100
stopped :
103
- assert (db -> b . stopped );
101
+ assert (db -> stopped );
104
102
105
103
// Flush queue
106
- while (!sc_vecdeque_is_empty (& db -> b . queue )) {
107
- struct sc_delayed_frame * dframe = sc_vecdeque_popref (& db -> b . queue );
104
+ while (!sc_vecdeque_is_empty (& db -> queue )) {
105
+ struct sc_delayed_frame * dframe = sc_vecdeque_popref (& db -> queue );
108
106
sc_delayed_frame_destroy (dframe );
109
107
}
110
108
@@ -113,134 +111,136 @@ run_buffering(void *data) {
113
111
return 0 ;
114
112
}
115
113
116
- bool
117
- sc_delay_buffer_init (struct sc_delay_buffer * db , sc_tick delay ,
118
- const struct sc_delay_buffer_callbacks * cbs ,
119
- void * cbs_userdata ) {
120
- assert (delay >= 0 );
121
-
122
- if (delay ) {
123
- bool ok = sc_mutex_init (& db -> b .mutex );
124
- if (!ok ) {
125
- return false;
126
- }
127
-
128
- ok = sc_cond_init (& db -> b .queue_cond );
129
- if (!ok ) {
130
- sc_mutex_destroy (& db -> b .mutex );
131
- return false;
132
- }
114
+ static bool
115
+ sc_delay_buffer_frame_sink_open (struct sc_frame_sink * sink ,
116
+ const AVCodecContext * ctx ) {
117
+ struct sc_delay_buffer * db = DOWNCAST (sink );
118
+ (void ) ctx ;
133
119
134
- ok = sc_cond_init (& db -> b .wait_cond );
135
- if (!ok ) {
136
- sc_cond_destroy (& db -> b .queue_cond );
137
- sc_mutex_destroy (& db -> b .mutex );
138
- return false;
139
- }
120
+ bool ok = sc_mutex_init (& db -> mutex );
121
+ if (!ok ) {
122
+ return false;
123
+ }
140
124
141
- sc_clock_init (& db -> b .clock );
142
- sc_vecdeque_init (& db -> b .queue );
125
+ ok = sc_cond_init (& db -> queue_cond );
126
+ if (!ok ) {
127
+ goto error_destroy_mutex ;
143
128
}
144
129
145
- assert (cbs );
146
- assert (cbs -> on_new_frame );
130
+ ok = sc_cond_init (& db -> wait_cond );
131
+ if (!ok ) {
132
+ goto error_destroy_queue_cond ;
133
+ }
147
134
148
- db -> delay = delay ;
149
- db -> cbs = cbs ;
150
- db -> cbs_userdata = cbs_userdata ;
135
+ sc_clock_init (& db -> clock );
136
+ sc_vecdeque_init (& db -> queue );
151
137
152
- return true;
153
- }
138
+ if (!sc_frame_source_sinks_open (& db -> frame_source , ctx )) {
139
+ goto error_destroy_wait_cond ;
140
+ }
154
141
155
- bool
156
- sc_delay_buffer_start (struct sc_delay_buffer * db ) {
157
- if (db -> delay ) {
158
- bool ok =
159
- sc_thread_create (& db -> b .thread , run_buffering , "scrcpy-dbuf" , db );
160
- if (!ok ) {
161
- LOGE ("Could not start buffering thread" );
162
- return false;
163
- }
142
+ ok = sc_thread_create (& db -> thread , run_buffering , "scrcpy-dbuf" , db );
143
+ if (!ok ) {
144
+ LOGE ("Could not start buffering thread" );
145
+ goto error_close_sinks ;
164
146
}
165
147
166
148
return true;
167
- }
168
149
169
- void
170
- sc_delay_buffer_stop (struct sc_delay_buffer * db ) {
171
- if (db -> delay ) {
172
- sc_mutex_lock (& db -> b .mutex );
173
- db -> b .stopped = true;
174
- sc_cond_signal (& db -> b .queue_cond );
175
- sc_cond_signal (& db -> b .wait_cond );
176
- sc_mutex_unlock (& db -> b .mutex );
177
- }
178
- }
150
+ error_close_sinks :
151
+ sc_frame_source_sinks_close (& db -> frame_source );
152
+ error_destroy_wait_cond :
153
+ sc_cond_destroy (& db -> wait_cond );
154
+ error_destroy_queue_cond :
155
+ sc_cond_destroy (& db -> queue_cond );
156
+ error_destroy_mutex :
157
+ sc_mutex_destroy (& db -> mutex );
179
158
180
- void
181
- sc_delay_buffer_join (struct sc_delay_buffer * db ) {
182
- if (db -> delay ) {
183
- sc_thread_join (& db -> b .thread , NULL );
184
- }
159
+ return false;
185
160
}
186
161
187
- void
188
- sc_delay_buffer_destroy (struct sc_delay_buffer * db ) {
189
- if (db -> delay ) {
190
- sc_cond_destroy (& db -> b .wait_cond );
191
- sc_cond_destroy (& db -> b .queue_cond );
192
- sc_mutex_destroy (& db -> b .mutex );
193
- }
162
+ static void
163
+ sc_delay_buffer_frame_sink_close (struct sc_frame_sink * sink ) {
164
+ struct sc_delay_buffer * db = DOWNCAST (sink );
165
+
166
+ sc_mutex_lock (& db -> mutex );
167
+ db -> stopped = true;
168
+ sc_cond_signal (& db -> queue_cond );
169
+ sc_cond_signal (& db -> wait_cond );
170
+ sc_mutex_unlock (& db -> mutex );
171
+
172
+ sc_thread_join (& db -> thread , NULL );
173
+
174
+ sc_frame_source_sinks_close (& db -> frame_source );
175
+
176
+ sc_cond_destroy (& db -> wait_cond );
177
+ sc_cond_destroy (& db -> queue_cond );
178
+ sc_mutex_destroy (& db -> mutex );
194
179
}
195
180
196
- bool
197
- sc_delay_buffer_push (struct sc_delay_buffer * db , const AVFrame * frame ) {
198
- if (!db -> delay ) {
199
- // No buffering
200
- return sc_delay_buffer_offer (db , frame );
201
- }
181
+ static bool
182
+ sc_delay_buffer_frame_sink_push (struct sc_frame_sink * sink ,
183
+ const AVFrame * frame ) {
184
+ struct sc_delay_buffer * db = DOWNCAST (sink );
202
185
203
- sc_mutex_lock (& db -> b . mutex );
186
+ sc_mutex_lock (& db -> mutex );
204
187
205
- if (db -> b . stopped ) {
206
- sc_mutex_unlock (& db -> b . mutex );
188
+ if (db -> stopped ) {
189
+ sc_mutex_unlock (& db -> mutex );
207
190
return false;
208
191
}
209
192
210
193
sc_tick pts = SC_TICK_FROM_US (frame -> pts );
211
- sc_clock_update (& db -> b . clock , sc_tick_now (), pts );
212
- sc_cond_signal (& db -> b . wait_cond );
194
+ sc_clock_update (& db -> clock , sc_tick_now (), pts );
195
+ sc_cond_signal (& db -> wait_cond );
213
196
214
- if (db -> b . clock .count == 1 ) {
215
- sc_mutex_unlock (& db -> b . mutex );
216
- // First frame, offer it immediately, for two reasons:
197
+ if (db -> clock .count == 1 ) {
198
+ sc_mutex_unlock (& db -> mutex );
199
+ // First frame, push it immediately, for two reasons:
217
200
// - not to delay the opening of the scrcpy window
218
201
// - the buffering estimation needs at least two clock points, so it
219
202
// could not handle the first frame
220
- return sc_delay_buffer_offer ( db , frame );
203
+ return sc_frame_source_sinks_push ( & db -> frame_source , frame );
221
204
}
222
205
223
206
struct sc_delayed_frame dframe ;
224
207
bool ok = sc_delayed_frame_init (& dframe , frame );
225
208
if (!ok ) {
226
- sc_mutex_unlock (& db -> b . mutex );
209
+ sc_mutex_unlock (& db -> mutex );
227
210
return false;
228
211
}
229
212
230
213
#ifndef SC_BUFFERING_NDEBUG
231
214
dframe .push_date = sc_tick_now ();
232
215
#endif
233
216
234
- ok = sc_vecdeque_push (& db -> b . queue , dframe );
217
+ ok = sc_vecdeque_push (& db -> queue , dframe );
235
218
if (!ok ) {
236
- sc_mutex_unlock (& db -> b . mutex );
219
+ sc_mutex_unlock (& db -> mutex );
237
220
LOG_OOM ();
238
221
return false;
239
222
}
240
223
241
- sc_cond_signal (& db -> b . queue_cond );
224
+ sc_cond_signal (& db -> queue_cond );
242
225
243
- sc_mutex_unlock (& db -> b . mutex );
226
+ sc_mutex_unlock (& db -> mutex );
244
227
245
228
return true;
246
229
}
230
+
231
+ void
232
+ sc_delay_buffer_init (struct sc_delay_buffer * db , sc_tick delay ) {
233
+ assert (delay > 0 );
234
+
235
+ db -> delay = delay ;
236
+
237
+ sc_frame_source_init (& db -> frame_source );
238
+
239
+ static const struct sc_frame_sink_ops ops = {
240
+ .open = sc_delay_buffer_frame_sink_open ,
241
+ .close = sc_delay_buffer_frame_sink_close ,
242
+ .push = sc_delay_buffer_frame_sink_push ,
243
+ };
244
+
245
+ db -> frame_sink .ops = & ops ;
246
+ }
0 commit comments