Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 46f4be9

Browse files
authored
Fix race for concurrent downloads of remote media. (#8682)
Fixes #6755
1 parent 4504151 commit 46f4be9

File tree

6 files changed

+431
-71
lines changed

6 files changed

+431
-71
lines changed

changelog.d/8682.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix exception when handling multiple concurrent requests for remote media when using multiple media repositories.

synapse/rest/media/v1/media_repository.py

Lines changed: 105 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -305,15 +305,12 @@ async def _get_remote_media_impl(
305305
# file_id is the ID we use to track the file locally. If we've already
306306
# seen the file then reuse the existing ID, otherwise generate a new
307307
# one.
308-
if media_info:
309-
file_id = media_info["filesystem_id"]
310-
else:
311-
file_id = random_string(24)
312-
313-
file_info = FileInfo(server_name, file_id)
314308

315309
# If we have an entry in the DB, try and look for it
316310
if media_info:
311+
file_id = media_info["filesystem_id"]
312+
file_info = FileInfo(server_name, file_id)
313+
317314
if media_info["quarantined_by"]:
318315
logger.info("Media is quarantined")
319316
raise NotFoundError()
@@ -324,14 +321,34 @@ async def _get_remote_media_impl(
324321

325322
# Failed to find the file anywhere, let's download it.
326323

327-
media_info = await self._download_remote_file(server_name, media_id, file_id)
324+
try:
325+
media_info = await self._download_remote_file(server_name, media_id,)
326+
except SynapseError:
327+
raise
328+
except Exception as e:
329+
# An exception may be because we downloaded media in another
330+
# process, so let's check if we magically have the media.
331+
media_info = await self.store.get_cached_remote_media(server_name, media_id)
332+
if not media_info:
333+
raise e
334+
335+
file_id = media_info["filesystem_id"]
336+
file_info = FileInfo(server_name, file_id)
337+
338+
# We generate thumbnails even if another process downloaded the media
339+
# as a) it's conceivable that the other download request dies before it
340+
# generates thumbnails, but mainly b) we want to be sure the thumbnails
341+
# have finished being generated before responding to the client,
342+
# otherwise they'll request thumbnails and get a 404 if they're not
343+
# ready yet.
344+
await self._generate_thumbnails(
345+
server_name, media_id, file_id, media_info["media_type"]
346+
)
328347

329348
responder = await self.media_storage.fetch_media(file_info)
330349
return responder, media_info
331350

332-
async def _download_remote_file(
333-
self, server_name: str, media_id: str, file_id: str
334-
) -> dict:
351+
async def _download_remote_file(self, server_name: str, media_id: str,) -> dict:
335352
"""Attempt to download the remote file from the given server name,
336353
using a newly generated file_id as the local id.
337354
@@ -346,6 +363,8 @@ async def _download_remote_file(
346363
The media info of the file.
347364
"""
348365

366+
file_id = random_string(24)
367+
349368
file_info = FileInfo(server_name=server_name, file_id=file_id)
350369

351370
with self.media_storage.store_into_file(file_info) as (f, fname, finish):
@@ -401,22 +420,32 @@ async def _download_remote_file(
401420

402421
await finish()
403422

404-
media_type = headers[b"Content-Type"][0].decode("ascii")
405-
upload_name = get_filename_from_headers(headers)
406-
time_now_ms = self.clock.time_msec()
423+
media_type = headers[b"Content-Type"][0].decode("ascii")
424+
upload_name = get_filename_from_headers(headers)
425+
time_now_ms = self.clock.time_msec()
426+
427+
# Multiple remote media download requests can race (when using
428+
# multiple media repos), so this may throw a constraint violation
429+
# exception. If it does we'll delete the newly downloaded file from
430+
# disk (as we're in the ctx manager).
431+
#
432+
# However: we've already called `finish()` so we may have also
433+
# written to the storage providers. This is preferable to the
434+
# alternative where we call `finish()` *after* this, where we could
435+
# end up having an entry in the DB but fail to write the files to
436+
# the storage providers.
437+
await self.store.store_cached_remote_media(
438+
origin=server_name,
439+
media_id=media_id,
440+
media_type=media_type,
441+
time_now_ms=self.clock.time_msec(),
442+
upload_name=upload_name,
443+
media_length=length,
444+
filesystem_id=file_id,
445+
)
407446

408447
logger.info("Stored remote media in file %r", fname)
409448

410-
await self.store.store_cached_remote_media(
411-
origin=server_name,
412-
media_id=media_id,
413-
media_type=media_type,
414-
time_now_ms=self.clock.time_msec(),
415-
upload_name=upload_name,
416-
media_length=length,
417-
filesystem_id=file_id,
418-
)
419-
420449
media_info = {
421450
"media_type": media_type,
422451
"media_length": length,
@@ -425,8 +454,6 @@ async def _download_remote_file(
425454
"filesystem_id": file_id,
426455
}
427456

428-
await self._generate_thumbnails(server_name, media_id, file_id, media_type)
429-
430457
return media_info
431458

432459
def _get_thumbnail_requirements(self, media_type):
@@ -692,42 +719,60 @@ async def _generate_thumbnails(
692719
if not t_byte_source:
693720
continue
694721

695-
try:
696-
file_info = FileInfo(
697-
server_name=server_name,
698-
file_id=file_id,
699-
thumbnail=True,
700-
thumbnail_width=t_width,
701-
thumbnail_height=t_height,
702-
thumbnail_method=t_method,
703-
thumbnail_type=t_type,
704-
url_cache=url_cache,
705-
)
706-
707-
output_path = await self.media_storage.store_file(
708-
t_byte_source, file_info
709-
)
710-
finally:
711-
t_byte_source.close()
712-
713-
t_len = os.path.getsize(output_path)
722+
file_info = FileInfo(
723+
server_name=server_name,
724+
file_id=file_id,
725+
thumbnail=True,
726+
thumbnail_width=t_width,
727+
thumbnail_height=t_height,
728+
thumbnail_method=t_method,
729+
thumbnail_type=t_type,
730+
url_cache=url_cache,
731+
)
714732

715-
# Write to database
716-
if server_name:
717-
await self.store.store_remote_media_thumbnail(
718-
server_name,
719-
media_id,
720-
file_id,
721-
t_width,
722-
t_height,
723-
t_type,
724-
t_method,
725-
t_len,
726-
)
727-
else:
728-
await self.store.store_local_thumbnail(
729-
media_id, t_width, t_height, t_type, t_method, t_len
730-
)
733+
with self.media_storage.store_into_file(file_info) as (f, fname, finish):
734+
try:
735+
await self.media_storage.write_to_file(t_byte_source, f)
736+
await finish()
737+
finally:
738+
t_byte_source.close()
739+
740+
t_len = os.path.getsize(fname)
741+
742+
# Write to database
743+
if server_name:
744+
# Multiple remote media download requests can race (when
745+
# using multiple media repos), so this may throw a constraint
746+
# violation exception. If it does we'll delete the newly
747+
# generated thumbnail from disk (as we're in the ctx
748+
# manager).
749+
#
750+
# However: we've already called `finish()` so we may have
751+
# also written to the storage providers. This is preferable
752+
# to the alternative where we call `finish()` *after* this,
753+
# where we could end up having an entry in the DB but fail
754+
# to write the files to the storage providers.
755+
try:
756+
await self.store.store_remote_media_thumbnail(
757+
server_name,
758+
media_id,
759+
file_id,
760+
t_width,
761+
t_height,
762+
t_type,
763+
t_method,
764+
t_len,
765+
)
766+
except Exception as e:
767+
thumbnail_exists = await self.store.get_remote_media_thumbnail(
768+
server_name, media_id, t_width, t_height, t_type,
769+
)
770+
if not thumbnail_exists:
771+
raise e
772+
else:
773+
await self.store.store_local_thumbnail(
774+
media_id, t_width, t_height, t_type, t_method, t_len
775+
)
731776

732777
return {"width": m_width, "height": m_height}
733778

synapse/rest/media/v1/media_storage.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def __init__(
5252
storage_providers: Sequence["StorageProviderWrapper"],
5353
):
5454
self.hs = hs
55+
self.reactor = hs.get_reactor()
5556
self.local_media_directory = local_media_directory
5657
self.filepaths = filepaths
5758
self.storage_providers = storage_providers
@@ -70,13 +71,16 @@ async def store_file(self, source: IO, file_info: FileInfo) -> str:
7071

7172
with self.store_into_file(file_info) as (f, fname, finish_cb):
7273
# Write to the main repository
73-
await defer_to_thread(
74-
self.hs.get_reactor(), _write_file_synchronously, source, f
75-
)
74+
await self.write_to_file(source, f)
7675
await finish_cb()
7776

7877
return fname
7978

79+
async def write_to_file(self, source: IO, output: IO):
80+
"""Asynchronously write the `source` to `output`.
81+
"""
82+
await defer_to_thread(self.reactor, _write_file_synchronously, source, output)
83+
8084
@contextlib.contextmanager
8185
def store_into_file(self, file_info: FileInfo):
8286
"""Context manager used to get a file like object to write into, as
@@ -112,14 +116,20 @@ def store_into_file(self, file_info: FileInfo):
112116

113117
finished_called = [False]
114118

115-
async def finish():
116-
for provider in self.storage_providers:
117-
await provider.store_file(path, file_info)
118-
119-
finished_called[0] = True
120-
121119
try:
122120
with open(fname, "wb") as f:
121+
122+
async def finish():
123+
# Ensure that all writes have been flushed and close the
124+
# file.
125+
f.flush()
126+
f.close()
127+
128+
for provider in self.storage_providers:
129+
await provider.store_file(path, file_info)
130+
131+
finished_called[0] = True
132+
123133
yield f, fname, finish
124134
except Exception:
125135
try:
@@ -210,7 +220,7 @@ async def ensure_media_is_in_local_cache(self, file_info: FileInfo) -> str:
210220
if res:
211221
with res:
212222
consumer = BackgroundFileConsumer(
213-
open(local_path, "wb"), self.hs.get_reactor()
223+
open(local_path, "wb"), self.reactor
214224
)
215225
await res.write_to_consumer(consumer)
216226
await consumer.wait()

synapse/storage/databases/main/media_repository.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,33 @@ async def get_remote_media_thumbnails(
452452
desc="get_remote_media_thumbnails",
453453
)
454454

455+
async def get_remote_media_thumbnail(
456+
self, origin: str, media_id: str, t_width: int, t_height: int, t_type: str,
457+
) -> Optional[Dict[str, Any]]:
458+
"""Fetch the thumbnail info of given width, height and type.
459+
"""
460+
461+
return await self.db_pool.simple_select_one(
462+
table="remote_media_cache_thumbnails",
463+
keyvalues={
464+
"media_origin": origin,
465+
"media_id": media_id,
466+
"thumbnail_width": t_width,
467+
"thumbnail_height": t_height,
468+
"thumbnail_type": t_type,
469+
},
470+
retcols=(
471+
"thumbnail_width",
472+
"thumbnail_height",
473+
"thumbnail_method",
474+
"thumbnail_type",
475+
"thumbnail_length",
476+
"filesystem_id",
477+
),
478+
allow_none=True,
479+
desc="get_remote_media_thumbnail",
480+
)
481+
455482
async def store_remote_media_thumbnail(
456483
self,
457484
origin,

0 commit comments

Comments
 (0)