Skip to content

bobozi-cmd/cheatsheet

Folders and files

Name
Last commit message
Last commit date

Latest commit

 

History

18 Commits
 
 
 
 

Repository files navigation

cheatsheet

bobozi's code cheatsheet

可以传递上下文信息的 retry 装饰器

# 2025.06.26
# ai_commiter.py python==3.10.4 openai==1.55.3
from functools import wraps
import json
from typing import Callable, Optional, TypeVar, Dict, List
import traceback
import asyncio

T = TypeVar('T')


def retry(max_retries: int = 1, exceptions: tuple = (Exception, ), context_initializer: Optional[Callable[[], Dict]] = None):
    """Retry an async callable, handing it a context that survives attempts.

    The wrapped coroutine function receives a ``retry_context`` keyword
    argument holding a shallow copy of the current context. When an attempt
    raises one of ``exceptions`` and the exception object exposes an
    ``update_retry_context`` attribute, that mapping is merged into the
    context before the next attempt.

    Args:
        max_retries: Total number of attempts (``1`` means "try once").
        exceptions: Exception types that trigger another attempt.
        context_initializer: Zero-argument factory for the initial context;
            an empty dict is used when omitted.

    Returns:
        A decorator for coroutine functions.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1 (the wrapped
            function would otherwise never be called).
        Exception: Raised by the wrapper once every attempt has failed; the
            last failure is attached as ``__cause__``.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            # Build the context per call: creating it once at decoration time
            # would leak the history of one call into every later call.
            context = context_initializer() if context_initializer else {}

            for attempt in range(1, max_retries + 1):
                try:
                    kwargs['retry_context'] = context.copy()
                    return await func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_retries:
                        # Keep the original failure reachable via __cause__.
                        raise Exception(f"Failed to execute {func.__name__} with {max_retries} retried.") from e

                    await asyncio.sleep(0.5)

                    if hasattr(e, 'update_retry_context'):
                        context.update(e.update_retry_context)

        return wrapper
    return decorator

if __name__ == "__main__":
    # Demo: an LLM plays a number-guessing game; every failed guess is fed
    # back to the next attempt through the retry context.
    import os
    from dotenv import load_dotenv
    load_dotenv(override=True)

    from openai import AsyncOpenAI

    api_key = os.getenv("OPENAI_API_KEY", "sk-")
    base_url = os.getenv("OPENAI_BASE_URL", "http://xxx/openai/v1")
    model = 'gpt-4o-mini'

    class CustomException(Exception):
        """Failure that carries context updates for the next retry attempt."""

        def __init__(self, message, context_update=None) -> None:
            super().__init__(message)
            self.context_update = context_update or {}

        @property
        def update_retry_context(self):
            """Mapping that ``retry`` merges into its context before retrying."""
            return self.context_update


    @retry(max_retries=3, exceptions=(CustomException,), context_initializer=lambda : {'attempt_history': []})
    async def guess_number(llm: AsyncOpenAI, number = 10, retry_context=None):
        """Ask the model to guess ``number``; raise CustomException on a miss.

        Each miss (or unparsable reply) is appended to
        ``retry_context['attempt_history']`` and shown to the model on the
        next attempt.
        """
        if retry_context is None:
            retry_context = {}
        # Tolerate a context without history instead of raising KeyError.
        attempts: List = retry_context.setdefault('attempt_history', [])

        output_format = """
{
    "guess_number": 1 # 你猜测的数字
}
"""

        content = [
            f"我心里想一个数字(0 - 1000), 你需要猜测这个数字是几, 我会告诉你猜大了或者小了.\n",
            f"你的输出格式必须为:\n{output_format}\n",
        ]
        if len(attempts) >= 1:
            content.append(f"你之前做过的尝试的结果/报错会记录在下面:\n<history>\n")
            for i, attempt in enumerate(attempts):
                content.append(f"[try-{i}] {attempt}\n")
            content.append(f"\n</history>\n")

        content.append("\nYour final output should consist only of the JSON object.\n")

        # Stays None when the request itself fails, so the error path below
        # never touches an unbound name or a non-existent attribute.
        raw_output = None
        try:
            messages = [{"role": "user", "content": "".join(content)}]
            resp = await llm.chat.completions.create(model=model, messages=messages)
            raw_output = resp.choices[0].message.content
            data = json.loads(raw_output)
            guessed_number = int(data["guess_number"])
        except Exception as e:
            traceback.print_exc(limit=2)
            attempts.append(f"你的输出为: {raw_output}, 在解析json的时候发生了错误: {e}")
            raise CustomException(message=str(e), context_update=retry_context) from e

        print(f"AI guess: {guessed_number}")

        if number > guessed_number:
            attempts.append(f"你猜的数字为: {guessed_number}, 和我心里的数字相比: 小了")
            raise CustomException(message="", context_update=retry_context)
        elif number < guessed_number:
            attempts.append(f"你猜的数字为: {guessed_number}, 和我心里的数字相比: 大了")
            raise CustomException(message="", context_update=retry_context)
        else:
            print("猜中了!")

    llm = AsyncOpenAI(api_key=api_key, base_url=base_url)
    asyncio.run(guess_number(llm, 12))

使用 AI 生成 commit 信息

# 2025.06.19
# ai_commiter.py python==3.10.4 openai==1.55.3
# usage: git diff --cached | python ./ai_commiter.py
from openai import OpenAI
import sys, os

# Instructions for the model: output format, allowed tags and one example.
system_prompt = """你是一个AI助手, 需要根据用户对代码的修改, 生成相关的 commit 信息.
输出的格式应该为:
[tag] messages

