Skip to content

Commit a07ba33

Browse files
committed
indexer: Index all recently uploaded projects
1 parent 59d7d41 commit a07ba33

File tree

4 files changed

+265
-24
lines changed

4 files changed

+265
-24
lines changed

py_wtf/cli/__init__.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import json
99
import logging
1010
import shutil
11+
from concurrent.futures import ThreadPoolExecutor
12+
from datetime import datetime
1113
from functools import partial, wraps
1214
from pathlib import Path
1315

@@ -22,7 +24,7 @@
2224
from py_wtf.indexer import index_dir, index_file, index_project
2325
from py_wtf.indexer.pypi import parse_deps, parse_upload_time
2426
from py_wtf.logging import setup_logging
25-
from py_wtf.repository import converter, ProjectRepository
27+
from py_wtf.repository import converter, METADATA_FILENAME, ProjectRepository
2628
from py_wtf.types import (
2729
Documentation,
2830
FQName,
@@ -135,6 +137,53 @@ async def index_dir_cmd(dir: str) -> None:
135137
rich.print(f"Found {cnt} modules in total.")
136138

137139

140+
@py_wtf.command(name="index-since")
141+
@click.option("--since", type=click.DateTime(), required=True)
142+
@click.argument("directory")
143+
@coroutine
144+
async def index_since(directory: str, since: datetime) -> None:
145+
from google.cloud import bigquery
146+
147+
client = bigquery.Client(project="pypinfo-214114")
148+
time_format = "%Y-%m-%d %H:%M:%S"
149+
threadpool = ThreadPoolExecutor()
150+
rows = await asyncio.get_running_loop().run_in_executor(
151+
threadpool,
152+
client.query_and_wait,
153+
f"""
154+
SELECT distinct name
155+
FROM
156+
`bigquery-public-data.pypi.distribution_metadata`
157+
WHERE
158+
TIMESTAMP(upload_time) >= TIMESTAMP("{since.strftime(time_format)}")
159+
""",
160+
)
161+
rows = list(rows)
162+
logger.info(f"Found {len(rows)} new projects to index")
163+
out_dir = Path(directory)
164+
out_dir.mkdir(parents=True, exist_ok=True)
165+
async with httpx.AsyncClient() as client:
166+
resp = await client.get(f"https://py.wtf/_index/{METADATA_FILENAME}")
167+
resp.raise_for_status()
168+
(out_dir / METADATA_FILENAME).write_bytes(resp.content)
169+
170+
logger.info("Fetched prod index")
171+
repo = ProjectRepository(out_dir)
172+
rets = await asyncio.gather(
173+
*[
174+
repo.get(ProjectName(row.name), partial(index_project, repo=repo))
175+
for row in rows
176+
],
177+
return_exceptions=True,
178+
)
179+
for ret in rets:
180+
if isinstance(ret, Exception):
181+
logger.exception(ret)
182+
logger.info("Done indexing")
183+
repo.update_index()
184+
logger.info("Wrote new index")
185+
186+
138187
@py_wtf.command()
139188
@click.argument("dir", required=False)
140189
@coroutine

py_wtf/repository.py

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33
from asyncio import Future
44
from collections import Counter, defaultdict
5-
from dataclasses import dataclass, field
5+
from dataclasses import dataclass, field, replace
66
from pathlib import Path
77
from time import time
88
from typing import AsyncIterable, Callable, Tuple
@@ -71,33 +71,36 @@ async def get(
7171
raise ValueError(f"{key} was never yielded by {factory}")
7272
return fut.result()
7373

74-
def generate_index(self, timestamp: int | None) -> Index:
74+
def generate_index(
75+
self, timestamp: int | None, skip_hydration: bool = False
76+
) -> Index:
7577
max_counts = 5
7678
latest_project_mtimes: list[Tuple[ProjectName, float]] = []
7779
all_project_names: list[ProjectName] = []
7880
dep_counts: Counter[str] = Counter()
7981

80-
for item in self.directory.iterdir():
81-
if item.suffix != ".json" or not item.is_file():
82-
continue
83-
name = ProjectName(item.stem)
84-
all_project_names.append(name)
85-
if name not in self._cache:
86-
self._load_from_disk(name)
87-
project = self._cache[name].result()
88-
89-
for dep in project.metadata.dependencies:
90-
dep_counts[dep] += 1
91-
92-
mtime = project.metadata.upload_time
93-
if len(latest_project_mtimes) < max_counts:
94-
latest_project_mtimes.append((name, mtime))
95-
continue
96-
for i in reversed(range(len(latest_project_mtimes))):
97-
if mtime < latest_project_mtimes[i][1]:
98-
latest_project_mtimes.insert(i + 1, (name, mtime))
99-
while len(latest_project_mtimes) > max_counts:
100-
latest_project_mtimes.pop()
82+
if not skip_hydration:
83+
for item in self.directory.iterdir():
84+
if item.suffix != ".json" or not item.is_file():
85+
continue
86+
name = ProjectName(item.stem)
87+
all_project_names.append(name)
88+
if name not in self._cache:
89+
self._load_from_disk(name)
90+
project = self._cache[name].result()
91+
92+
for dep in project.metadata.dependencies:
93+
dep_counts[dep] += 1
94+
95+
mtime = project.metadata.upload_time
96+
if len(latest_project_mtimes) < max_counts:
97+
latest_project_mtimes.append((name, mtime))
98+
continue
99+
for i in reversed(range(len(latest_project_mtimes))):
100+
if mtime < latest_project_mtimes[i][1]:
101+
latest_project_mtimes.insert(i + 1, (name, mtime))
102+
while len(latest_project_mtimes) > max_counts:
103+
latest_project_mtimes.pop()
101104

102105
latest_projects = [
103106
self._cache[name].result().metadata for name, _ in latest_project_mtimes
@@ -120,3 +123,20 @@ def generate_index(self, timestamp: int | None) -> Index:
120123
def write_index(self, timestamp: int | None = None) -> None:
121124
metadata = self.directory / METADATA_FILENAME
122125
metadata.write_text(converter.dumps(self.generate_index(timestamp)))
126+
127+
def update_index(self) -> None:
128+
metadata = self.directory / METADATA_FILENAME
129+
index = converter.loads(metadata.read_text(), Index)
130+
new_index = self.generate_index(int(time()), skip_hydration=True)
131+
index = replace(
132+
index,
133+
generated_at=new_index.generated_at,
134+
latest_projects=sorted(
135+
index.latest_projects + new_index.latest_projects,
136+
key=lambda m: -m.upload_time,
137+
),
138+
all_project_names=sorted(
139+
{*index.all_project_names, *new_index.all_project_names}
140+
),
141+
)
142+
metadata.write_text(converter.dumps(index))

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ dependencies = [
2121
"appdirs==1.4.4",
2222
"cattrs==22.1.0",
2323
"click>=8.1.3",
24+
"google-cloud-bigquery>=3.23.1",
2425
"httpx==0.23.0",
2526
"rich>=12.5.1",
2627
"libcst>=1.1.0",

0 commit comments

Comments
 (0)