bobozi's code cheatsheet
# 2025.06.26
# ai_commiter.py python==3.10.4 openai==1.55.3
from functools import wraps
import json
from typing import Callable, Optional, TypeVar, Dict, List
import traceback
import asyncio
T = TypeVar('T')


def retry(
    max_retries: int = 1,
    exceptions: tuple = (Exception, ),
    context_initializer: Optional[Callable[[], Dict]] = None,
    delay: float = 0.5,
):
    """Build a decorator that re-runs an async function when it raises.

    The wrapped coroutine function is called with an extra keyword argument
    ``retry_context`` holding a shallow copy of the current context dict.
    When a caught exception exposes an ``update_retry_context`` attribute,
    that mapping is merged into the context before the next attempt.

    Args:
        max_retries: Total number of attempts (first call included); must be >= 1.
        exceptions: Exception types that trigger another attempt.
        context_initializer: Zero-argument factory for the initial context;
            an empty dict is used when omitted.
        delay: Seconds to sleep between two attempts.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        Exception: From the wrapper once every attempt has failed; the last
            caught exception is attached as ``__cause__``.
    """
    if max_retries < 1:
        # With zero attempts the wrapper would return None without ever calling func.
        raise ValueError("max_retries must be >= 1")

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            # Build the context per call so history from one invocation
            # never leaks into the next one.
            context = context_initializer() if context_initializer else {}
            for attempt in range(1, max_retries + 1):
                try:
                    kwargs['retry_context'] = context.copy()
                    return await func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_retries:
                        # Chain the cause so the real failure stays visible in tracebacks.
                        raise Exception(f"Failed to execute {func.__name__} with {max_retries} retried.") from e
                    await asyncio.sleep(delay)
                    if hasattr(e, 'update_retry_context'):
                        context.update(e.update_retry_context)
        return wrapper
    return decorator
if __name__ == "__main__":
import os
from dotenv import load_dotenv
load_dotenv(override=True)
from openai import AsyncOpenAI
# Connection settings for the demo; the environment wins over the placeholder defaults.
api_key = os.environ.get("OPENAI_API_KEY", "sk-")
base_url = os.environ.get("OPENAI_BASE_URL", "http://xxx/openai/v1")
model = 'gpt-4o-mini'
class CustomException(Exception):
    """Exception that carries a dict of updates for the retry context."""

    def __init__(self, message, context_update=None) -> None:
        super().__init__(message)
        # A falsy argument (None or an empty mapping) is replaced by a fresh dict.
        self.context_update = context_update if context_update else {}

    @property
    def update_retry_context(self):
        """Return the context updates attached to this exception."""
        return self.context_update
@retry(max_retries=3, exceptions=(CustomException,), context_initializer=lambda : {'attempt_history': []})
async def guess_number(llm: AsyncOpenAI, number = 10, retry_context=None):
    """Ask the model to guess ``number`` and feed the outcome back through the retry context.

    Each failed attempt (unparsable answer, too small, too large) is appended
    to ``retry_context['attempt_history']`` and reported by raising
    ``CustomException`` so that the ``retry`` decorator can call again with
    the updated history.

    Args:
        llm: Async OpenAI client used for the chat completion request.
        number: The secret number the model has to find.
        retry_context: Context dict passed in by the ``retry`` decorator.

    Raises:
        CustomException: When the reply cannot be parsed or the guess is wrong.
    """
    retry_context = retry_context or {}
    # setdefault keeps the function usable when no history key was provided.
    attempts: List = retry_context.setdefault('attempt_history', [])
    output_format = """
{
"guess_number": 1 # 你猜测的数字
}
"""
    content = [
        "我心里想一个数字(0 - 1000), 你需要猜测这个数字是几, 我会告诉你猜大了或者小了.\n",
        f"你的输出格式必须为:\n{output_format}\n",
    ]
    if len(attempts) >= 1:
        content.append("你之前做过的尝试的结果/报错会记录在下面:\n<history>\n")
        for i, attempt in enumerate(attempts):
            content.append(f"[try-{i}] {attempt}\n")
        content.append("\n</history>\n")
    content.append("\nYour final output should consist only of the JSON object.\n")

    # Stays None when the request itself fails, so the handler never reads an unbound name.
    raw_output = None
    try:
        messages = [{"role": "user", "content": "".join(content)}]
        resp = await llm.chat.completions.create(model=model, messages=messages)
        raw_output = resp.choices[0].message.content
        data = json.loads(raw_output)
        guessed_number = int(data["guess_number"])
    except Exception as e:
        traceback.print_exc(limit=2)
        attempts.append(f"你的输出为: {raw_output}, 在解析json的时候发生了错误: {e}")
        raise CustomException(message=str(e), context_update=retry_context) from e

    print(f"AI guess: {guessed_number}")
    if number > guessed_number:
        attempts.append(f"你猜的数字为: {guessed_number}, 和我心里的数字相比: 小了")
        raise CustomException(message="", context_update=retry_context)
    elif number < guessed_number:
        attempts.append(f"你猜的数字为: {guessed_number}, 和我心里的数字相比: 大了")
        raise CustomException(message="", context_update=retry_context)
    else:
        print("猜中了!")
