|
import json
import os
import urllib.parse
import urllib.request

import requests
from tqdm import tqdm
| 7 | + |
| 8 | + |
class repo_info:
    """Plain holder for the parts parsed out of a GitHub folder URL.

    All three fields default to ``None`` at class level and are meant to
    be overwritten on an instance.
    """

    # repo: "<owner>/<name>" slug
    # target_dir: folder path inside the repository
    # branch: branch the folder URL points at
    repo = target_dir = branch = None
| 13 | + |
| 14 | + |
class api_urls:
    """URL templates for the GitHub "git trees" REST endpoint.

    Each template takes two positional ``str.format`` arguments: the
    ``owner/name`` repository slug and the branch (tree) name.
    """

    # Only the top-level entries of the tree.
    no_recursive = "https://api.github.com/repos/{}/git/trees/{}"
    # The whole tree, including every nested entry.
    recursive = "https://api.github.com/repos/{}/git/trees/{}?recursive=1"
| 18 | + |
| 19 | + |
class OctodirException(Exception):
    """Error raised by Octodir for invalid repositories or output paths."""
| 22 | + |
| 23 | + |
def mkdirs(path):
    """Create directory *path* and any missing parents.

    Does nothing when the directory already exists.  ``exist_ok=True`` is
    used instead of a separate ``os.path.isdir`` check so that another
    process creating the same directory in between cannot make the call
    fail.

    Raises:
        FileExistsError: if *path* exists but is not a directory.
        OSError: for any other failure reported by ``os.makedirs``.
    """
    os.makedirs(path, exist_ok=True)
