55
55
from synapse .replication .tcp .streams import BackfillStream
56
56
from synapse .replication .tcp .streams .events import EventsStream
57
57
from synapse .storage ._base import SQLBaseStore , db_to_json , make_in_list_sql_clause
58
- from synapse .storage .database import DatabasePool
58
+ from synapse .storage .database import DatabasePool , LoggingTransaction
59
59
from synapse .storage .engines import PostgresEngine
60
+ from synapse .storage .types import Connection
60
61
from synapse .storage .util .id_generators import MultiWriterIdGenerator , StreamIdGenerator
61
62
from synapse .storage .util .sequence import build_sequence_generator
62
63
from synapse .types import JsonDict , get_domain_from_id
@@ -86,6 +87,47 @@ class _EventCacheEntry:
86
87
redacted_event : Optional [EventBase ]
87
88
88
89
90
+ @attr .s (slots = True , frozen = True , auto_attribs = True )
91
+ class _EventRow :
92
+ """
93
+ An event, as pulled from the database.
94
+
95
+ Properties:
96
+ event_id: The event ID of the event.
97
+
98
+ stream_ordering: stream ordering for this event
99
+
100
+ json: json-encoded event structure
101
+
102
+ internal_metadata: json-encoded internal metadata dict
103
+
104
+ format_version: The format of the event. Hopefully one of EventFormatVersions.
105
+ 'None' means the event predates EventFormatVersions (so the event is format V1).
106
+
107
+ room_version_id: The version of the room which contains the event. Hopefully
108
+ one of RoomVersions.
109
+
110
+ Due to historical reasons, there may be a few events in the database which
111
+ do not have an associated room; in this case None will be returned here.
112
+
113
+ rejected_reason: if the event was rejected, the reason why.
114
+
115
+ redactions: a list of event-ids which (claim to) redact this event.
116
+
117
+ outlier: True if this event is an outlier.
118
+ """
119
+
120
+ event_id : str
121
+ stream_ordering : int
122
+ json : str
123
+ internal_metadata : str
124
+ format_version : Optional [int ]
125
+ room_version_id : Optional [int ]
126
+ rejected_reason : Optional [str ]
127
+ redactions : List [str ]
128
+ outlier : bool
129
+
130
+
89
131
class EventRedactBehaviour (Names ):
90
132
"""
91
133
What to do when retrieving a redacted event from the database.
@@ -686,7 +728,7 @@ async def get_stripped_room_state_from_event_context(
686
728
for e in state_to_include .values ()
687
729
]
688
730
689
- def _do_fetch (self , conn ) :
731
+ def _do_fetch (self , conn : Connection ) -> None :
690
732
"""Takes a database connection and waits for requests for events from
691
733
the _event_fetch_list queue.
692
734
"""
@@ -713,13 +755,15 @@ def _do_fetch(self, conn):
713
755
714
756
self ._fetch_event_list (conn , event_list )
715
757
716
- def _fetch_event_list (self , conn , event_list ):
758
+ def _fetch_event_list (
759
+ self , conn : Connection , event_list : List [Tuple [List [str ], defer .Deferred ]]
760
+ ) -> None :
717
761
"""Handle a load of requests from the _event_fetch_list queue
718
762
719
763
Args:
720
- conn (twisted.enterprise.adbapi.Connection) : database connection
764
+ conn: database connection
721
765
722
- event_list (list[Tuple[list[str], Deferred]]) :
766
+ event_list:
723
767
The fetch requests. Each entry consists of a list of event
724
768
ids to be fetched, and a deferred to be completed once the
725
769
events have been fetched.
@@ -788,7 +832,7 @@ async def _get_events_from_db(
788
832
row = row_map .get (event_id )
789
833
fetched_events [event_id ] = row
790
834
if row :
791
- redaction_ids .update (row [ " redactions" ] )
835
+ redaction_ids .update (row . redactions )
792
836
793
837
events_to_fetch = redaction_ids .difference (fetched_events .keys ())
794
838
if events_to_fetch :
@@ -799,32 +843,32 @@ async def _get_events_from_db(
799
843
for event_id , row in fetched_events .items ():
800
844
if not row :
801
845
continue
802
- assert row [ " event_id" ] == event_id
846
+ assert row . event_id == event_id
803
847
804
- rejected_reason = row [ " rejected_reason" ]
848
+ rejected_reason = row . rejected_reason
805
849
806
850
# If the event or metadata cannot be parsed, log the error and act
807
851
# as if the event is unknown.
808
852
try :
809
- d = db_to_json (row [ " json" ] )
853
+ d = db_to_json (row . json )
810
854
except ValueError :
811
855
logger .error ("Unable to parse json from event: %s" , event_id )
812
856
continue
813
857
try :
814
- internal_metadata = db_to_json (row [ " internal_metadata" ] )
858
+ internal_metadata = db_to_json (row . internal_metadata )
815
859
except ValueError :
816
860
logger .error (
817
861
"Unable to parse internal_metadata from event: %s" , event_id
818
862
)
819
863
continue
820
864
821
- format_version = row [ " format_version" ]
865
+ format_version = row . format_version
822
866
if format_version is None :
823
867
# This means that we stored the event before we had the concept
824
868
# of a event format version, so it must be a V1 event.
825
869
format_version = EventFormatVersions .V1
826
870
827
- room_version_id = row [ " room_version_id" ]
871
+ room_version_id = row . room_version_id
828
872
829
873
if not room_version_id :
830
874
# this should only happen for out-of-band membership events which
@@ -889,16 +933,16 @@ async def _get_events_from_db(
889
933
internal_metadata_dict = internal_metadata ,
890
934
rejected_reason = rejected_reason ,
891
935
)
892
- original_ev .internal_metadata .stream_ordering = row [ " stream_ordering" ]
893
- original_ev .internal_metadata .outlier = row [ " outlier" ]
936
+ original_ev .internal_metadata .stream_ordering = row . stream_ordering
937
+ original_ev .internal_metadata .outlier = row . outlier
894
938
895
939
event_map [event_id ] = original_ev
896
940
897
941
# finally, we can decide whether each one needs redacting, and build
898
942
# the cache entries.
899
943
result_map = {}
900
944
for event_id , original_ev in event_map .items ():
901
- redactions = fetched_events [event_id ][ " redactions" ]
945
+ redactions = fetched_events [event_id ]. redactions
902
946
redacted_event = self ._maybe_redact_event_row (
903
947
original_ev , redactions , event_map
904
948
)
@@ -912,17 +956,17 @@ async def _get_events_from_db(
912
956
913
957
return result_map
914
958
915
- async def _enqueue_events (self , events ) :
959
+ async def _enqueue_events (self , events : Iterable [ str ]) -> Dict [ str , _EventRow ] :
916
960
"""Fetches events from the database using the _event_fetch_list. This
917
961
allows batch and bulk fetching of events - it allows us to fetch events
918
962
without having to create a new transaction for each request for events.
919
963
920
964
Args:
921
- events (Iterable[str]) : events to be fetched.
965
+ events: events to be fetched.
922
966
923
967
Returns:
924
- Dict[str, Dict]: map from event id to row data from the database.
925
- May contain events that weren't requested.
968
+ A map from event id to row data from the database. May contain events
969
+ that weren't requested.
926
970
"""
927
971
928
972
events_d = defer .Deferred ()
@@ -949,43 +993,19 @@ async def _enqueue_events(self, events):
949
993
950
994
return row_map
951
995
952
- def _fetch_event_rows (self , txn , event_ids ):
996
+ def _fetch_event_rows (
997
+ self , txn : LoggingTransaction , event_ids : Iterable [str ]
998
+ ) -> Dict [str , _EventRow ]:
953
999
"""Fetch event rows from the database
954
1000
955
1001
Events which are not found are omitted from the result.
956
1002
957
- The returned per-event dicts contain the following keys:
958
-
959
- * event_id (str)
960
-
961
- * stream_ordering (int): stream ordering for this event
962
-
963
- * json (str): json-encoded event structure
964
-
965
- * internal_metadata (str): json-encoded internal metadata dict
966
-
967
- * format_version (int|None): The format of the event. Hopefully one
968
- of EventFormatVersions. 'None' means the event predates
969
- EventFormatVersions (so the event is format V1).
970
-
971
- * room_version_id (str|None): The version of the room which contains the event.
972
- Hopefully one of RoomVersions.
973
-
974
- Due to historical reasons, there may be a few events in the database which
975
- do not have an associated room; in this case None will be returned here.
976
-
977
- * rejected_reason (str|None): if the event was rejected, the reason
978
- why.
979
-
980
- * redactions (List[str]): a list of event-ids which (claim to) redact
981
- this event.
982
-
983
1003
Args:
984
- txn (twisted.enterprise.adbapi.Connection):
985
- event_ids (Iterable[str]) : event IDs to fetch
1004
+ txn: The database transaction.
1005
+ event_ids: event IDs to fetch
986
1006
987
1007
Returns:
988
- Dict[str, Dict]: a map from event id to event info.
1008
+ A map from event id to event info.
989
1009
"""
990
1010
event_dict = {}
991
1011
for evs in batch_iter (event_ids , 200 ):
@@ -1013,17 +1033,17 @@ def _fetch_event_rows(self, txn, event_ids):
1013
1033
1014
1034
for row in txn :
1015
1035
event_id = row [0 ]
1016
- event_dict [event_id ] = {
1017
- " event_id" : event_id ,
1018
- " stream_ordering" : row [1 ],
1019
- " internal_metadata" : row [2 ],
1020
- " json" : row [3 ],
1021
- " format_version" : row [4 ],
1022
- " room_version_id" : row [5 ],
1023
- " rejected_reason" : row [6 ],
1024
- " redactions" : [],
1025
- " outlier" : row [7 ],
1026
- }
1036
+ event_dict [event_id ] = _EventRow (
1037
+ event_id = event_id ,
1038
+ stream_ordering = row [1 ],
1039
+ internal_metadata = row [2 ],
1040
+ json = row [3 ],
1041
+ format_version = row [4 ],
1042
+ room_version_id = row [5 ],
1043
+ rejected_reason = row [6 ],
1044
+ redactions = [],
1045
+ outlier = row [7 ],
1046
+ )
1027
1047
1028
1048
# check for redactions
1029
1049
redactions_sql = "SELECT event_id, redacts FROM redactions WHERE "
@@ -1035,7 +1055,7 @@ def _fetch_event_rows(self, txn, event_ids):
1035
1055
for (redacter , redacted ) in txn :
1036
1056
d = event_dict .get (redacted )
1037
1057
if d :
1038
- d [ " redactions" ] .append (redacter )
1058
+ d . redactions .append (redacter )
1039
1059
1040
1060
return event_dict
1041
1061
0 commit comments