|
1 | 1 | # -*- coding: utf-8 -*-
|
2 | 2 | # Copyright 2017, 2018 New Vector Ltd
|
| 3 | +# Copyright 2019 Matrix.org Foundation C.I.C. |
3 | 4 | #
|
4 | 5 | # Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 6 | # you may not use this file except in compliance with the License.
|
@@ -103,14 +104,35 @@ def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
|
103 | 104 | rooms
|
104 | 105 | session_id(string): session ID to delete keys for, for None to delete keys
|
105 | 106 | for all sessions
|
| 107 | + Raises: |
| 108 | + NotFoundError: if the backup version does not exist |
106 | 109 | Returns:
|
107 |
| - A deferred of the deletion transaction |
| 110 | + A dict containing the count and etag for the backup version |
108 | 111 | """
|
109 | 112 |
|
110 | 113 | # lock for consistency with uploading
|
111 | 114 | with (yield self._upload_linearizer.queue(user_id)):
|
| 115 | + # make sure the backup version exists |
| 116 | + try: |
| 117 | + version_info = yield self.store.get_e2e_room_keys_version_info( |
| 118 | + user_id, version |
| 119 | + ) |
| 120 | + except StoreError as e: |
| 121 | + if e.code == 404: |
| 122 | + raise NotFoundError("Unknown backup version") |
| 123 | + else: |
| 124 | + raise |
| 125 | + |
112 | 126 | yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
|
113 | 127 |
|
| 128 | + version_etag = version_info["etag"] + 1 |
| 129 | + yield self.store.update_e2e_room_keys_version( |
| 130 | + user_id, version, None, version_etag |
| 131 | + ) |
| 132 | + |
| 133 | + count = yield self.store.count_e2e_room_keys(user_id, version) |
| 134 | + return {"etag": str(version_etag), "count": count} |
| 135 | + |
114 | 136 | @trace
|
115 | 137 | @defer.inlineCallbacks
|
116 | 138 | def upload_room_keys(self, user_id, version, room_keys):
|
@@ -138,6 +160,9 @@ def upload_room_keys(self, user_id, version, room_keys):
|
138 | 160 | }
|
139 | 161 | }
|
140 | 162 |
|
| 163 | + Returns: |
| 164 | + A dict containing the count and etag for the backup version |
| 165 | +
|
141 | 166 | Raises:
|
142 | 167 | NotFoundError: if there are no versions defined
|
143 | 168 | RoomKeysVersionError: if the uploaded version is not the current version
|
@@ -171,59 +196,62 @@ def upload_room_keys(self, user_id, version, room_keys):
|
171 | 196 | else:
|
172 | 197 | raise
|
173 | 198 |
|
174 |
| - # go through the room_keys. |
175 |
| - # XXX: this should/could be done concurrently, given we're in a lock. |
| 199 | + # Fetch any existing room keys for the sessions that have been |
| 200 | + # submitted. Then compare them with the submitted keys. If the |
| 201 | + # key is new, insert it; if the key should be updated, then update |
| 202 | + # it; otherwise, drop it. |
| 203 | + existing_keys = yield self.store.get_e2e_room_keys_multi( |
| 204 | + user_id, version, room_keys["rooms"] |
| 205 | + ) |
| 206 | + to_insert = [] # batch the inserts together |
| 207 | + changed = False # if anything has changed, we need to update the etag |
176 | 208 | for room_id, room in iteritems(room_keys["rooms"]):
|
177 |
| - for session_id, session in iteritems(room["sessions"]): |
178 |
| - yield self._upload_room_key( |
179 |
| - user_id, version, room_id, session_id, session |
| 209 | + for session_id, room_key in iteritems(room["sessions"]): |
| 210 | + log_kv( |
| 211 | + { |
| 212 | + "message": "Trying to upload room key", |
| 213 | + "room_id": room_id, |
| 214 | + "session_id": session_id, |
| 215 | + "user_id": user_id, |
| 216 | + } |
180 | 217 | )
|
181 |
| - |
182 |
| - @defer.inlineCallbacks |
183 |
| - def _upload_room_key(self, user_id, version, room_id, session_id, room_key): |
184 |
| - """Upload a given room_key for a given room and session into a given |
185 |
| - version of the backup. Merges the key with any which might already exist. |
186 |
| -
|
187 |
| - Args: |
188 |
| - user_id(str): the user whose backup we're setting |
189 |
| - version(str): the version ID of the backup we're updating |
190 |
| - room_id(str): the ID of the room whose keys we're setting |
191 |
| - session_id(str): the session whose room_key we're setting |
192 |
| - room_key(dict): the room_key being set |
193 |
| - """ |
194 |
| - log_kv( |
195 |
| - { |
196 |
| - "message": "Trying to upload room key", |
197 |
| - "room_id": room_id, |
198 |
| - "session_id": session_id, |
199 |
| - "user_id": user_id, |
200 |
| - } |
201 |
| - ) |
202 |
| - # get the room_key for this particular row |
203 |
| - current_room_key = None |
204 |
| - try: |
205 |
| - current_room_key = yield self.store.get_e2e_room_key( |
206 |
| - user_id, version, room_id, session_id |
207 |
| - ) |
208 |
| - except StoreError as e: |
209 |
| - if e.code == 404: |
210 |
| - log_kv( |
211 |
| - { |
212 |
| - "message": "Room key not found.", |
213 |
| - "room_id": room_id, |
214 |
| - "user_id": user_id, |
215 |
| - } |
| 218 | + current_room_key = existing_keys.get(room_id, {}).get(session_id) |
| 219 | + if current_room_key: |
| 220 | + if self._should_replace_room_key(current_room_key, room_key): |
| 221 | + log_kv({"message": "Replacing room key."}) |
| 222 | + # updates are done one at a time in the DB, so send |
| 223 | + # updates right away rather than batching them up, |
| 224 | + # like we do with the inserts |
| 225 | + yield self.store.update_e2e_room_key( |
| 226 | + user_id, version, room_id, session_id, room_key |
| 227 | + ) |
| 228 | + changed = True |
| 229 | + else: |
| 230 | + log_kv({"message": "Not replacing room_key."}) |
| 231 | + else: |
| 232 | + log_kv( |
| 233 | + { |
| 234 | + "message": "Room key not found.", |
| 235 | + "room_id": room_id, |
| 236 | + "user_id": user_id, |
| 237 | + } |
| 238 | + ) |
| 239 | + log_kv({"message": "Replacing room key."}) |
| 240 | + to_insert.append((room_id, session_id, room_key)) |
| 241 | + changed = True |
| 242 | + |
| 243 | + if len(to_insert): |
| 244 | + yield self.store.add_e2e_room_keys(user_id, version, to_insert) |
| 245 | + |
| 246 | + version_etag = version_info["etag"] |
| 247 | + if changed: |
| 248 | + version_etag = version_etag + 1 |
| 249 | + yield self.store.update_e2e_room_keys_version( |
| 250 | + user_id, version, None, version_etag |
216 | 251 | )
|
217 |
| - else: |
218 |
| - raise |
219 | 252 |
|
220 |
| - if self._should_replace_room_key(current_room_key, room_key): |
221 |
| - log_kv({"message": "Replacing room key."}) |
222 |
| - yield self.store.set_e2e_room_key( |
223 |
| - user_id, version, room_id, session_id, room_key |
224 |
| - ) |
225 |
| - else: |
226 |
| - log_kv({"message": "Not replacing room_key."}) |
| 253 | + count = yield self.store.count_e2e_room_keys(user_id, version) |
| 254 | + return {"etag": str(version_etag), "count": count} |
227 | 255 |
|
228 | 256 | @staticmethod
|
229 | 257 | def _should_replace_room_key(current_room_key, room_key):
|
@@ -314,6 +342,8 @@ def get_version_info(self, user_id, version=None):
|
314 | 342 | raise NotFoundError("Unknown backup version")
|
315 | 343 | else:
|
316 | 344 | raise
|
| 345 | + |
| 346 | + res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"]) |
317 | 347 | return res
|
318 | 348 |
|
319 | 349 | @trace
|
|
0 commit comments