| 27 | + |
| 28 | + |
class Octodir(object):
    """Download one folder of a GitHub repository to a local directory.

    ``folder_url`` is a browser URL of the form
    ``https://github.com/<owner>/<repo>/tree/<branch>/<path/to/folder>``
    and ``output_folder`` is an existing local directory.  The downloaded
    folder is recreated below ``output_folder`` under its own (last
    component) name.
    """

    def __init__(self, folder_url, output_folder):
        super(Octodir, self).__init__()
        self.folder_url = folder_url
        self.output_folder = output_folder

        # Filled in by dowload_folder() once folder_url has been parsed.
        self.repo = None
        self.target_dir = None
        self.branch = None

    @staticmethod
    def _split_folder_url(folder_url):
        """Split a GitHub folder URL into ``(repo, branch, target_dir)``.

        Percent-escapes are decoded so the result can be compared with the
        paths reported by the API, and empty path segments (for example
        from a trailing ``/``) are dropped from ``target_dir``.

        Raises:
            IndexError: if the URL lacks the owner, repository or branch.
        """
        trimmed = folder_url.replace('https://github.com/', '')
        parts = [urllib.parse.unquote(part) for part in trimmed.split('/')]
        if len(parts) < 4:
            raise IndexError('Invalid repo url')

        # parts[2] is the "tree" keyword of the browser URL.
        owner, repo, branch = parts[0], parts[1], parts[3]
        if not (owner and repo and branch):
            raise IndexError('Invalid repo url')

        target_dir = '/'.join(part for part in parts[4:] if part)
        return (owner + '/' + repo, branch, target_dir)

    @staticmethod
    def _is_inside_target(path, target_dir):
        """Return True when *path* is *target_dir* itself or lies below it.

        An empty *target_dir* stands for the repository root and matches
        every path.  The comparison is made on whole path components, so
        ``src`` matches neither ``src2/x`` nor ``lib/src/x``.
        """
        if not target_dir:
            return True
        return path == target_dir or path.startswith(target_dir + '/')

    @staticmethod
    def _local_path(path, target_dir):
        """Return *path* relative to the parent directory of *target_dir*.

        Only the leading parent part is removed, so the result starts with
        the last component of *target_dir*; later repetitions of the same
        text inside *path* are left untouched.
        """
        # rfind() gives -1 when there is no parent, which slices from 0.
        return path[target_dir.rfind('/') + 1:]

    def __get_raw_url(self, file_path, url):
        """Build the raw.githubusercontent.com URL for *file_path*.

        *url* is the blob API URL of the file; only its repository part is
        reused.  Branch and path are percent-encoded so that names with
        spaces or other reserved characters produce a valid URL.
        """
        base = url.replace(
            'https://api.github.com/repos/',
            'https://raw.githubusercontent.com/')
        base = base.split('/git/blobs/')[0]

        return '/'.join((
            base,
            urllib.parse.quote(self.branch),
            urllib.parse.quote(file_path)))

    def __get_repo_tree(self):
        """Fetch the recursive tree and keep the entries in the target dir.

        Returns:
            ``(files, location)`` where ``files`` is a list of
            ``[path, raw_url]`` pairs for every blob inside
            ``self.target_dir`` and ``location`` maps the path of every
            non-blob entry inside it to its index in the API tree.

        Raises:
            OctodirException: if the API response carries no tree.
        """
        api = requests.get(
            api_urls.recursive.format(self.repo, self.branch)).text
        response = json.loads(api)
        if 'tree' not in response:
            raise OctodirException(
                response.get('message', 'Invalid repository tree'))

        files = []
        location = dict()
        for index, entry in enumerate(response['tree']):
            path = entry['path']
            if not self._is_inside_target(path, self.target_dir):
                continue
            if entry['type'] == 'blob':
                files.append([path, self.__get_raw_url(path, entry['url'])])
            else:
                location[path] = index

        return (files, location)

    def __scrutinize_url(self, folder_url):
        """Parse *folder_url* into a ``repo_info`` instance.

        Raises:
            IndexError: if the URL lacks the owner, repository or branch.
        """
        repo, branch, target_dir = self._split_folder_url(folder_url)

        repo_data = repo_info()
        repo_data.repo = repo
        repo_data.branch = branch
        repo_data.target_dir = target_dir

        return repo_data

    def __api_response(self):
        """Return the decoded non-recursive tree response for the URL."""
        repo_data = self.__scrutinize_url(self.folder_url)
        api = requests.get(api_urls.no_recursive.format(
            repo_data.repo, repo_data.branch)).text

        return json.loads(api)

    def __check_valid_output(self):
        """Return True if the output folder exists.

        Raises:
            OctodirException: if ``self.output_folder`` is not a directory.
        """
        if not os.path.isdir(self.output_folder):
            raise OctodirException('Invalid output directory')
        return True

    def __download(self, target_folder='*', recursive=True):
        """Download the files of the target directory.

        Args:
            target_folder: ``'*'`` for everything inside
                ``self.target_dir``, otherwise the repository path of a
                folder inside it to restrict the download to.
            recursive: when False, only files directly inside the selected
                folder are downloaded.

        Raises:
            OctodirException: if *target_folder* is not a folder inside
                the target directory.
        """
        files, location = self.__get_repo_tree()

        if target_folder == '*':
            base = self.target_dir
        else:
            # Drop "./", "../" and stray slashes before the lookup.
            base = '/'.join(
                part for part in target_folder.split('/')
                if part not in ('', '.', '..'))
            if base not in location:
                raise OctodirException('Invalid target folder')

        selected = []
        for path, raw_url in files:
            if not self._is_inside_target(path, base):
                continue
            # A remaining "/" means the file sits in a sub-folder.
            if not recursive and '/' in path[len(base):].lstrip('/'):
                continue
            selected.append((path, raw_url))

        with tqdm(total=len(selected), desc="Downloading folder...") as pbar:
            for path, raw_url in selected:
                ndir = self._local_path(path, self.target_dir)
                mkdirs(os.path.join(
                    self.output_folder, os.path.dirname(ndir)))
                urllib.request.urlretrieve(
                    raw_url, os.path.join(self.output_folder, ndir))
                pbar.update(1)

    def dowload_folder(self):
        """Validate the repository and output folder, then download.

        Raises:
            IndexError: if ``folder_url`` cannot be parsed.
            OctodirException: if the API reports an error for the
                repository or the output folder does not exist.
        """
        check_repo = self.__api_response()
        if 'message' in check_repo:
            raise OctodirException(check_repo['message'])

        self.__check_valid_output()

        scrutinized_url = self.__scrutinize_url(self.folder_url)
        self.repo = scrutinized_url.repo
        self.target_dir = scrutinized_url.target_dir
        self.branch = scrutinized_url.branch
        self.__download()

    # Correctly spelled alias; the original name stays for existing callers.
    download_folder = dowload_folder
0 commit comments