# Create the async client from the configured key/endpoint and run one guessing game for the number 12.
llm = AsyncOpenAI(api_key=api_key, base_url=base_url)
asyncio.run(guess_number(llm, 12))
# 2025.06.19
# ai_commiter.py python==3.10.4 openai==1.55.3
# usage: git diff --cached | python ./ai_commiter.py
from openai import OpenAI
import sys, os
# System prompt: tells the model to emit "[tag] message" commit lines and lists the allowed tags.
system_prompt = """你是一个AI助手, 需要根据用户对代码的修改, 生成相关的 commit 信息.
输出的格式应该为:
[tag] messages
你可以选择的 tag 包括:
feat: 新功能
fix: 修复 bug
docs: 修改或者新增文档
style: 格式变动
refactor: 重构代码
perf: 性能优化
test: 新增或修改测试用例
hotfix: 紧急修复
merge: 分支合并
upgrade: 依赖或版本升级
misc: 其他情况
messages 要求用动词开头, 用英文表述, 控制在 100 字以内.
下面是一个输出的样例:
[feat] Add a tool to filter useless elements.
"""
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL")
model = "gpt-4o-mini"

# Read the diff from stdin first: when there is nothing to describe the script
# stops before a client is created or any request is sent.
patchs = sys.stdin.read()
if not patchs.strip():
    print("No changes")
    # sys.exit is used because the bare exit() helper is only installed by the site module.
    sys.exit(0)

client = OpenAI(api_key=api_key, base_url=base_url)
messages = [
    {"role": "system", "content": system_prompt},
    {"role": "user", "content": f"用户对当前仓库的修改记录如下:\n<git_diff>\n{patchs}\n</git_diff>\n"},
]
resp = client.chat.completions.create(model=model, messages=messages)
print("Commit messages:")
print(resp.choices[0].message.content)
// 2025.05.14
// get_mem.cpp c++ >= 20
#include <iostream>
#include <fstream>
#include <vector>
using namespace std;
// Print the VmRSS and VmSize values (in kB) that /proc/self/status reports
// for the current process. Writes a message to stderr and returns when the
// file cannot be opened.
void get_mem_info() {
    std::ifstream file("/proc/self/status");
    if (!file) {
        std::cerr << "Cannot open /proc/self/status\n";
        return;
    }
    std::size_t vms = 0;
    std::size_t rss = 0;
    std::string key;
    std::string rest;
    // Each line is "<Key>:<whitespace><value> ..."; read the key token, pick
    // the two values of interest with stream extraction, then drop the rest
    // of the line. This avoids sscanf, which needs <cstdio>.
    while (file >> key) {
        if (key == "VmRSS:") {
            file >> rss; // kB
        } else if (key == "VmSize:") {
            file >> vms; // kB
        }
        std::getline(file, rest);
    }
    std::cout << "RSS: " << rss << " kB, VMS: " << vms << " kB\n";
}
// Report memory usage before and after constructing a 10,000,000-element vector.
int main() {
    get_mem_info();
    std::vector<int> filler(10'000'000, 42);
    get_mem_info();
    return 0;
}
# 2025.05.13
# get_args.sh
#!/bin/bash
USER_COMMAND=$1
USER_FILE=$2

# Print the usage line and abort with a non-zero status.
function help() {
    echo "Usage: $0 <command> <file>"
    exit 1
}

