From f60236b3850afdbc8cabdb65ad9e28a846ac917b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 12 May 2022 00:44:31 -0500 Subject: [PATCH 1/9] Raw commit from debugging "Current state for room {room_id} is empty" error --- scripts-dev/complement.sh | 2 +- synapse/handlers/federation.py | 2 ++ synapse/handlers/federation_event.py | 41 +++++++++++++++++++++--- synapse/rest/client/room.py | 16 +++++++++ synapse/storage/databases/main/events.py | 33 +++++++++++++++++++ synapse/storage/databases/main/state.py | 4 +++ synapse/storage/persist_events.py | 22 ++++++++++++- 7 files changed, 114 insertions(+), 6 deletions(-) diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 190df6909a6a..d63c1b2d054e 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -73,4 +73,4 @@ docker build -t $COMPLEMENT_BASE_IMAGE -f "docker/complement/$COMPLEMENT_DOCKERF # Run the tests! echo "Images built; running complement" cd "$COMPLEMENT_DIR" -go test -v -tags synapse_blacklist,msc2716,msc3030,faster_joins -count=1 "${extra_test_args[@]}" "$@" ./tests/... +go test -v -tags synapse_blacklist,msc2716,msc3030,faster_joins -count=1 "${extra_test_args[@]}" "$@" ./tests/ diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 38dc5b1f6edf..2c888e6204ca 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -461,6 +461,8 @@ async def do_invite_join( # room stuff after join currently doesn't work on workers. assert self.config.worker.worker_app is None + logger.info("traceFrom(do_invite_join) for _update_current_state_txn") + logger.debug("Joining %s to %s", joinee, room_id) origin, event, room_version_obj = await self._make_and_verify_event( diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 6cf927e4ff7b..04a0a81b2d54 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -454,6 +454,10 @@ async def process_remote_join( room_id, itertools.chain(auth_events, state) ) + logger.info( + "process_remote_join state=%s partial_state=%s", state, partial_state + ) + # and now persist the join event itself. logger.info( "Peristing join-via-remote %s (partial_state: %s)", event, partial_state @@ -475,7 +479,19 @@ async def process_remote_join( # and discover that we do not have it. event.internal_metadata.proactively_send = False - return await self.persist_events_and_notify(room_id, [(event, context)]) + stream_id_after_persist = await self.persist_events_and_notify( + room_id, [(event, context)] + ) + + # Do this after the state from the remote join was persisted (via + # `persist_events_and_notify`). Otherwise we can run into a + # situation where the create event doesn't exist yet in the + # `current_state_events` + for e in state: + await self._handle_marker_event(origin, e) + # TODO: Loop through previous state to find other markers + + return stream_id_after_persist async def update_state_for_partial_state_event( self, destination: str, event: EventBase @@ -1200,25 +1216,40 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No """ if marker_event.type != EventTypes.MSC2716_MARKER: + # logger.info( + # "_handle_marker_event not a marker event marker_event.type=%s", + # marker_event.type, + # ) # Not a marker event return + logger.info("_handle_marker_event next 0000000000000000000000000000000000") + if marker_event.rejected_reason is not None: + logger.info( + "_handle_marker_event rejected %s", marker_event.rejected_reason + ) # Rejected event return + logger.info("_handle_marker_event next 1111111111111111111111111111111111") + # Skip processing a marker event if the room version doesn't # support it or the event is not from the room creator. room_version = await self._store.get_room_version(marker_event.room_id) + logger.info("_handle_marker_event next 2222222222222222222222222222222222") create_event = await self._store.get_create_event_for_room(marker_event.room_id) + logger.info("_handle_marker_event next 3333333333333333333333333333333333") room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) + logger.info("_handle_marker_event next 4444444444444444444444444444444444") if not room_version.msc2716_historical and ( not self._config.experimental.msc2716_enabled or marker_event.sender != room_creator ): + logger.info("_handle_marker_event skipping room_version=%s", room_version) return - logger.debug("_handle_marker_event: received %s", marker_event) + logger.info("_handle_marker_event: received %s", marker_event) insertion_event_id = marker_event.content.get( EventContentFields.MSC2716_MARKER_INSERTION @@ -1228,7 +1259,7 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No # Nothing to retrieve then (invalid marker) return - logger.debug( + logger.info( "_handle_marker_event: backfilling insertion event %s", insertion_event_id ) @@ -1260,7 +1291,7 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No insertion_event_id, marker_event.room_id ) - logger.debug( + logger.info( "_handle_marker_event: insertion extremity added for %s from marker event %s", insertion_event, marker_event, @@ -1947,6 +1978,8 @@ async def persist_events_and_notify( Returns: The stream ID after which all events have been persisted. """ + # logger.info("persist_events_and_notify event_and_contexts(%d)=%s", len(event_and_contexts), event_and_contexts) + if not event_and_contexts: return self._store.get_room_max_stream_ordering() diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 906fe09e9713..998ba57fe629 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -330,6 +330,14 @@ async def on_POST( remote_room_hosts, ) + logger.info("/join/:aliasOrId target_user=%s", requester.user) + logger.info( + "++++---------------------------------------------------------------++++" + ) + logger.info( + "++++---------------------------------------------------------------++++" + ) + await self.room_member_handler.update_membership( requester=requester, target=requester.user, @@ -840,6 +848,14 @@ async def on_POST( ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) + logger.info("/%s target_user=%s", membership_action, requester.user) + logger.info( + "-----------------------------------------------------------------------" + ) + logger.info( + "-----------------------------------------------------------------------" + ) + if requester.is_guest and membership_action not in { Membership.JOIN, Membership.LEAVE, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index ed29a0a5e2db..803c65b1b204 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -159,6 +159,13 @@ async def _persist_events_and_state_updates( Resolves when the events have been persisted """ + logger.info( + "_persist_events_and_state_updates events_and_contexts(%d) current_state_for_room(%d) state_delta_for_room(%d)", + len(events_and_contexts), + len(current_state_for_room), + len(state_delta_for_room), + ) + # We want to calculate the stream orderings as late as possible, as # we only notify after all events with a lesser stream ordering have # been persisted. I.e. if we spend 10s inside the with block then @@ -999,7 +1006,17 @@ def _update_current_state_txn( state_delta_by_room: Dict[str, DeltaState], stream_id: int, ): + logger.info( + "_update_current_state_txn state_delta_by_room=%s", state_delta_by_room + ) for room_id, delta_state in state_delta_by_room.items(): + logger.info( + "_update_current_state_txn room_id=%s delta_state=%s", + room_id, + delta_state, + exc_info=True, + ) + to_delete = delta_state.to_delete to_insert = delta_state.to_insert @@ -1037,11 +1054,21 @@ def _update_current_state_txn( users_in_room = self.store.get_users_in_room_txn(txn, room_id) members_changed.update(users_in_room) + logger.info( + "_update_current_state_txn no_longer_in_room deleting all state for room_id=%s (before)", + room_id, + ) + self.db_pool.simple_delete_txn( txn, table="current_state_events", keyvalues={"room_id": room_id}, ) + + logger.info( + "_update_current_state_txn no_longer_in_room deleting all state for room_id=%s (after)", + room_id, + ) else: # We're still in the room, so we update the current state as normal. @@ -1092,6 +1119,12 @@ def _update_current_state_txn( ), ) + logger.info( + "_update_current_state_txn inserting current_state_events to_insert=%s to_delete=%s", + to_insert, + to_delete, + ) + # We include the membership in the current state table, hence we do # a lookup when we insert. This assumes that all events have already # been inserted into room_memberships. diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 18ae8aee295d..e1861c42d8c5 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -179,6 +179,10 @@ async def get_create_event_for_room(self, room_id: str) -> EventBase: """ state_ids = await self.get_current_state_ids(room_id) + logger.info( + "get_create_event_for_room room_id=%s state_ids=%s", room_id, state_ids + ) + if not state_ids: raise NotFoundError(f"Current state for room {room_id} is empty") diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 97118045a1ad..87438ad69896 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -313,11 +313,15 @@ async def persist_events( matched the transcation ID; the existing event is returned in such a case. """ + + # logger.info("persist_events events_and_contexts(%d)", len(events_and_contexts)) + partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {} for event, ctx in events_and_contexts: partitioned.setdefault(event.room_id, []).append((event, ctx)) async def enqueue(item): + # logger.info("persist_events enqueue=%s", item) room_id, evs_ctxs = item return await self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled @@ -450,6 +454,12 @@ async def _persist_event_batch( if not events_and_contexts: return replaced_events + logger.info( + "traceFrom(_persist_event_batch) for _update_current_state_txn events_and_contexts(%d) backfilled=%s", + len(events_and_contexts), + backfilled, + ) + # Check if any of the events have a transaction ID that has already been # persisted, and if so we don't persist it again. # @@ -515,7 +525,9 @@ async def _persist_event_batch( (event, context) ) - for room_id, ev_ctx_rm in events_by_room.items(): + events_by_room_items = events_by_room.items() + + for room_id, ev_ctx_rm in events_by_room_items: latest_event_ids = set( await self.main_store.get_latest_event_ids_in_room(room_id) ) @@ -523,6 +535,8 @@ async def _persist_event_batch( room_id, ev_ctx_rm, latest_event_ids ) + logger.info("persist_event_batch new_latest_event_ids=%s latest_event_ids=%s", new_latest_event_ids, latest_event_ids) + if new_latest_event_ids == latest_event_ids: # No change in extremities, so no change in state continue @@ -590,6 +604,12 @@ async def _persist_event_batch( new_forward_extremities[room_id] = new_latest_event_ids + # TODO: Left off here. need to see why + # `state_delta_for_room` is empty in the case where it's + # not working. Need to check how the delta is being + # calculated + logger.info("persist_event_batch delta_ids=%s", delta_ids) + # If either are not None then there has been a change, # and we need to work out the delta (or use that # given) From a367c35cafd25b75b740d912dabea6df84b2235e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 12 May 2022 00:44:51 -0500 Subject: [PATCH 2/9] Revert "Raw commit from debugging "Current state for room {room_id} is empty" error" This reverts commit f60236b3850afdbc8cabdb65ad9e28a846ac917b. --- scripts-dev/complement.sh | 2 +- synapse/handlers/federation.py | 2 -- synapse/handlers/federation_event.py | 41 +++--------------------- synapse/rest/client/room.py | 16 --------- synapse/storage/databases/main/events.py | 33 ------------------- synapse/storage/databases/main/state.py | 4 --- synapse/storage/persist_events.py | 22 +------------ 7 files changed, 6 insertions(+), 114 deletions(-) diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index d63c1b2d054e..190df6909a6a 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -73,4 +73,4 @@ docker build -t $COMPLEMENT_BASE_IMAGE -f "docker/complement/$COMPLEMENT_DOCKERF # Run the tests! echo "Images built; running complement" cd "$COMPLEMENT_DIR" -go test -v -tags synapse_blacklist,msc2716,msc3030,faster_joins -count=1 "${extra_test_args[@]}" "$@" ./tests/ +go test -v -tags synapse_blacklist,msc2716,msc3030,faster_joins -count=1 "${extra_test_args[@]}" "$@" ./tests/... diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2c888e6204ca..38dc5b1f6edf 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -461,8 +461,6 @@ async def do_invite_join( # room stuff after join currently doesn't work on workers. assert self.config.worker.worker_app is None - logger.info("traceFrom(do_invite_join) for _update_current_state_txn") - logger.debug("Joining %s to %s", joinee, room_id) origin, event, room_version_obj = await self._make_and_verify_event( diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 04a0a81b2d54..6cf927e4ff7b 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -454,10 +454,6 @@ async def process_remote_join( room_id, itertools.chain(auth_events, state) ) - logger.info( - "process_remote_join state=%s partial_state=%s", state, partial_state - ) - # and now persist the join event itself. logger.info( "Peristing join-via-remote %s (partial_state: %s)", event, partial_state @@ -479,19 +475,7 @@ async def process_remote_join( # and discover that we do not have it. event.internal_metadata.proactively_send = False - stream_id_after_persist = await self.persist_events_and_notify( - room_id, [(event, context)] - ) - - # Do this after the state from the remote join was persisted (via - # `persist_events_and_notify`). Otherwise we can run into a - # situation where the create event doesn't exist yet in the - # `current_state_events` - for e in state: - await self._handle_marker_event(origin, e) - # TODO: Loop through previous state to find other markers - - return stream_id_after_persist + return await self.persist_events_and_notify(room_id, [(event, context)]) async def update_state_for_partial_state_event( self, destination: str, event: EventBase @@ -1216,40 +1200,25 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No """ if marker_event.type != EventTypes.MSC2716_MARKER: - # logger.info( - # "_handle_marker_event not a marker event marker_event.type=%s", - # marker_event.type, - # ) # Not a marker event return - logger.info("_handle_marker_event next 0000000000000000000000000000000000") - if marker_event.rejected_reason is not None: - logger.info( - "_handle_marker_event rejected %s", marker_event.rejected_reason - ) # Rejected event return - logger.info("_handle_marker_event next 1111111111111111111111111111111111") - # Skip processing a marker event if the room version doesn't # support it or the event is not from the room creator. room_version = await self._store.get_room_version(marker_event.room_id) - logger.info("_handle_marker_event next 2222222222222222222222222222222222") create_event = await self._store.get_create_event_for_room(marker_event.room_id) - logger.info("_handle_marker_event next 3333333333333333333333333333333333") room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) - logger.info("_handle_marker_event next 4444444444444444444444444444444444") if not room_version.msc2716_historical and ( not self._config.experimental.msc2716_enabled or marker_event.sender != room_creator ): - logger.info("_handle_marker_event skipping room_version=%s", room_version) return - logger.info("_handle_marker_event: received %s", marker_event) + logger.debug("_handle_marker_event: received %s", marker_event) insertion_event_id = marker_event.content.get( EventContentFields.MSC2716_MARKER_INSERTION @@ -1259,7 +1228,7 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No # Nothing to retrieve then (invalid marker) return - logger.info( + logger.debug( "_handle_marker_event: backfilling insertion event %s", insertion_event_id ) @@ -1291,7 +1260,7 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No insertion_event_id, marker_event.room_id ) - logger.info( + logger.debug( "_handle_marker_event: insertion extremity added for %s from marker event %s", insertion_event, marker_event, @@ -1978,8 +1947,6 @@ async def persist_events_and_notify( Returns: The stream ID after which all events have been persisted. """ - # logger.info("persist_events_and_notify event_and_contexts(%d)=%s", len(event_and_contexts), event_and_contexts) - if not event_and_contexts: return self._store.get_room_max_stream_ordering() diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 998ba57fe629..906fe09e9713 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -330,14 +330,6 @@ async def on_POST( remote_room_hosts, ) - logger.info("/join/:aliasOrId target_user=%s", requester.user) - logger.info( - "++++---------------------------------------------------------------++++" - ) - logger.info( - "++++---------------------------------------------------------------++++" - ) - await self.room_member_handler.update_membership( requester=requester, target=requester.user, @@ -848,14 +840,6 @@ async def on_POST( ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) - logger.info("/%s target_user=%s", membership_action, requester.user) - logger.info( - "-----------------------------------------------------------------------" - ) - logger.info( - "-----------------------------------------------------------------------" - ) - if requester.is_guest and membership_action not in { Membership.JOIN, Membership.LEAVE, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 803c65b1b204..ed29a0a5e2db 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -159,13 +159,6 @@ async def _persist_events_and_state_updates( Resolves when the events have been persisted """ - logger.info( - "_persist_events_and_state_updates events_and_contexts(%d) current_state_for_room(%d) state_delta_for_room(%d)", - len(events_and_contexts), - len(current_state_for_room), - len(state_delta_for_room), - ) - # We want to calculate the stream orderings as late as possible, as # we only notify after all events with a lesser stream ordering have # been persisted. I.e. if we spend 10s inside the with block then @@ -1006,17 +999,7 @@ def _update_current_state_txn( state_delta_by_room: Dict[str, DeltaState], stream_id: int, ): - logger.info( - "_update_current_state_txn state_delta_by_room=%s", state_delta_by_room - ) for room_id, delta_state in state_delta_by_room.items(): - logger.info( - "_update_current_state_txn room_id=%s delta_state=%s", - room_id, - delta_state, - exc_info=True, - ) - to_delete = delta_state.to_delete to_insert = delta_state.to_insert @@ -1054,21 +1037,11 @@ def _update_current_state_txn( users_in_room = self.store.get_users_in_room_txn(txn, room_id) members_changed.update(users_in_room) - logger.info( - "_update_current_state_txn no_longer_in_room deleting all state for room_id=%s (before)", - room_id, - ) - self.db_pool.simple_delete_txn( txn, table="current_state_events", keyvalues={"room_id": room_id}, ) - - logger.info( - "_update_current_state_txn no_longer_in_room deleting all state for room_id=%s (after)", - room_id, - ) else: # We're still in the room, so we update the current state as normal. @@ -1119,12 +1092,6 @@ def _update_current_state_txn( ), ) - logger.info( - "_update_current_state_txn inserting current_state_events to_insert=%s to_delete=%s", - to_insert, - to_delete, - ) - # We include the membership in the current state table, hence we do # a lookup when we insert. This assumes that all events have already # been inserted into room_memberships. diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index e1861c42d8c5..18ae8aee295d 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -179,10 +179,6 @@ async def get_create_event_for_room(self, room_id: str) -> EventBase: """ state_ids = await self.get_current_state_ids(room_id) - logger.info( - "get_create_event_for_room room_id=%s state_ids=%s", room_id, state_ids - ) - if not state_ids: raise NotFoundError(f"Current state for room {room_id} is empty") diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 87438ad69896..97118045a1ad 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -313,15 +313,11 @@ async def persist_events( matched the transcation ID; the existing event is returned in such a case. """ - - # logger.info("persist_events events_and_contexts(%d)", len(events_and_contexts)) - partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {} for event, ctx in events_and_contexts: partitioned.setdefault(event.room_id, []).append((event, ctx)) async def enqueue(item): - # logger.info("persist_events enqueue=%s", item) room_id, evs_ctxs = item return await self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled @@ -454,12 +450,6 @@ async def _persist_event_batch( if not events_and_contexts: return replaced_events - logger.info( - "traceFrom(_persist_event_batch) for _update_current_state_txn events_and_contexts(%d) backfilled=%s", - len(events_and_contexts), - backfilled, - ) - # Check if any of the events have a transaction ID that has already been # persisted, and if so we don't persist it again. # @@ -525,9 +515,7 @@ async def _persist_event_batch( (event, context) ) - events_by_room_items = events_by_room.items() - - for room_id, ev_ctx_rm in events_by_room_items: + for room_id, ev_ctx_rm in events_by_room.items(): latest_event_ids = set( await self.main_store.get_latest_event_ids_in_room(room_id) ) @@ -535,8 +523,6 @@ async def _persist_event_batch( room_id, ev_ctx_rm, latest_event_ids ) - logger.info("persist_event_batch new_latest_event_ids=%s latest_event_ids=%s", new_latest_event_ids, latest_event_ids) - if new_latest_event_ids == latest_event_ids: # No change in extremities, so no change in state continue @@ -604,12 +590,6 @@ async def _persist_event_batch( new_forward_extremities[room_id] = new_latest_event_ids - # TODO: Left off here. need to see why - # `state_delta_for_room` is empty in the case where it's - # not working. Need to check how the delta is being - # calculated - logger.info("persist_event_batch delta_ids=%s", delta_ids) - # If either are not None then there has been a change, # and we need to work out the delta (or use that # given) From 32ff3a1d8d43873500dfa76295fbb343343c3728 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 12 May 2022 01:09:56 -0500 Subject: [PATCH 3/9] Process new marker state after remote join --- synapse/handlers/federation_event.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 6cf927e4ff7b..e0a042e8fe8f 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -475,7 +475,23 @@ async def process_remote_join( # and discover that we do not have it. event.internal_metadata.proactively_send = False - return await self.persist_events_and_notify(room_id, [(event, context)]) + stream_id_after_persist = await self.persist_events_and_notify( + room_id, [(event, context)] + ) + + # If we're joining the room again, check if there is new marker + # state indicating that there is new history imported somewhere in + # the DAG. + # + # Do this after the state from the remote join was persisted (via + # `persist_events_and_notify`). Otherwise we can run into a + # situation where the create event doesn't exist yet in the + # `current_state_events` + for e in state: + await self._handle_marker_event(origin, e) + # TODO: Loop through previous state to find other markers + + return stream_id_after_persist async def update_state_for_partial_state_event( self, destination: str, event: EventBase From 02903ee5b7884c29ad9db79b17485c6c7d9e8a3a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 12 May 2022 03:41:52 -0500 Subject: [PATCH 4/9] Recursively grab state before each marker to check for other markers back in time --- scripts-dev/complement.sh | 2 +- synapse/handlers/federation_event.py | 33 ++++++++++++++++++++++++++-- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 190df6909a6a..d63c1b2d054e 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -73,4 +73,4 @@ docker build -t $COMPLEMENT_BASE_IMAGE -f "docker/complement/$COMPLEMENT_DOCKERF # Run the tests! echo "Images built; running complement" cd "$COMPLEMENT_DIR" -go test -v -tags synapse_blacklist,msc2716,msc3030,faster_joins -count=1 "${extra_test_args[@]}" "$@" ./tests/... +go test -v -tags synapse_blacklist,msc2716,msc3030,faster_joins -count=1 "${extra_test_args[@]}" "$@" ./tests/ diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index e0a042e8fe8f..f81ae9e57e11 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -489,7 +489,6 @@ async def process_remote_join( # `current_state_events` for e in state: await self._handle_marker_event(origin, e) - # TODO: Loop through previous state to find other markers return stream_id_after_persist @@ -1031,7 +1030,7 @@ async def _get_state_after_missing_prev_event( async def _get_state_and_persist( self, destination: str, room_id: str, event_id: str - ) -> None: + ) -> List[EventBase]: """Get the complete room state at a given event, and persist any new events as outliers""" room_version = await self._store.get_room_version(room_id) @@ -1040,6 +1039,14 @@ async def _get_state_and_persist( ) logger.info("/state returned %i events", len(auth_events) + len(state_events)) + logger.info( + "_get_state_and_persist auth_events(%d)=%s state_events(%d)=%s", + len(auth_events), + auth_events, + len(state_events), + state_events, + ) + await self._auth_and_persist_outliers( room_id, itertools.chain(auth_events, state_events) ) @@ -1050,6 +1057,8 @@ async def _get_state_and_persist( destination=destination, room_id=room_id, event_ids=(event_id,) ) + return auth_events + state_events + async def _process_received_pdu( self, origin: str, @@ -1236,6 +1245,26 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No logger.debug("_handle_marker_event: received %s", marker_event) + # TODO: Move this to a background queue + async def handle_marker_queue(marker_event: EventBase) -> None: + # Get the state before the marker event + state_events = await self._get_state_and_persist( + origin, marker_event.room_id, marker_event.event_id + ) + logger.info( + "handle_marker_queue marker_event=%s state_events=%s", + marker_event.event_id, + state_events, + ) + + # TODO: No need to keep going if the marker is already `have_seen_event` + + for e in state_events: + await self._handle_marker_event(origin, e) + + # TODO: add_to_queue + await handle_marker_queue(marker_event) + insertion_event_id = marker_event.content.get( EventContentFields.MSC2716_MARKER_INSERTION ) From 4576c90239dcb4de068b8d746b031d8ecc29f64b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 19 May 2022 00:20:28 -0500 Subject: [PATCH 5/9] Only process marker events in the current state --- synapse/handlers/federation_event.py | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index f81ae9e57e11..554a959b57ac 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1030,7 +1030,7 @@ async def _get_state_after_missing_prev_event( async def _get_state_and_persist( self, destination: str, room_id: str, event_id: str - ) -> List[EventBase]: + ) -> None: """Get the complete room state at a given event, and persist any new events as outliers""" room_version = await self._store.get_room_version(room_id) @@ -1057,8 +1057,6 @@ async def _get_state_and_persist( destination=destination, room_id=room_id, event_ids=(event_id,) ) - return auth_events + state_events - async def _process_received_pdu( self, origin: str, @@ -1245,26 +1243,6 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No logger.debug("_handle_marker_event: received %s", marker_event) - # TODO: Move this to a background queue - async def handle_marker_queue(marker_event: EventBase) -> None: - # Get the state before the marker event - state_events = await self._get_state_and_persist( - origin, marker_event.room_id, marker_event.event_id - ) - logger.info( - "handle_marker_queue marker_event=%s state_events=%s", - marker_event.event_id, - state_events, - ) - - # TODO: No need to keep going if the marker is already `have_seen_event` - - for e in state_events: - await self._handle_marker_event(origin, e) - - # TODO: add_to_queue - await handle_marker_queue(marker_event) - insertion_event_id = marker_event.content.get( EventContentFields.MSC2716_MARKER_INSERTION ) From 7f1f7ffa6f4ed4d67a55bd24bc793aa89596c0dc Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 19 May 2022 00:54:43 -0500 Subject: [PATCH 6/9] Remove debug log --- synapse/handlers/federation_event.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 554a959b57ac..061d4ae1c374 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1039,14 +1039,6 @@ async def _get_state_and_persist( ) logger.info("/state returned %i events", len(auth_events) + len(state_events)) - logger.info( - "_get_state_and_persist auth_events(%d)=%s state_events(%d)=%s", - len(auth_events), - auth_events, - len(state_events), - state_events, - ) - await self._auth_and_persist_outliers( room_id, itertools.chain(auth_events, state_events) ) From 1d2928e1b3837cbebc8e9efc7c365a53e7bd2af8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 19 May 2022 00:59:34 -0500 Subject: [PATCH 7/9] Add changelog --- changelog.d/12718.feature | 1 + scripts-dev/complement.sh | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/12718.feature diff --git a/changelog.d/12718.feature b/changelog.d/12718.feature new file mode 100644 index 000000000000..1056f519a4c1 --- /dev/null +++ b/changelog.d/12718.feature @@ -0,0 +1 @@ +Update [MSC2716](https://github.com/matrix-org/matrix-spec-proposals/pull/2716) implementation to process marker events from the current state to avoid markers being lost in timeline gaps for federated servers which would cause the imported history to be undiscovered. diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index d63c1b2d054e..190df6909a6a 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -73,4 +73,4 @@ docker build -t $COMPLEMENT_BASE_IMAGE -f "docker/complement/$COMPLEMENT_DOCKERF # Run the tests! echo "Images built; running complement" cd "$COMPLEMENT_DIR" -go test -v -tags synapse_blacklist,msc2716,msc3030,faster_joins -count=1 "${extra_test_args[@]}" "$@" ./tests/ +go test -v -tags synapse_blacklist,msc2716,msc3030,faster_joins -count=1 "${extra_test_args[@]}" "$@" ./tests/... From c9b90514b6dde9b417d4029aa00bd24da1a3a7e8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 19 May 2022 01:12:01 -0500 Subject: [PATCH 8/9] Avoid re-processing insertion events we have already seen --- synapse/handlers/federation_event.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 061d4ae1c374..2ab9af354ee5 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -481,7 +481,8 @@ async def process_remote_join( # If we're joining the room again, check if there is new marker # state indicating that there is new history imported somewhere in - # the DAG. + # the DAG. Multiple markers can exist in the current state with + # unique state_keys. # # Do this after the state from the remote join was persisted (via # `persist_events_and_notify`). Otherwise we can run into a @@ -1243,6 +1244,15 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No # Nothing to retrieve then (invalid marker) return + already_seen_insertion_event = await self._store.have_seen_event( + marker_event.room_id, + insertion_event_id + ) + if already_seen_insertion_event: + # No need to process a marker again if we have already seen the + # insertion event that it was pointing to + return + logger.debug( "_handle_marker_event: backfilling insertion event %s", insertion_event_id ) From 5a7f431bc465c4224a0fc396fdbe1db2fd037e2c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 19 May 2022 01:15:49 -0500 Subject: [PATCH 9/9] Fix lint --- synapse/handlers/federation_event.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 2ab9af354ee5..9e3d4f4dd30c 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -1245,8 +1245,7 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No return already_seen_insertion_event = await self._store.have_seen_event( - marker_event.room_id, - insertion_event_id + marker_event.room_id, insertion_event_id ) if already_seen_insertion_event: # No need to process a marker again if we have already seen the