1
1
import os
2
2
import re
3
+ import sys
3
4
import json
4
5
import requests
5
6
from time import sleep
6
- from typing import Iterable
7
+ from typing import Iterable , Any
8
+ from datetime import datetime
7
9
8
- from snapmap_archiver .Coordinates import Coordinates
9
- from snapmap_archiver .snap import Snap
10
-
11
-
12
- MAX_RADIUS = 85_000
13
- ISSUES_URL = "https://github.com/king-millez/snapmap-archiver/issues/new/choose"
10
+ from snapmap_archiver .coordinates import Coordinates
11
+ from snapmap_archiver .snap import Snap , SnapJSONEncoder
14
12
15
13
16
14
class SnapmapArchiver :
15
+ MAX_RADIUS = 85_000
16
+ ISSUES_URL = "https://github.com/king-millez/snapmap-archiver/issues/new/choose"
17
+ SNAP_PATTERN = re .compile (
18
+ r"(?:https?:\/\/map\.snapchat\.com\/ttp\/snap\/)?(W7_(?:[aA-zZ0-9\-_\+]{22})(?:[aA-zZ0-9-_\+]{28})AAAAA[AQ])(?:\/?@-?[0-9]{1,3}\.?[0-9]{0,},-?[0-9]{1,3}\.?[0-9]{0,}(?:,[0-9]{1,3}\.?[0-9]{0,}z))?"
19
+ )
20
+
17
21
def __init__ (self , * args , ** kwargs ) -> None :
22
+ if sys .version_info < (3 , 10 ):
23
+ raise RuntimeError (
24
+ "Python 3.10 or above is required to use snapmap-archiver!"
25
+ )
26
+
18
27
self .write_json = kwargs .get ("write_json" )
19
- self .all_snaps : dict [str , dict [ str , str ] ] = {}
28
+ self .all_snaps : dict [str , Snap ] = {}
20
29
self .arg_snaps = args
21
30
self .coords_list = []
22
31
self .radius = 10_000
@@ -27,7 +36,7 @@ def __init__(self, *args, **kwargs) -> None:
27
36
28
37
if not kwargs ["locations" ] and not args and not kwargs ["input_file" ]:
29
38
raise ValueError (
30
- "Some sort of input is required; location (-l), input file (--file ), and raw Snap IDs are all valid options."
39
+ "Some sort of input is required; location (-l), input file (-f ), and raw Snap IDs are all valid options."
31
40
)
32
41
33
42
if not kwargs ["output_dir" ]:
@@ -42,7 +51,9 @@ def __init__(self, *args, **kwargs) -> None:
42
51
43
52
if kwargs .get ("radius" ):
44
53
self .radius = (
45
- MAX_RADIUS if kwargs ["radius" ] > MAX_RADIUS else kwargs ["radius" ]
54
+ self .MAX_RADIUS
55
+ if kwargs ["radius" ] > self .MAX_RADIUS
56
+ else kwargs ["radius" ]
46
57
)
47
58
48
59
# Query provided coordinates for Snaps
@@ -55,7 +66,7 @@ def __init__(self, *args, **kwargs) -> None:
55
66
if kwargs .get ("input_file" ):
56
67
self .input_file = kwargs ["input_file" ]
57
68
58
- def download_snaps (self , group : Iterable [Snap ] | Snap ):
69
+ def download_snaps (self , group : list [Snap ] | Snap ):
59
70
if isinstance (group , Snap ):
60
71
group = [group ]
61
72
for snap in group :
@@ -67,29 +78,31 @@ def download_snaps(self, group: Iterable[Snap] | Snap):
67
78
f .write (requests .get (snap .url ).content )
68
79
print (f" - Downloaded { fpath } ." )
69
80
70
- def query_snaps (self , snaps : str | Iterable [str ]) -> list [Snap | None ]:
81
+ def query_snaps (self , snaps : str | Iterable [str ]) -> list [Snap ]:
71
82
if isinstance (snaps , str ):
72
83
snaps = [
73
84
snaps
74
85
] # The Snap query endpoint can take multiple IDs, so here we can query 1 or more snaps with ease.
75
86
to_query = []
76
87
for snap_id in snaps :
77
88
rgx_match = re .search (
78
- r"(?:https?:\/\/map\.snapchat\.com\/ttp\/snap\/)?(W7_(?:[aA-zZ0-9\-_\+]{22})(?:[aA-zZ0-9-_\+]{28})AAAAA[AQ])(?:\/?@-?[0-9]{1,3}\.?[0-9]{0,},-?[0-9]{1,3}\.?[0-9]{0,}(?:,[0-9]{1,3}\.?[0-9]{0,}z))?" ,
89
+ self . SNAP_PATTERN ,
79
90
snap_id ,
80
91
)
81
92
if not rgx_match :
82
93
print (f"{ snap_id } is not a valid Snap URL or ID." )
83
94
continue
84
95
to_query .append (rgx_match .group (1 ))
85
96
try :
86
- return [
87
- self ._parse_snap (snap )
88
- for snap in requests .post (
89
- "https://ms.sc-jpl.com/web/getStoryElements" ,
90
- json = {"snapIds" : to_query },
91
- ).json ()["elements" ]
92
- ]
97
+ retl = []
98
+ for snap in requests .post (
99
+ "https://ms.sc-jpl.com/web/getStoryElements" ,
100
+ json = {"snapIds" : to_query },
101
+ ).json ()["elements" ]:
102
+ s = self ._parse_snap (snap )
103
+ if s :
104
+ retl .append (s )
105
+ return retl
93
106
except requests .exceptions .JSONDecodeError :
94
107
return []
95
108
@@ -155,7 +168,7 @@ def main(self):
155
168
if self .input_file :
156
169
if os .path .isfile (self .input_file ):
157
170
with open (self .input_file , "r" ) as f :
158
- to_format = f .read ().split ("\n " )
171
+ to_format = [ ln for ln in f .read ().split ("\n " ) if ln . strip ()]
159
172
self .download_snaps (self .query_snaps (to_format ))
160
173
else :
161
174
raise FileNotFoundError ("Input file does not exist." )
@@ -164,30 +177,50 @@ def main(self):
164
177
self .download_snaps (self .query_snaps (self .arg_snaps ))
165
178
166
179
if self .write_json :
167
- with open (os .path .join (self .output_dir , "archive.json" ), "w" ) as f :
168
- f .write (json .dumps (self ._transform_index (self .all_snaps ), indent = 2 ))
180
+ with open (
181
+ os .path .join (
182
+ self .output_dir , f"archive_{ int (datetime .now ().timestamp ())} .json"
183
+ ),
184
+ "w" ,
185
+ ) as f :
186
+ f .write (
187
+ json .dumps (
188
+ self ._transform_index (self .all_snaps ),
189
+ indent = 2 ,
190
+ cls = SnapJSONEncoder ,
191
+ )
192
+ )
169
193
170
194
def _transform_index (self , index : dict [str , Snap ]):
171
195
return [v for v in index .values ()]
172
196
173
- def _parse_snap (self , snap : dict ):
174
- data_dict = {"create_time" : snap ["timestamp" ], "snap_id" : snap ["id" ]}
197
+ def _parse_snap (
198
+ self ,
199
+ snap : dict [
200
+ str , Any
201
+ ], # I don't like the Any type but this dict is so dynamic there isn't much point hinting it accurately.
202
+ ) -> Snap | None :
203
+ data_dict = {
204
+ "create_time" : round (int (snap ["timestamp" ]) * 10 ** - 3 , 3 ),
205
+ "snap_id" : snap ["id" ],
206
+ }
175
207
if snap ["snapInfo" ].get ("snapMediaType" ):
176
208
data_dict ["file_type" ] = "mp4"
177
209
elif snap ["snapInfo" ].get ("streamingMediaInfo" ):
178
210
data_dict ["file_type" ] = "jpg"
179
211
else :
180
212
print (
181
- f'**Unknown Snap type detected!**\n \t ID: { snap ["id" ]} \n \t Snap data: { json .dumps (snap )} \n Please report this at { ISSUES_URL } \n '
213
+ f'**Unknown Snap type detected!**\n \t ID: { snap ["id" ]} \n \t Snap data: { json .dumps (snap )} \n Please report this at { self . ISSUES_URL } \n '
182
214
)
183
- return
215
+ return None
184
216
url = snap ["snapInfo" ]["streamingMediaInfo" ].get ("mediaUrl" )
185
217
if not url :
186
- return
218
+ return None
187
219
data_dict ["url" ] = url
220
+ s = Snap (** data_dict )
188
221
if not self .all_snaps .get (snap ["id" ]):
189
- self .all_snaps [snap ["id" ]] = data_dict
190
- return Snap ( ** data_dict )
222
+ self .all_snaps [snap ["id" ]] = s
223
+ return s
191
224
192
225
def _get_epoch (self ):
193
226
epoch_endpoint = requests .post (
@@ -201,7 +234,7 @@ def _get_epoch(self):
201
234
return entry ["id" ]["epoch" ]
202
235
else :
203
236
raise self .MissingEpochError (
204
- f"The API epoch could not be obtained.\n \n Please report this at { ISSUES_URL } "
237
+ f"The API epoch could not be obtained.\n \n Please report this at { self . ISSUES_URL } "
205
238
)
206
239
207
240
class MissingEpochError (Exception ):
0 commit comments