# Exactly two positional arguments are required.
if [ $# -ne 2 ]; then
    help
fi

echo "Your command is [${USER_COMMAND}]"
echo "Your file is [${USER_FILE}]"

set -x # enable command tracing (xtrace)
# Quote the expansions so arguments containing spaces stay a single word.
real_command=$(which "${USER_COMMAND}")
real_path=$(realpath "${USER_FILE}")
set +x # disable command tracing

echo "${USER_COMMAND} is ${real_command}"
echo "${USER_FILE} is ${real_path}"
# 2025.04.23
# print_color.py python==3.10.4
import sys
from typing import Literal
def get_platform() -> Literal["macOS", "Linux", "Windows", "Unknown"]:
    """Map ``sys.platform`` to a short platform name.

    Returns:
        "macOS" for ``darwin``, "Linux" for ``linux``, "Windows" for
        ``win32`` and "Unknown" for every other value. The returned strings
        match the declared ``Literal`` exactly.
    """
    if sys.platform.startswith('darwin'):
        return "macOS"
    elif sys.platform.startswith('linux'):
        return "Linux"
    elif sys.platform.startswith('win32'):
        return "Windows"
    else:
        return "Unknown"


print(f"Current platform: {get_platform()}")
// 2025.4.20
// new_tracker.cpp c++ >= 20
#include <new> // for std::align_val_t
#include <cstdio>
#include <cstdlib> // for malloc() and aligned_alloc()
#ifdef _MSC_VER
#include <malloc.h> // for _aligned_malloc() and _aligned_free()
#endif
using namespace std;
// Bookkeeping helper used by the replaced global operator new overloads:
// counts allocations, sums the requested bytes and optionally prints one
// trace line per allocation.
class NewTraker {
private:
    static inline int n_malloc = 0;          // number of successful allocations
    static inline std::size_t sum_size = 0;  // total requested bytes
    static inline bool do_trace = false;     // print a line per allocation when true
public:
    // Clear the allocation counter and the byte total.
    static void reset() {
        n_malloc = 0;
        sum_size = 0;
    }
    // Switch per-allocation trace output on or off.
    static void trace(bool b) {
        do_trace = b;
    }
    // Allocate `size` bytes, using aligned allocation when `align` is not 0.
    // `call` is the label printed in the trace line.
    // Throws std::bad_alloc when the underlying allocator returns nullptr,
    // because operator new must never hand a null pointer to the caller.
    static void* allocate(std::size_t size, std::size_t align, const char* call) {
        void* p;
        if (align == 0) {
            // malloc(0) is allowed to return nullptr; request one byte instead.
            p = std::malloc(size == 0 ? 1 : size);
        } else {
#ifdef _MSC_VER
            p = _aligned_malloc(size, align); // windows api
#else
            // aligned_alloc expects the size to be a multiple of the alignment;
            // round up so the call does not fail on strict implementations.
            std::size_t padded = (size + align - 1) / align * align;
            if (padded == 0) {
                padded = align;
            }
            p = std::aligned_alloc(align, padded); // c++17 api
#endif
        }
        if (p == nullptr) {
            throw std::bad_alloc{};
        }
        ++n_malloc;
        sum_size += size;
        if (do_trace) {
            // do not use std::cout here, it may allocate memory
            std::printf("#%d %s ", n_malloc, call);
            std::printf("(%zu bytes, ", size);
            if (align > 0) {
                std::printf("%zu-byte aligned) ", align);
            } else {
                std::printf("def-aligned) ");
            }
            std::printf("=> %p (total: %zu bytes)\n", (void *)p, sum_size);
        }
        return p;
    }
    // Print the number of allocations and the byte total seen so far.
    static void status() {
        std::printf("%d allocations for %zu bytes\n", n_malloc, sum_size);
    }
};
// Replacement global allocation functions: every form forwards to
// NewTraker::allocate with a label naming the overload. An alignment of 0
// selects the default (malloc) path inside allocate.

[[nodiscard]]
void* operator new(std::size_t size) {
    constexpr std::size_t default_align = 0;
    return NewTraker::allocate(size, default_align, "::new");
}

[[nodiscard]]
void* operator new(std::size_t size, std::align_val_t align) {
    const auto alignment = static_cast<std::size_t>(align);
    return NewTraker::allocate(size, alignment, "::new aligned");
}

[[nodiscard]]
void* operator new[](std::size_t size) {
    constexpr std::size_t default_align = 0;
    return NewTraker::allocate(size, default_align, "::new[]");
}

[[nodiscard]]
void* operator new[](std::size_t size, std::align_val_t align) {
    const auto alignment = static_cast<std::size_t>(align);
    return NewTraker::allocate(size, alignment, "::new[] aligned");
}
// Replacement global deallocation functions matching the operator new
// overloads above. The sized forms ignore the size and delegate to the
// unsized form of the same alignment kind.

void operator delete(void* ptr) noexcept {
    std::free(ptr);
}

void operator delete(void* ptr, std::size_t) noexcept {
    ::operator delete(ptr);
}

void operator delete(void* ptr, std::align_val_t) noexcept {
#ifdef _MSC_VER
    _aligned_free(ptr); // counterpart of _aligned_malloc
#else
    std::free(ptr);
#endif
}

void operator delete(void* ptr, std::size_t, std::align_val_t align) noexcept {
    ::operator delete(ptr, align);
}
#include <string>
// Demo: enable tracing, perform a few string allocations through the
// replaced operator new overloads, print the totals, then release everything.
int main() {
    NewTraker::reset();
    NewTraker::trace(true);
    string s = "string value with 26 chars";
    auto p1 = new string{"an initial value with even 35 chars"};
    // Placement-style syntax that passes an explicit 64-byte alignment to operator new[].
    auto p2 = new (align_val_t{64}) string[4];
    auto p3 = new string[4] {"7 chars", "x", "or 11 chars", "a string value with 28 chars"};
    NewTraker::status();
    delete p1;
    // NOTE(review): p2 came from the aligned operator new[], but this plain
    // delete[] presumably resolves to the unaligned operator delete, which
    // calls free(); confirm this is acceptable on the _MSC_VER
    // (_aligned_malloc) path.
    delete[] p2;
    delete[] p3;
}
# 2025.04.17
# test_conn.py python==3.10.4 openai==1.55.3 pillow==11.0.0
import base64
import os
from PIL import Image
from io import BytesIO
from openai import OpenAI
def compress_image(image_content, quality=85, size=None):
    """Optionally resize an image and re-encode it as PNG.

    Args:
        image_content: Raw bytes of the source image.
        quality: Value forwarded to ``Image.save``; ``100`` means "leave the
            bytes untouched" as long as no resize is requested.
        size: Optional ``(width, height)`` to resize to before encoding.

    Returns:
        The encoded image bytes, or ``image_content`` itself when nothing
        has to change.
    """
    # Only short-circuit when there is really nothing to do; a requested
    # resize must still happen at quality 100.
    if quality == 100 and not size:
        return image_content
    image = Image.open(BytesIO(image_content))
    if size:
        # resize is a method of the opened image, not of the PIL.Image module.
        image = image.resize(size, Image.Resampling.LANCZOS)
    output = BytesIO()
    # NOTE(review): Pillow's PNG writer documents `optimize` / `compress_level`
    # rather than `quality` -- confirm this argument has the intended effect.
    image.save(output, format="png", quality=quality)
    cimage_content = output.getvalue()
    print(f"Compress image from {len(image_content)} to {len(cimage_content)} [quality = {quality}]")
    return cimage_content
# Load the snapshot, compress it and embed it as a base64 data URL.
image_path = "./snapshot.png"
with open(image_path, "rb") as image_file:
    raw_image = image_file.read()
base64_image = base64.b64encode(compress_image(raw_image)).decode("utf-8")

api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL")
model = "gpt-4o"
client = OpenAI(api_key=api_key, base_url=base_url)

# One user turn made of a text question plus the inline image.
messages = [
    {
        "role": "user",
        "content": [
            {"type": "text", "text": "请问这张图里出现了什么动物?"},
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/png;base64,{base64_image}",
                    "detail": "high",
                },
            },
        ],
    },
]
resp = client.chat.completions.create(model=model, messages=messages)
print(f"AI: {resp.choices[0].message.content}")
// 2025.04.16
// timer.cpp c++ >= 20
#include <chrono>
#include <iostream>
#include <queue>
#include <vector>
#include <functional>
using namespace std;
// Runs a callable once as warm-up and then `Times` more times, printing the
// duration of every timed run and the average, expressed in `Duration`.
template <typename Duration = std::chrono::milliseconds, int Times = 1>
struct Benchmarker {
    // Invoke `func(args...)` Times + 1 times (the first run is not timed).
    // The callable and its arguments are passed as lvalues on every run:
    // forwarding them repeatedly would move from rvalue arguments during the
    // warm-up and hand moved-from objects to all later runs.
    template<typename Callable, typename ...Args>
    static auto benchmark(Callable&& func, Args&&... args) {
        static_assert(std::is_invocable_v<Callable&, Args&...>,
                      "Callable must be invocable with the given arguments.");
        static_assert(Times >= 1, "Times must >= 1");
        using Clock = std::chrono::high_resolution_clock;

        // warmup
        std::invoke(func, args...);

        // running_time[i] is the time stamp taken right after the i-th timed run.
        std::vector<Clock::time_point> running_time(Times + 1);
        running_time[0] = Clock::now();
        for (int i = 1; i <= Times; i++) {
            std::invoke(func, args...);
            running_time[i] = Clock::now();
        }

        // Unit label printed after each number; "ms" remains the fallback.
        const char* unit = "ms";
        if constexpr (std::is_same_v<Duration, std::chrono::nanoseconds>) {
            unit = "ns";
        } else if constexpr (std::is_same_v<Duration, std::chrono::microseconds>) {
            unit = "us";
        } else if constexpr (std::is_same_v<Duration, std::chrono::seconds>) {
            unit = "s";
        }

        for (int i = 1; i <= Times; i++) {
            auto dur = std::chrono::duration_cast<Duration>(running_time[i] - running_time[i - 1]);
            std::cout << "[" << i-1 << "]: " << dur.count() << unit << "\n";
        }
        auto total_dur = std::chrono::duration_cast<Duration>(running_time[Times] - running_time[0]);
        std::cout << "Average Cost: " << total_dur.count() / Times << unit << "\n";
    }
};
struct Foo {
void test(int n) {
std::queue<int> q;
for (int i = 0; i < n; i++) {
q.push(i);
}
}
void internal_bench() {
std::cout << "============== Internal Bench ==============\n";
Benchmarker<chrono::microseconds, 4> bm;
bm.benchmark(&Foo::test, this, 100000);
}
};
int main() {
{
Benchmarker<chrono::microseconds, 3> bm;
auto push_back = [](int n) {
std::vector<int> v;
for (int i = 0; i < n; i++) {
v.push_back(i);
}
};
bm.benchmark(push_back, 10000);
bm.benchmark(push_back, 100000);
Foo foo;
bm.benchmark(&Foo::test, foo, 10000);
bm.benchmark(&Foo::test, foo, 100000);
foo.internal_bench();
}
}
# 2025.04.15
# print_color.py python==3.10.4 colorama==0.4.6
from colorama import Fore, Back, Style
# Fore: text colour, Back: background colour, Style: text style
# Each prefix resets the attributes it switched on, so the message that
# follows is printed with the terminal defaults.
DEBUG = f"{Style.DIM}[DEBUG]{Style.RESET_ALL}"
INFO = f"{Fore.GREEN}[INFO]{Fore.RESET}"
WARN = f"{Fore.YELLOW}[WARN]{Fore.RESET}"
ERROR = f"{Fore.RED}{Style.BRIGHT}[ERROR]{Style.RESET_ALL}"
print(DEBUG, "This is a debug information")
print(INFO, "This is a normal information")
print(WARN, "This is a warning information")
print(ERROR, "This is an error information")
// timer.cpp c++ >= 14
#include <chrono>
#include <iostream>
#include <thread>
using namespace std;
// Scope timer: records the construction time and prints the elapsed
// milliseconds when the object is destroyed.
// Uses steady_clock because system_clock may be adjusted while the timer is
// running, which would distort (or even negate) the measured interval.
struct Timer {
    std::chrono::time_point<std::chrono::steady_clock> start_time;

