18
18
import logging
19
19
from collections import deque
20
20
from typing import (
21
+ Any ,
21
22
Awaitable ,
22
23
Callable ,
23
24
Collection ,
104
105
)
105
106
106
107
107
- @attr .s (auto_attribs = True , frozen = True , slots = True )
108
+ @attr .s (auto_attribs = True , slots = True )
108
109
class _EventPersistQueueItem :
109
110
events_and_contexts : List [Tuple [EventBase , EventContext ]]
110
111
backfilled : bool
111
112
deferred : ObservableDeferred
112
- opentracing_context : Optional [object ]
113
+
114
+ parent_opentracing_span_contexts : List = []
115
+ """A list of opentracing spans waiting for this batch"""
116
+
117
+ opentracing_span_context : Any = None
118
+ """The opentracing span under which the persistence actually happened"""
113
119
114
120
115
121
_PersistResult = TypeVar ("_PersistResult" )
@@ -164,20 +170,36 @@ async def add_to_queue(
164
170
end_item = queue [- 1 ]
165
171
else :
166
172
# need to make a new queue item
167
- span = opentracing .active_span ()
168
173
deferred = ObservableDeferred (defer .Deferred (), consumeErrors = True )
169
174
170
175
end_item = _EventPersistQueueItem (
171
176
events_and_contexts = [],
172
177
backfilled = backfilled ,
173
178
deferred = deferred ,
174
- opentracing_context = span .context if span else None ,
175
179
)
176
180
queue .append (end_item )
177
181
182
+ # add our events to the queue item
178
183
end_item .events_and_contexts .extend (events_and_contexts )
184
+
185
+ # also add our active opentracing span to the item so that we get a link back
186
+ span = opentracing .active_span ()
187
+ if span :
188
+ end_item .parent_opentracing_span_contexts .append (span .context )
189
+
190
+ # start a processor for the queue, if there isn't one already
179
191
self ._handle_queue (room_id )
180
- return await make_deferred_yieldable (end_item .deferred .observe ())
192
+
193
+ # wait for the queue item to complete
194
+ res = await make_deferred_yieldable (end_item .deferred .observe ())
195
+
196
+ # add another opentracing span which links to the persist trace.
197
+ with opentracing .start_active_span_follows_from (
198
+ "persist_event_batch_complete" , (end_item .opentracing_span_context ,)
199
+ ):
200
+ pass
201
+
202
+ return res
181
203
182
204
def _handle_queue (self , room_id ):
183
205
"""Attempts to handle the queue for a room if not already being handled.
@@ -204,10 +226,14 @@ async def handle_queue_loop():
204
226
queue = self ._get_drainining_queue (room_id )
205
227
for item in queue :
206
228
try :
207
- with opentracing .start_active_span (
229
+ with opentracing .start_active_span_follows_from (
208
230
"persist_event_batch" ,
209
- child_of = item .opentracing_context ,
210
- ):
231
+ item .parent_opentracing_span_contexts ,
232
+ inherit_force_tracing = True ,
233
+ ) as scope :
234
+ if scope :
235
+ item .opentracing_span_context = scope .span .context
236
+
211
237
ret = await self ._per_item_callback (
212
238
item .events_and_contexts , item .backfilled
213
239
)
0 commit comments