你可以选择的 tag 包括:
    feat: 新功能
    fix: 修复 bug
    docs: 修改或者新增文档
    style: 格式变动
    refactor: 重构代码
    perf: 性能优化
    test: 增或修改测试用例
    hotfix: 紧急修复
    merge: 分支合并
    upgrade: 依赖或版本升级
    misc: 其他情况

messages 要求用动词开头, 用英文表述, 控制在 100 字以内.

下面是一个输出的样例:
    [feat] Add a tool to filter useless elements.
"""

api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL")
model = "gpt-4o-mini"

# Read the diff first so that an empty input exits before any client is
# created or any request is sent.
patchs = sys.stdin.read()
if not patchs.strip():
    print("No changes")
    # sys.exit is always available; the bare exit() helper is injected by the
    # site module and is meant for interactive use.
    sys.exit(0)

client = OpenAI(api_key=api_key, base_url=base_url)
messages = []
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": f"用户对当前仓库的修改记录如下:\n<git_diff>\n{patchs}\n</git_diff>\n"})

resp = client.chat.completions.create(model=model, messages=messages)
print("Commit messages:")
print(resp.choices[0].message.content)

Linux 获取程序运行内存占用情况

// 2025.05.14
// get_mem.cpp c++ >= 20
#include <iostream>
#include <fstream>
#include <vector>

using namespace std;

// Print this process's resident set size (VmRSS) and virtual memory size
// (VmSize), in kB, as read from /proc/self/status.
void get_mem_info() {
    std::ifstream file("/proc/self/status");
    if (!file) {
        std::cerr << "Cannot open /proc/self/status\n";
        return;
    }

    // Read the number that follows `key` at the start of `line`. `out` is left
    // untouched when the line has a different prefix or carries no number.
    // std::stoull skips the leading whitespace and stops at the " kB" suffix,
    // which avoids sscanf (and its missing <cstdio> include).
    auto read_kb = [](const std::string& line, const std::string& key, std::size_t& out) {
        if (line.compare(0, key.size(), key) != 0) {
            return;
        }
        try {
            out = static_cast<std::size_t>(std::stoull(line.substr(key.size())));
        } catch (...) {
            // Malformed value: keep the previous reading.
        }
    };

    std::string line;
    std::size_t vms = 0;
    std::size_t rss = 0;

    while (std::getline(file, line)) {
        read_kb(line, "VmRSS:", rss);  // kB
        read_kb(line, "VmSize:", vms); // kB
    }
    std::cout << "RSS: " << rss << " kB, VMS: " << vms << " kB\n";
}

// Report memory usage before and after allocating a large vector.
int main() {
    get_mem_info();

    constexpr std::size_t element_count = 10'000'000;
    std::vector<int> payload(element_count, 42);

    get_mem_info();
}

Bash 接受命令行参数

#!/bin/bash
# 2025.05.13
# get_args.sh
# Takes exactly two positional arguments (a command name and a file) and
# prints where each of them resolves to.

# Print the usage line and exit with a non-zero status.
# Named `usage` so it does not shadow the bash builtin `help`.
usage() {
    echo "Usage: $0 <command> <file>"
    exit 1
}

if [ $# -ne 2 ]; then
    usage
fi

USER_COMMAND=$1
USER_FILE=$2

echo "Your command is [${USER_COMMAND}]"
echo "Your file is [${USER_FILE}]"

set -x # trace each command as it runs
# Quote the expansions so arguments containing spaces stay a single word.
real_command=$(which "${USER_COMMAND}")
real_path=$(realpath "${USER_FILE}")
set +x # stop tracing

echo "${USER_COMMAND} is ${real_command}"
echo "${USER_FILE} is ${real_path}"

获取当前平台信息

# 2025.04.23
# print_color.py python==3.10.4
import sys
from typing import Literal

def get_platform() -> Literal["macOS", "Linux", "Windows", "Unknown"]:
    """Return a display name for the current OS, derived from ``sys.platform``.

    Returns:
        ``"macOS"``, ``"Linux"`` or ``"Windows"`` when ``sys.platform`` starts
        with ``darwin``, ``linux`` or ``win32`` respectively, otherwise
        ``"Unknown"``.
    """
    platform_id = sys.platform
    if platform_id.startswith('darwin'):
        # Spelled exactly as in the declared Literal return type.
        return "macOS"
    if platform_id.startswith('linux'):
        return "Linux"
    if platform_id.startswith('win32'):
        return "Windows"
    return "Unknown"

print(f"Current platform: {get_platform()}")

追踪所有 ::new 分配的内存

// 2025.4.20
// new_tracker.cpp c++ >= 20
#include <new>     // for std::align_val_t
#include <cstdio>
#include <cstdlib> // for malloc() and aligned_alloc()

// _MSC_VER is the macro the allocation code below tests before calling
// _aligned_malloc()/_aligned_free(); the include must use the same guard.
#ifdef _MSC_VER
#include <malloc.h> // for _aligned_malloc() and _aligned_free()
#endif

using namespace std;

// Counts and optionally logs every allocation made through allocate().
// All state is static, so the class is used without creating an instance.
class NewTraker {
private:
    static inline int n_malloc = 0;      // number of allocate() calls
    static inline size_t sum_size = 0;   // total requested bytes
    static inline bool do_trace = false; // print one line per allocation when true
    static inline bool in_new = false;   // not read anywhere in this class

public:
    // Zero the counters; the trace flag is left as it is.
    static void reset() {
        n_malloc = 0;
        sum_size = 0;
    }

    // Turn per-allocation logging on or off.
    static void trace(bool b) {
        do_trace = b;
    }

    // Allocate `size` bytes and record the request.
    //   size  - requested byte count
    //   align - required alignment, or 0 for the default (plain malloc)
    //   call  - label printed in the trace line
    // Returns a non-null pointer; throws std::bad_alloc when the underlying
    // allocator fails, as a replacement operator new is required to do.
    static void* allocate(size_t size, size_t align, const char* call) {
        ++n_malloc;
        sum_size += size;
        // A zero-byte request may legally yield nullptr from the C allocators;
        // ask for one byte so a valid pointer can always be handed back.
        size_t request = (size == 0) ? 1 : size;
        void *p;
        if (align == 0) {
            p = malloc(request);
        } else {
#ifdef _MSC_VER
            p = _aligned_malloc(request, align); // windows api
#else
            // aligned_alloc expects the size to be a multiple of the alignment.
            request = (request + align - 1) / align * align;
            p = aligned_alloc(align, request); // c++17 api
#endif
        }
        if (p == nullptr) {
            throw std::bad_alloc{};
        }
        if (do_trace) {
            // do not use std::cout there, it may alloc memory
            printf("#%d %s ", n_malloc, call);
            printf("(%zu bytes, ", size);
            if (align > 0) {
                printf("%zu-byte aligned) ", align);
            } else {
                printf("def-aligned) ");
            }
            printf("=> %p (total: %zu bytes)\n", p, sum_size);
        }
        return p;
    }

    // Print the totals collected since the last reset().
    static void status() {
        printf("%d allocations for %zu bytes\n", n_malloc, sum_size);
    }
};

// Replacement global allocation functions: each one forwards to
// NewTraker::allocate with a label naming the form of `new` that was used.
// An alignment of 0 selects the tracker's default (malloc) path.

[[nodiscard]]
void* operator new(std::size_t size) {
    constexpr std::size_t default_align = 0;
    return NewTraker::allocate(size, default_align, "::new");
}

[[nodiscard]]
void* operator new(std::size_t size, std::align_val_t align) {
    const auto alignment = static_cast<std::size_t>(align);
    return NewTraker::allocate(size, alignment, "::new aligned");
}

[[nodiscard]]
void* operator new[](std::size_t size) {
    constexpr std::size_t default_align = 0;
    return NewTraker::allocate(size, default_align, "::new[]");
}

[[nodiscard]]
void* operator new[](std::size_t size, std::align_val_t align) {
    const auto alignment = static_cast<std::size_t>(align);
    return NewTraker::allocate(size, alignment, "::new[] aligned");
}

// Replacement global deallocation functions matching the operator new
// overloads of this file. The sized forms ignore the size and delegate to
// the unsized form with the same alignment category.

void operator delete(void* ptr) noexcept {
    free(ptr);
}

void operator delete(void* ptr, std::size_t /*size*/) noexcept {
    ::operator delete(ptr);
}

void operator delete(void* ptr, std::align_val_t /*align*/) noexcept {
#ifdef _MSC_VER
    _aligned_free(ptr);
#else
    free(ptr);
#endif
}

void operator delete(void* ptr, std::size_t /*size*/, std::align_val_t align) noexcept {
    ::operator delete(ptr, align);
}

#include <string>

// Exercise the tracker with a local string, a single heap string, an
// over-aligned string array and a plain string array, then print the totals.
int main() {
    NewTraker::reset();
    NewTraker::trace(true);

    std::string local_text = "string value with 26 chars";
    auto single = new std::string{"an initial value with even 35 chars"};
    auto aligned_array = new (std::align_val_t{64}) std::string[4];
    auto plain_array = new std::string[4] {"7 chars", "x", "or 11 chars", "a string value with 28 chars"};

    NewTraker::status();

    delete single;
    delete[] aligned_array;
    delete[] plain_array;
}

使用 openai 的接口调用VLM,提供图片压缩的功能

# 2025.04.17
# test_conn.py python==3.10.4 openai==1.55.3 pillow==11.0.0
import base64
import os
from PIL import Image
from io import BytesIO
from openai import OpenAI

def compress_image(image_content: bytes, quality: int = 85, size=None) -> bytes:
    """Re-encode an image (optionally resized) and return the new bytes.

    Args:
        image_content: Encoded image bytes in a format Pillow can open.
        quality: Value passed to ``Image.save``; ``100`` together with no
            ``size`` returns the input unchanged.
        size: Optional ``(width, height)`` to resize to before saving.

    Returns:
        The PNG-encoded bytes of the processed image, or ``image_content``
        itself when nothing has to be done.
    """
    # Only skip the work when neither re-encoding nor resizing is requested.
    if quality == 100 and not size:
        return image_content

    image = Image.open(BytesIO(image_content))
    if size:
        # resize() is an instance method and returns a new image.
        image = image.resize(size, Image.Resampling.LANCZOS)
    output = BytesIO()

    # NOTE(review): `quality` is documented for lossy writers such as JPEG;
    # confirm whether Pillow's PNG writer honours it at all.
    image.save(output, format="png", quality=quality)
    cimage_content = output.getvalue()
    print(f"Compress image from {len(image_content)} to {len(cimage_content)} [quality = {quality}]")
    return cimage_content


# Load the screenshot, compress it and embed it as base64 in a data URL.
image_path = "./snapshot.png"
with open(image_path, "rb") as image_file:
    raw_image = image_file.read()
base64_image = base64.b64encode(compress_image(raw_image)).decode("utf-8")

# Endpoint settings come from the environment.
api_key = os.getenv("OPENAI_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL")
model = "gpt-4o"

client = OpenAI(api_key=api_key, base_url=base_url)

# One user turn with a text part followed by the image part.
text_part = {"type": "text", "text": "请问这张图里出现了什么动物?"}
image_part = {
    "type": "image_url",
    "image_url": {
        "url": f"data:image/png;base64,{base64_image}",
        "detail": "high",
    },
}
messages = [{"role": "user", "content": [text_part, image_part]}]

resp = client.chat.completions.create(model=model, messages=messages)
print(f"AI: {resp.choices[0].message.content}")

使用 chrono 统计平均执行时间

// 2025.04.16
// timer.cpp c++ >= 20
#include <chrono>
#include <iostream>
#include <queue>
#include <vector>
#include <functional>
using namespace std;

// Runs a callable `Times` times after one warm-up call and prints the
// duration of every run plus the average, expressed in `Duration` units.
template <typename Duration = std::chrono::milliseconds, int Times = 1>
struct Benchmarker {
    // func - callable to measure (free function, lambda, member pointer, ...)
    // args - arguments passed to every invocation
    template<typename Callable, typename ...Args>
    static auto benchmark(Callable&& func, Args&&... args) {
        // The callable runs Times + 1 times, so it is always invoked with
        // lvalues: forwarding an rvalue repeatedly would hand moved-from
        // arguments to every call after the first one.
        static_assert(std::is_invocable_v<Callable&, Args&...>,
            "Callable must be invocable with the given arguments.");
        static_assert(Times >= 1, "Times must >= 1");

        // Monotonic clock, so intervals are not affected by clock adjustments.
        using Clock = std::chrono::steady_clock;

        // warmup
        std::invoke(func, args...);

        // running_time[i] is the time point right after the i-th measured run;
        // running_time[0] is the start of the first run.
        std::vector<Clock::time_point> running_time(Times + 1);
        running_time[0] = Clock::now();

        for (int i = 1; i <= Times; i++) {
            std::invoke(func, args...);
            running_time[i] = Clock::now();
        }

        // Unit suffix for the printed numbers; "ms" is the fallback.
        const char* unit = "ms";
        if constexpr (std::is_same_v<Duration, std::chrono::nanoseconds>) {
            unit = "ns";
        } else if constexpr (std::is_same_v<Duration, std::chrono::microseconds>) {
            unit = "us";
        } else if constexpr (std::is_same_v<Duration, std::chrono::seconds>) {
            unit = "s";
        }

        for (int i = 1; i <= Times; i++) {
            auto dur = std::chrono::duration_cast<Duration>(running_time[i] - running_time[i - 1]);
            std::cout << "[" << i-1 << "]: " <<  dur.count() << unit << "\n";
        }

        auto total_dur = std::chrono::duration_cast<Duration>(running_time[Times] - running_time[0]);
        std::cout << "Average Cost: " << total_dur.count() / Times << unit << "\n";
    }
};

struct Foo {
    void test(int n) {
        std::queue<int> q;
        for (int i = 0; i < n; i++) {
            q.push(i);
        }
    }

    void internal_bench() {
        std::cout << "============== Internal Bench ==============\n";
        Benchmarker<chrono::microseconds, 4> bm;
        bm.benchmark(&Foo::test, this, 100000);
    }
};

int main() {
    {
        Benchmarker<chrono::microseconds, 3> bm;
        auto push_back = [](int n) {
            std::vector<int> v;
            for (int i = 0; i < n; i++) {
                v.push_back(i);
            }
        };
        bm.benchmark(push_back, 10000);
        bm.benchmark(push_back, 100000);

        Foo foo;
        bm.benchmark(&Foo::test, foo, 10000);
        bm.benchmark(&Foo::test, foo, 100000);

        foo.internal_bench();
    }
}

使用 colorama 打印彩色命令行文字

# 2025.04.15
# print_color.py python==3.10.4 colorama==0.4.6
from colorama import Fore, Back, Style
# Fore: text (foreground) color, Back: background color, Style: text style

# Level tags; each one switches its attributes off again after the label so
# that the message text that follows is printed normally.
DEBUG = f"{Style.DIM}[DEBUG]{Style.RESET_ALL}"
INFO = f"{Fore.GREEN}[INFO]{Fore.RESET}"
WARN = f"{Fore.YELLOW}[WARN]{Fore.RESET}"
ERROR = f"{Fore.RED}{Style.BRIGHT}[ERROR]{Style.RESET_ALL}"

print(DEBUG, "This is a debug information")
print(INFO, "This is a normal information")
print(WARN, "This is a warning information")
print(ERROR, "This is an error information")

使用 chrono 统计当前代码的执行时间

// timer.cpp c++ >= 14
#include <chrono>
#include <iostream>
#include <thread>
using namespace std;

// Scope timer: records the time at construction and prints the elapsed
// milliseconds when the object is destroyed.
struct Timer {

    // steady_clock is monotonic, so the measured interval cannot be distorted
    // by adjustments of the system (wall) clock.
    std::chrono::time_point<std::chrono::steady_clock> start_time;

    Timer() : start_time(std::chrono::steady_clock::now()) {}
    ~Timer() {
        auto end_time = std::chrono::steady_clock::now();
        auto dur = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
        std::cout << "Execution Time: " << dur.count() << " ms.\n";
    }
};

// Time a one-second sleep: the Timer prints when the inner scope ends.
int main() {
    {
        Timer scope_timer;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}

使用 inspect 通过函数名获取类的成员函数

# 2025.04.08
# reflect_func.py python==3.10.4
import inspect

class Operator:
    """Dispatch named actions to the matching ``_do_<name>`` method."""

    def do_actions(self, actions):
        """Run each ``(name, *args)`` entry and print the outcome.

        An entry is reported instead of executed when no ``_do_<name>``
        attribute exists or when the number of arguments differs from the
        number of parameters of the method.
        """
        for name, *params in actions:
            handler = getattr(self, f"_do_{name}", None)  # look up our own method by name

            if not handler:
                print(f"Cannot found function: {name}")
                continue

            sig = inspect.signature(handler)
            if len(sig.parameters) == len(params):
                outcome = handler(*params)
                print(f"{name}({params}) = {outcome}")
            else:
                print(f"Signature not match: {sig} != {params}")

    def _do_add(self, x: int, y: int):
        """Return ``x + y``."""
        total = x + y
        return total

    def _do_subtract(self, x, y):
        """Return ``x - y``."""
        difference = x - y
        return difference

    def _do_self_increase(self, a):
        """Return ``a + 1``."""
        incremented = a + 1
        return incremented


def do_actions(obj, actions):
    """Run ``(name, *args)`` entries against ``obj`` from outside its class.

    Bound methods of ``obj`` are searched with ``inspect.getmembers``; an
    entry runs only when exactly one method is named ``_do_<name>`` and the
    argument count equals its parameter count. Every outcome is printed.
    """
    for name, *params in actions:
        matches = inspect.getmembers(
            obj,
            predicate=lambda member: inspect.ismethod(member) and member.__name__ == f"_do_{name}",
        )
        if len(matches) != 1:
            print(f"Cannot found function: {name}")
            continue

        _, method = matches[0]
        sig = inspect.signature(method)
        if len(sig.parameters) == len(params):
            outcome = method(*params)
            print(f"{name}({params}) = {outcome}")
        else:
            print(f"Signature not match: {sig} != {params}")


# Sample requests: two valid calls, one unknown name, one valid unary call
# and one call with too many arguments.
actions = [
    ("add", 1, 2),
    ("subtract", 3, 9),
    ("times", 3, 4),
    ("self_increase", 4),
    ("self_increase", 1, 3),
]

## Call the methods by name from inside the class
op = Operator()
op.do_actions(actions)
print("=" * 100)
## Call the methods by name from outside the class
do_actions(op, actions)

使用 argparse 构建基础命令行工具

# 2025.04.08
# cmd.py python==3.10.4
import argparse
from pathlib import Path

parser = argparse.ArgumentParser(description="This is a basic argparse demo.")
# Required file argument; type=Path converts the given string to a Path.
parser.add_argument(
    "-f", "--file",
    type=Path,
    help="Get file path",
    required=True,
)
# Repeat count: an int that defaults to 2 and must be one of 1-4.
parser.add_argument(
    "-t", "--times",
    type=int,
    help="Repeat times",
    default=2,
    choices=[1, 2, 3, 4],
)

args = parser.parse_args()

# Print the absolute path once per requested repetition, numbered from 1.
for ordinal in range(1, args.times + 1):
    print(f"[{ordinal}] : {args.file.absolute()}")

在 chrome 中利用 CSS 选择器获取 DOM 元素

// 2025.04.10
// test url: https://en.wikipedia.org/wiki/Main_Page
// Select every element.
document.querySelectorAll("*")
// Select all elements with the given node (tag) name
document.querySelectorAll("div")
// Select all elements whose class attribute contains the given class
document.querySelectorAll(".MainPageBG")
// Select the element(s) with the given id attribute
// Warning: ids are supposed to be unique, but duplicate ids do not raise an error
document.querySelectorAll("#mw-content-text")
// Select all elements matching the given attribute condition
document.querySelectorAll("[style]") // elements that have a style attribute
document.querySelectorAll('[style*="display: none"]') // elements whose style attribute contains "display: none"
// Compound selectors: write the parts with no space between them
document.querySelectorAll('input[type=checkbox]')
document.querySelectorAll('input[type=checkbox].mw-interlanguage-selector')
// A comma means "or"
document.querySelectorAll('input,div')

// Select descendants of the preceding element
document.querySelectorAll("body div")
// Select direct children of the preceding element
document.querySelectorAll("body>div")

// Pseudo-classes
document.querySelectorAll(":enabled") // enabled form controls
document.querySelectorAll(':checked') // any radio, checkbox or select option that is currently checked/selected
// Combining with DOM properties
document.querySelector(':checked').parentElement // parent node of the first checked control (e.g. a radio)
document.querySelector('div:nth-child(1)') // first div that is itself the first child of its parent

利用 openai 的接口调用LLM

# 2025.04.08
# test_conn.py python==3.10.4 openai==1.55.3
from openai import OpenAI
import os

# Endpoint settings come from the environment, with placeholder fallbacks.
api_key = os.getenv("OPENAI_API_KEY", "...")
base_url = os.getenv("OPENAI_BASE_URL", "http://xxx/v1")
model = "gpt-4o-mini"
# print(f"base_url: {base_url}, api_key: {api_key}")

client = OpenAI(api_key=api_key, base_url=base_url)

# A single user turn is enough to check that the endpoint answers.
messages = [{"role": "user", "content": "hello"}]
resp = client.chat.completions.create(model=model, messages=messages)

user_text = messages[0]['content']
ai_text = resp.choices[0].message.content
print(f"User: {user_text}")
print(f"AI: {ai_text}")

About

bobozi's code cheatsheet

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published