    Timer() : start_time(std::chrono::steady_clock::now()) {}

    ~Timer() {
        auto end_time{std::chrono::steady_clock::now()};
        auto dur = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
        std::cout << "Execution Time: " << dur.count() << " ms.\n";
    }
};
// Time a one-second sleep: the Timer prints when the inner scope ends.
int main() {
    {
        Timer scoped_timer;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    return 0;
}
# 2025.04.08
# reflect_func.py python==3.10.4
import inspect
class Operator:
    """Dispatch named actions to the matching ``_do_<name>`` method."""

    def do_actions(self, actions):
        """Run every ``(name, *args)`` entry of ``actions`` and print the outcome.

        Prints the result of the call, a signature-mismatch line when the
        argument count differs from the method's parameter count, or a
        not-found line when no ``_do_<name>`` attribute exists.
        """
        for name, *params in actions:
            handler = getattr(self, f"_do_{name}", None)  # look up our own member function
            if not handler:
                print(f"Cannot found function: {name}")
                continue
            sig = inspect.signature(handler)
            if len(sig.parameters) == len(params):
                outcome = handler(*params)
                print(f"{name}({params}) = {outcome}")
            else:
                print(f"Signature not match: {sig} != {params}")

    def _do_add(self, x: int, y: int):
        """Return ``x + y``."""
        total = x + y
        return total

    def _do_subtract(self, x, y):
        """Return ``x - y``."""
        difference = x - y
        return difference

    def _do_self_increase(self, a):
        """Return ``a + 1``."""
        return a + 1
def do_actions(obj, actions):
    """Run ``(name, *args)`` actions against ``obj`` from outside its class.

    The target is found with ``inspect.getmembers``: exactly one bound method
    named ``_do_<name>`` must exist, otherwise a not-found line is printed.
    A signature-mismatch line is printed when the argument count differs
    from the method's parameter count.
    """
    for name, *params in actions:
        wanted = f"_do_{name}"

        def is_target(member):
            # Bound methods only, matched by their function name.
            return inspect.ismethod(member) and member.__name__ == wanted

        matches = inspect.getmembers(obj, predicate=is_target)
        if len(matches) != 1:
            print(f"Cannot found function: {name}")
            continue
        _, handler = matches[0]
        sig = inspect.signature(handler)
        if len(sig.parameters) == len(params):
            outcome = handler(*params)
            print(f"{name}({params}) = {outcome}")
        else:
            print(f"Signature not match: {sig} != {params}")
# Demo input: three valid calls, one unknown action ("times") and one call with too many arguments.
actions = [("add", 1, 2), ("subtract", 3, 9), ("times", 3, 4), ("self_increase", 4), ("self_increase", 1, 3)]
## Call the concrete function by name from inside the class
op = Operator()
op.do_actions(actions)
print("=" * 100)
## Call the concrete function by name from outside the class
do_actions(op, actions)
# 2025.04.08
# cmd.py python==3.10.4
import argparse
from pathlib import Path
parser = argparse.ArgumentParser(description="This is a basic argparse demo.")
# Required file argument; type=Path makes argparse convert the string into a Path.
parser.add_argument(
    "-f", "--file",
    type=Path,
    help="Get file path",
    required=True,
)
# Repeat count: an int defaulting to 2, restricted to the values 1-4.
parser.add_argument(
    "-t", "--times",
    type=int,
    help="Repeat times",
    default=2,
    choices=[1, 2, 3, 4],
)
args = parser.parse_args()
for round_no in range(1, args.times + 1):
    print(f"[{round_no}] : {args.file.absolute()}")
- 参考:CSS Selector
- 参考:CSS选择器定位方法
// 2025.04.10
// test url: https://en.wikipedia.org/wiki/Main_Page
// Select every element.
document.querySelectorAll("*")
// Select all elements with the given node (tag) name.
document.querySelectorAll("div")
// Select all elements whose class attribute contains the given class.
document.querySelectorAll(".MainPageBG")
// Select by id attribute.
// Note: ids are supposed to be unique, but duplicated ids do not raise an error.
document.querySelectorAll("#mw-content-text")
// Select all elements matching the given attribute test.
document.querySelectorAll("[style]") // elements that have a style attribute
document.querySelectorAll('[style*="display: none"]') // elements whose style attribute contains "display: none"
// Compound selectors: do not put spaces between the parts being combined.
document.querySelectorAll('input[type=checkbox]')
document.querySelectorAll('input[type=checkbox].mw-interlanguage-selector')
// A comma means "or".
document.querySelectorAll('input,div')
// Select descendants of the preceding element.
document.querySelectorAll("body div")
// Select direct children of the preceding element.
document.querySelectorAll("body>div")
// Pseudo-classes.
document.querySelectorAll(":enabled") // enabled form controls
document.querySelectorAll(':checked') // any radio, checkbox or option (inside a select) that is currently selected
// Combinations.
document.querySelector(':checked').parentElement // parent element of the first checked control
document.querySelector('div:nth-child(1)') // first div that is itself the first child of its parent
# 2025.04.08
# test_conn.py python==3.10.4 openai==1.55.3
from openai import OpenAI
import os
# Connection settings: the environment wins over the placeholder defaults.
api_key = os.environ.get("OPENAI_API_KEY", "...")
base_url = os.environ.get("OPENAI_BASE_URL", "http://xxx/v1")
model = "gpt-4o-mini"
# print(f"base_url: {base_url}, api_key: {api_key}")
client = OpenAI(api_key=api_key, base_url=base_url)

# Send a single "hello" turn and echo both sides of the exchange.
messages = [{"role": "user", "content": "hello"}]
resp = client.chat.completions.create(model=model, messages=messages)
print(f"User: {messages[0]['content']}")
print(f"AI: {resp.choices[0].message.content}")