# /// script
# dependencies = [
#   "google-search-results>=2.4.2",
#   "pyyaml>=6.0.2",
# ]
# ///
"""Script to fetch papers that cite pymatviz from Google Scholar and update readme.

Invoke with 64-character SERPAPI_KEY:

SERPAPI_KEY=ccd7f7ea8... python assets/fetch_citations.py
"""
# ruff: noqa: T201

from __future__ import annotations

import gzip
import os
import re
import shutil
import sys
from datetime import datetime, timedelta, timezone
from typing import NotRequired, TypedDict

import yaml
from serpapi import GoogleSearch

from pymatviz import ROOT


class ScholarPaper(TypedDict):
    """Type for a paper fetched from Google Scholar."""

    title: str
    link: str
    result_id: str
    authors: list[str]
    summary: str | None
    year: int | None
    citations: int
    fetch_date: str
    # Additional metadata fields
    snippet: NotRequired[str]  # Paper abstract/description
    resources: NotRequired[list[dict[str, str]]]  # Additional links (PDF, HTML, etc.)
    publication_info: NotRequired[dict[str, str]]  # Full publication metadata
    inline_links: NotRequired[dict[str, str]]  # Related links (citations, versions, ...)
    list_index: NotRequired[int]  # List index in search results
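
# An illustrative SerpAPI "organic_results" entry that maps onto ScholarPaper
# (field names follow SerpAPI's google_scholar engine; values are made up):
# {
#     "title": "Example paper using pymatviz",
#     "link": "https://example.com/paper",
#     "result_id": "AbC123dEf45",
#     "publication_info": {"summary": "A Author, B Author - Some Journal, 2024"},
#     "inline_links": {"cited_by": {"total": 3}},
# }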


def should_update(filename: str, update_freq_days: int = 7) -> bool:
    """Check if the file should be updated based on its last modified time.

    Args:
        filename (str): Path to the file to check.
        update_freq_days (int): Number of days to wait between updates.

    Returns:
        bool: True if the file doesn't exist or is older than update_freq_days.
    """
    try:
        mtime = os.path.getmtime(filename)
        last_modified = datetime.fromtimestamp(mtime, tz=timezone.utc)
        return (datetime.now(tz=timezone.utc) - last_modified) > timedelta(
            days=update_freq_days
        )
    except FileNotFoundError:
        return True


def create_backup(filename: str) -> str | None:
    """Back up the specified file with a timestamp in the new name.

    Args:
        filename (str): Path to the file to back up.

    Returns:
        str | None: Path to the backup file if created, None if source doesn't exist.
    """
    if not os.path.isfile(filename):
        return None

    # Get last modified time and format for filename
    mtime = datetime.fromtimestamp(os.path.getmtime(filename), tz=timezone.utc)
    timestamp = mtime.strftime("%Y%m%d-%H%M%S")
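    # e.g. "assets/scholar-papers.yml.gz" becomes
    # "assets/scholar-papers-20240101-123456.yml.gz" (timestamp is illustrative)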

    # Create backup filename with timestamp
    base = filename.removesuffix(".yml.gz")
    backup_path = f"{base}-{timestamp}.yml.gz"
    shutil.copy2(filename, backup_path)  # copy2 preserves metadata
    return backup_path


def fetch_scholar_papers(
    api_key: str | None = None, query: str = "pymatviz", num_pages: int = 3
) -> list[ScholarPaper]:
    """Fetch papers from Google Scholar that mention pymatviz.

    Args:
        api_key (str | None): SerpAPI key. If None, will try to read from SERPAPI_KEY
            env var.
        query (str): Search query. Defaults to "pymatviz".
        num_pages (int): Number of pages to fetch. Defaults to 3. Increase this number
            as more mentions of pymatviz in literature are found.

    Returns:
        list[ScholarPaper]: List of papers with their metadata including title,
            authors, publication info, year, and citation count.
    """
    if api_key is None:
        api_key = os.getenv("SERPAPI_KEY")
    if not api_key:
        raise ValueError(
            "No API key provided. Either pass as argument or set SERPAPI_KEY env var."
        )

    papers: list[ScholarPaper] = []
    today = f"{datetime.now(tz=timezone.utc):%Y-%m-%d}"

    for page in range(num_pages):
        params = {
            "api_key": api_key,
            "engine": "google_scholar",
            "q": query,
            "hl": "en",  # language
            "start": page * 10,  # Google Scholar uses 10 results per page
        }

        search = GoogleSearch(params)
        results = search.get_dict()
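
        # `results` mirrors SerpAPI's JSON response: "organic_results" holds the
        # hits, and an "error" key is set on failed requests (e.g. an invalid API
        # key or exhausted quota)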
        if "error" in results:
            print(f"Error on page {page + 1}: {results['error']}", file=sys.stderr)
            continue

        if "organic_results" not in results:
            print(f"No results found on page {page + 1}", file=sys.stderr)
            break

        for idx, result in enumerate(results["organic_results"], start=1):
            # Skip if no title or link
            if not result.get("title") or not result.get("link"):
                continue

            # Extract year from publication info if available
            year = None
            pub_info = result.get("publication_info", {})
            if "summary" in pub_info and (
                year_match := re.search(r"\b(19|20)\d{2}\b", pub_info["summary"])
            ):
                year = int(year_match.group())

            # Extract authors from publication info
            authors = []
            if isinstance(pub_info, dict) and "authors" in pub_info:
                authors = [
                    author["name"]
                    for author in pub_info.pop("authors")
                    if isinstance(author, dict) and "name" in author
                ]

            # Store all metadata from the result, overwrite only processed fields
            paper: ScholarPaper = {  # type: ignore[typeddict-item]
                **result,  # Keep all original fields
                "authors": authors
                or result.get("authors", []),  # Use processed authors
                "year": year,  # Use extracted year
                "fetch_date": today,  # Add fetch date
                # Add pagination-unwrapped index in search results
                "list_index": idx + page * 10,
                "citations": result.get("inline_links", {})
                .get("cited_by", {})
                .get("total", 0),
                "summary": pub_info.get("summary", ""),
            }
            if not paper.get("authors"):
                continue  # don't add papers without authors to YAML file
            papers.append(paper)

    return papers


def save_papers(
    papers: list[ScholarPaper], filename: str = "scholar-papers.yml.gz"
) -> None:
    """Save papers to a gzipped YAML file.

    Args:
        papers (list[ScholarPaper]): List of papers to save.
        filename (str): Name of the output file.
    """
    # Load existing papers for comparison if file exists
    old_papers: list[ScholarPaper] = []
    if os.path.isfile(filename):
        with gzip.open(filename, mode="rt", encoding="utf-8") as file:
            old_papers = yaml.safe_load(file) or []

    # Create backup of existing file
    if backup_path := create_backup(filename):
        print(f"\nCreated backup at {backup_path}")
    # Report how the paper count changed if we have old data
    if old_papers:
        print(f"\nPaper count: {len(old_papers)} → {len(papers)}")

    with gzip.open(filename, mode="wt", encoding="utf-8") as file:
        yaml.dump(papers, file, default_flow_style=False, allow_unicode=True)
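
    # To inspect the saved file by hand (a sketch, not used by this script):
    #   python -c "import gzip, yaml; print(yaml.safe_load(gzip.open('assets/scholar-papers.yml.gz', 'rt')))"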


def update_readme(
    papers: list[ScholarPaper], readme_path: str = f"{ROOT}/readme.md"
) -> None:
    """Update the readme with a list of papers sorted by citations.

    Args:
        papers (list[ScholarPaper]): List of papers to add to readme.
        readme_path (str): Path to the readme file.
    """
    # Sort papers by citations
    sorted_papers = sorted(papers, key=lambda x: x["citations"], reverse=True)

    # Read current readme
    with open(readme_path, encoding="utf-8") as file:
        content = file.read()

    # Remove existing papers section if it exists
    if "## Papers using" in content:
        pattern = r"## Papers using.*?$"
        content = re.sub(pattern, "", content, flags=re.DOTALL).rstrip()

    # Prepare the new section
    today = f"{datetime.now(tz=timezone.utc):%Y-%m-%d}"
    papers_section = "\n\n## Papers using `pymatviz`\n\n"
    papers_section += (
        f"Sorted by number of citations. Last updated {today}. "
        "Auto-generated from Google Scholar. Manual additions via PR welcome.\n\n"
    )
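
    # Each paper renders as a markdown list item like (values illustrative):
    # 1. A Author, B Author, C Author et al. (2024). [Paper title](https://...) (cited by 12)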

    for paper in sorted_papers:
        if not paper["authors"]:
            continue
        authors_str = ", ".join(paper["authors"][:3])
        if len(paper["authors"]) > 3:
            authors_str += " et al."

        year_str = f" ({paper['year']})" if paper["year"] else ""
        cite_str = f" (cited by {paper['citations']})" if paper["citations"] else ""

        papers_section += (
            f"1. {authors_str}{year_str}. [{paper['title']}]({paper['link']})"
            f"{cite_str}\n"
        )

    # Add papers section at the very end of the readme
    content = content.rstrip() + papers_section

    # Write updated content
    with open(readme_path, mode="w", encoding="utf-8") as file:
        file.write(content)


def main(update_freq_days: int = 7) -> None:
    """Main function to fetch papers and update readme.

    Args:
        update_freq_days (int): Number of days to wait between updates.
    """
    data_file = f"{ROOT}/assets/scholar-papers.yml.gz"

    # Load existing papers
    if os.path.isfile(data_file):
        with gzip.open(data_file, mode="rt", encoding="utf-8") as file:
            existing_papers = yaml.safe_load(file) or []
    else:
        existing_papers = []

    # Check if we need to update
    if not should_update(data_file, update_freq_days):
        print(
            f"{data_file=} is less than {update_freq_days} days old, skipping update."
        )
        # Still update readme with existing data
        update_readme(existing_papers)
        return

    # Fetch new papers
    new_papers = fetch_scholar_papers()

    # Merge papers, keeping the most recent citation counts
    paper_dict: dict[str, ScholarPaper] = {
        paper["title"]: paper for paper in existing_papers
    } | {paper["title"]: paper for paper in new_papers}
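    # dict union: on duplicate titles, the newly fetched entry (right operand)
    # wins, so citation counts reflect the latest fetch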

    # Convert back to list
    all_papers = list(paper_dict.values())

    # Save updated papers
    save_papers(all_papers, data_file)

    # Update readme
    update_readme(all_papers)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Fetch papers citing pymatviz and update readme."
    )
    parser.add_argument(
        "--update-freq",
        type=int,
        default=7,
        help="Number of days to wait between updates (default: 7)",
    )
    args = parser.parse_args()

    main(args.update_freq)
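
# Example invocations (the key is a placeholder, not a real SERPAPI_KEY):
#   SERPAPI_KEY=... python assets/fetch_citations.py
#   SERPAPI_KEY=... python assets/fetch_citations.py --update-freq 14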