27
27
IO ,
28
28
TYPE_CHECKING ,
29
29
Any ,
30
- Awaitable ,
30
+ AsyncIterator ,
31
31
BinaryIO ,
32
32
Callable ,
33
- Generator ,
34
33
Optional ,
35
34
Sequence ,
36
35
Tuple ,
@@ -97,11 +96,9 @@ async def store_file(self, source: IO, file_info: FileInfo) -> str:
97
96
the file path written to in the primary media store
98
97
"""
99
98
100
- with self .store_into_file (file_info ) as (f , fname , finish_cb ):
99
+ async with self .store_into_file (file_info ) as (f , fname ):
101
100
# Write to the main media repository
102
101
await self .write_to_file (source , f )
103
- # Write to the other storage providers
104
- await finish_cb ()
105
102
106
103
return fname
107
104
@@ -111,32 +108,27 @@ async def write_to_file(self, source: IO, output: IO) -> None:
111
108
await defer_to_thread (self .reactor , _write_file_synchronously , source , output )
112
109
113
110
@trace_with_opname ("MediaStorage.store_into_file" )
114
- @contextlib .contextmanager
115
- def store_into_file (
111
+ @contextlib .asynccontextmanager
112
+ async def store_into_file (
116
113
self , file_info : FileInfo
117
- ) -> Generator [Tuple [BinaryIO , str , Callable [[], Awaitable [ None ]]], None , None ]:
118
- """Context manager used to get a file like object to write into, as
114
+ ) -> AsyncIterator [Tuple [BinaryIO , str ] ]:
115
+ """Async Context manager used to get a file like object to write into, as
119
116
described by file_info.
120
117
121
- Actually yields a 3 -tuple (file, fname, finish_cb ), where file is a file
122
- like object that can be written to, fname is the absolute path of file
123
- on disk, and finish_cb is a function that returns an awaitable .
118
+ Actually yields a 2 -tuple (file, fname,), where file is a file
119
+ like object that can be written to and fname is the absolute path of file
120
+ on disk.
124
121
125
122
fname can be used to read the contents from after upload, e.g. to
126
123
generate thumbnails.
127
124
128
- finish_cb must be called and waited on after the file has been successfully been
129
- written to. Should not be called if there was an error. Checks for spam and
130
- stores the file into the configured storage providers.
131
-
132
125
Args:
133
126
file_info: Info about the file to store
134
127
135
128
Example:
136
129
137
- with media_storage.store_into_file(info) as (f, fname, finish_cb ):
130
+ async with media_storage.store_into_file(info) as (f, fname,):
138
131
# .. write into f ...
139
- await finish_cb()
140
132
"""
141
133
142
134
path = self ._file_info_to_path (file_info )
@@ -145,62 +137,42 @@ def store_into_file(
145
137
dirname = os .path .dirname (fname )
146
138
os .makedirs (dirname , exist_ok = True )
147
139
148
- finished_called = [False ]
149
-
150
140
main_media_repo_write_trace_scope = start_active_span (
151
141
"writing to main media repo"
152
142
)
153
143
main_media_repo_write_trace_scope .__enter__ ()
154
144
155
- try :
156
- with open (fname , "wb" ) as f :
157
-
158
- async def finish () -> None :
159
- # When someone calls finish, we assume they are done writing to the main media repo
160
- main_media_repo_write_trace_scope .__exit__ (None , None , None )
161
-
162
- with start_active_span ("writing to other storage providers" ):
163
- # Ensure that all writes have been flushed and close the
164
- # file.
165
- f .flush ()
166
- f .close ()
167
-
168
- spam_check = await self ._spam_checker_module_callbacks .check_media_file_for_spam (
169
- ReadableFileWrapper (self .clock , fname ), file_info
170
- )
171
- if spam_check != self ._spam_checker_module_callbacks .NOT_SPAM :
172
- logger .info ("Blocking media due to spam checker" )
173
- # Note that we'll delete the stored media, due to the
174
- # try/except below. The media also won't be stored in
175
- # the DB.
176
- # We currently ignore any additional field returned by
177
- # the spam-check API.
178
- raise SpamMediaException (errcode = spam_check [0 ])
179
-
180
- for provider in self .storage_providers :
181
- with start_active_span (str (provider )):
182
- await provider .store_file (path , file_info )
183
-
184
- finished_called [0 ] = True
185
-
186
- yield f , fname , finish
187
- except Exception as e :
145
+ with main_media_repo_write_trace_scope :
188
146
try :
189
- main_media_repo_write_trace_scope .__exit__ (
190
- type (e ), None , e .__traceback__
191
- )
192
- os .remove (fname )
193
- except Exception :
194
- pass
147
+ with open (fname , "wb" ) as f :
148
+ yield f , fname
195
149
196
- raise e from None
150
+ except Exception as e :
151
+ try :
152
+ os .remove (fname )
153
+ except Exception :
154
+ pass
197
155
198
- if not finished_called :
199
- exc = Exception ("Finished callback not called" )
200
- main_media_repo_write_trace_scope .__exit__ (
201
- type (exc ), None , exc .__traceback__
156
+ raise e from None
157
+
158
+ with start_active_span ("writing to other storage providers" ):
159
+ spam_check = (
160
+ await self ._spam_checker_module_callbacks .check_media_file_for_spam (
161
+ ReadableFileWrapper (self .clock , fname ), file_info
162
+ )
202
163
)
203
- raise exc
164
+ if spam_check != self ._spam_checker_module_callbacks .NOT_SPAM :
165
+ logger .info ("Blocking media due to spam checker" )
166
+ # Note that we'll delete the stored media, due to the
167
+ # try/except below. The media also won't be stored in
168
+ # the DB.
169
+ # We currently ignore any additional field returned by
170
+ # the spam-check API.
171
+ raise SpamMediaException (errcode = spam_check [0 ])
172
+
173
+ for provider in self .storage_providers :
174
+ with start_active_span (str (provider )):
175
+ await provider .store_file (path , file_info )
204
176
205
177
async def fetch_media (self , file_info : FileInfo ) -> Optional [Responder ]:
206
178
"""Attempts to fetch media described by file_info from the local cache
0 commit comments