Skip to main content

New Blog

代码人生:编织技术与生活的博客之旅

  • 显示最近创建的笔记,可以考虑是否要发布到博客
  • 频繁发布前 修改 '.vscode\settings.json' 下
    • '"noteSync.enableNoteSync": true, -> "noteSync.enableNoteSync": false,'

输入

  • 开始时间 (--since): 定义“最近”的时间起点,例如 "2025-01-01"
  • 过滤地址/路径 (path_filter): 限定笔记文件所在的目录,例如 note/docs/备忘/note/docs/备忘
  • 文件状态 (--diff-filter=A): 仅查找新增 (Added) 的文件。

git log 方式

方案一:Git Bash/Shell 命令行方式

这种方式简洁高效,但需要解决编码乱码路径去重的问题。

  • 基础 Git 命令(查找新增文件路径):
git log --since="2025-01-01" --name-status --pretty=format:'' \
| grep -E '^A' \
| awk '{print $2}' \
| grep -E '^(note|docs|备忘)/' \
| sort -u
  • 不支持 powershell

  • 编码配置

  • 为了解决 Git Bash/Shell 下中文路径或内容乱码问题:

git config --global core.quotepath false
git config --global i18n.commitencoding utf-8
git config --global i18n.logencoding utf-8
export LESSCHARSET=utf-8 # 针对 git log 等分页显示工具

git log --since="2025-01-01" --name-status --pretty=format:"COMMIT_TIME:%cd" --date=format:"%Y-%m-%d %H:%M:%S" |
Where-Object { $_ -match '^(COMMIT_TIME:|A\s+note\\docs)' } |
ForEach-Object {
if ($_ -match '^COMMIT_TIME:(.*)') {
$script:commitTime = $matches[1].Trim() # 记录当前提交时间
} else {
$filePath = ($_ -split '\s+', 2)[1] # 提取文件路径(兼容空格)
"$commitTime`t$filePath" # 时间+制表符+路径
}
} |
Group-Object -Property { $_ -split '\t' | Select-Object -Last 1 } | # 按路径去重
ForEach-Object { $_.Group[0] } |
Sort-Object # 按时间排序

  • 乱码 git log 后输出的应该是 utf8, 和后面的 windows 命令有关系导致不显示了。放弃。
  • 考虑到换环境,不想每次都重复这种操作,那么考虑做成环境设置脚本,既然都脚本了,不如 python 搞定。

Python 脚本方式

  • 还是乱码,又不想改默认环境,干脆换成 python 脚本
import subprocess
import re
from datetime import datetime
# 移除了 os, time 模块,因为不再需要文件系统操作

def decode_octal(data_str):
"""解码 Git 输出中的八进制编码。"""

def byte_creator(match):
octal_str = match.group(1)
decimal_val = int(octal_str, 8)
# 使用 latin-1 编码桥接,确保单字节字符正确返回
return bytes([decimal_val]).decode('latin-1')

intermediate_str = re.sub(r'\\(\d{3})', byte_creator, data_str)
final_bytes = intermediate_str.encode('latin-1')

# 最终使用 UTF-8 解码为中文
return final_bytes.decode('utf-8', errors='replace')

def get_git_info(since_date="2025-01-01", path_filter=None):
"""执行 Git 命令,获取指定日期至今的新增文件及其首次提交时间。"""

# 1. 运行 Git 命令获取新增文件列表、提交哈希和日期
GIT_FORMAT = '%H|%ad' # 格式:[哈希]|[作者日期]
try:
result = subprocess.run(
[
"git", "log", f"--since={since_date}",
"--diff-filter=A", "--name-status",
f"--pretty=format:{GIT_FORMAT}", "--date=format:%Y-%m-%d %H:%M:%S"
],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)

output_data = result.stdout.decode('utf-8', errors='replace').strip()

except subprocess.CalledProcessError as e:
# 解码错误信息并打印
print(f"Git 命令执行失败:{e.stderr.decode('utf-8', errors='replace')}")
return []
except FileNotFoundError:
print("错误:未找到 Git 命令,请确保 Git 已安装并添加到系统环境变量")
return []

# 2. 解析输出,提取信息,并处理去重和过滤
file_info_map = {} # { 文件路径:{git_time, hash, status} }

current_hash = None
current_time = None

for line in output_data.split('\n'):
line = line.strip()

# 检查是否是提交信息行
if '|' in line and re.match(r'^[0-9a-f]{40}\|', line):
parts = line.split('|', 1)
current_hash = parts[0]
current_time = parts[1].strip()
continue

# 检查是否是文件状态行 (如 A\tpath/to/file)
if len(line) > 2 and line[1].isspace():
status = line[0]
file_path_encoded = line[2:].strip()
file_path = decode_octal(file_path_encoded)

# 仅处理新增文件 ('A')
if status != 'A':
continue

# 过滤功能:跳过不匹配路径的文件
# 注意:如果 path_filter 是 None 或空字符串,则不进行过滤
if path_filter and path_filter in file_path:
pass # 路径匹配,继续
elif path_filter:
continue # 路径不匹配,跳过

# 记录文件的首次提交信息 (由于 git log 默认是新的在前,第一次遇到即为最早记录)
if file_path not in file_info_map:
file_info_map[file_path] = {
"status": status,
"git_time": current_time,
"commit_hash": current_hash,
"path": file_path
}

# 3. 整理和排序结果
final_list = list(file_info_map.values())

# 按 Git 首次提交时间排序
return sorted(final_list, key=lambda x: x['git_time'])

# ------------------- 主执行逻辑 -------------------
if __name__ == "__main__":
# --- 配置项 ---
target_date = "2025-02-13" # 查询起始日期

# 路径过滤器:设置您想要包含的地址片段。
# 例如:如果要查找 'note/docs' 目录下的文件,设置为 'note/docs'
# 如果不需要过滤,设置为 None 或空字符串。
path_filter = "note/docs/备忘"
# -------------

print(f"正在查询 {target_date} 至今的 Git 新增文件。..")
if path_filter:
print(f"应用地址过滤器:'{path_filter}'")

new_files = get_git_info(target_date, path_filter)

if new_files:
print(f"\n 共找到 {len(new_files)} 个新增文件(去重后):")
print("-" * 65)
# 打印表头
print(f"{'No.':<4}{'状态':<4}{'Git 首次提交时间':<22}{'文件路径'}")
print("-" * 65)

for idx, file_info in enumerate(new_files, 1):
print(f"{idx:<4}{file_info['status']:<4}{file_info['git_time']:<22}{file_info['path']}")
print("-" * 65)
else:
print(f"\n 未找到 {target_date} 至今的新增文件,或所有文件已被过滤器排除。")

方案二:读 md 文件的 header

既然都读 python 文件了,那干脆读 md 文件的 header 信息,直接读取创建时间

  • 写一个 python 脚本
  • 读取指定文件夹下的所有 md 文件
  • 过滤 header 中 date 数据小于输入时间的文件
  • 排序由远到近输出到控制台
import os
import frontmatter # 新增导入
from datetime import datetime

# --- 配置区 ---
# 1. 待扫描的根文件夹路径
TARGET_DIR = "note\docs\备忘"
# 2. 筛选日期阈值(只保留日期晚于此时间的笔记)
FILTER_DATE_STR = "2025-04-20"
# ----------------

def parse_md_header_date(filepath: str) -> datetime | None:
"""
使用 'frontmatter' 库读取 Markdown 文件,提取并返回 date 字段的 datetime 对象。
"""
try:
post = frontmatter.load(filepath)

if 'date' not in post.metadata:
print(f"警告:文件 {filepath} 缺少 'date' 字段。")
return None

date_value = post.metadata['date']

# 检查是否已经是 datetime 对象(由 YAML 解析器自动处理)
if isinstance(date_value, datetime):
return date_value

# 否则,尝试将字符串转换为 datetime 对象
date_str = str(date_value).strip()

try:
return datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
except ValueError:
try:
return datetime.strptime(date_str, '%Y-%m-%d')
except ValueError:
print(f"警告:文件 {filepath} 的 date 格式无法识别:'{date_str}'")
return None

except FileNotFoundError:
print(f"错误:文件未找到 {filepath}")
return None
except frontmatter.FrontmatterError as e:
# 当文件开头没有合法的 '---' 区块时会抛出此错误
# print(f"警告:文件 {filepath} 无法解析 YAML Front Matter: {e}")
return None
except Exception as e:
print(f"处理文件 {filepath} 时发生未知错误: {e}")
return None


def main():
"""主执行逻辑:扫描、筛选、排序并输出结果。"""

# 处理 TARGET_DIR 路径并获取绝对路径
absolute_target_dir = os.path.abspath(TARGET_DIR)

# 检查文件夹是否存在
if not os.path.isdir(absolute_target_dir):
print(f"错误:目标文件夹 '{TARGET_DIR}' (绝对路径: {absolute_target_dir}) 不存在。")
return

try:
# 将输入的筛选时间字符串转换为 datetime 对象
filter_date = datetime.strptime(FILTER_DATE_STR.split()[0], '%Y-%m-%d')
except ValueError:
print(f"错误:输入的筛选日期格式 '{FILTER_DATE_STR}' 无效。请使用 YYYY-MM-DD 格式。")
return

print(f"--- 笔记筛选工具 ---")
print(f"原始目标目录: {TARGET_DIR}")
print(f"**绝对路径**: {absolute_target_dir}")
print(f"筛选时间阈值 (晚于): {filter_date.strftime('%Y-%m-%d')}")
print("-" * 30)

filtered_notes = []

for root, _, files in os.walk(absolute_target_dir):
for filename in files:
if filename.endswith(('.md', '.markdown')):
filepath = os.path.join(root, filename)

# 使用优化的解析函数
note_date = parse_md_header_date(filepath)

if note_date and note_date > filter_date:
filtered_notes.append((note_date, filepath))

# 排序:按日期由远到近(升序)排序
filtered_notes.sort(key=lambda x: x[0])

# 输出结果
if filtered_notes:
print(f"\n✅ 找到 {len(filtered_notes)} 篇符合条件的笔记(日期晚于 {filter_date.strftime('%Y-%m-%d')}):")
print("-" * 60)
print(f"{'No.':<4}{'文件日期':<22}{'文件相对路径'}")
print("-" * 60)

for idx, (note_date, filepath) in enumerate(filtered_notes, 1):
relative_path = os.path.relpath(filepath, absolute_target_dir)
print(f"{idx:<4}{note_date.strftime('%Y-%m-%d'):<12}{relative_path}")
print("-" * 60)
else:
print("\n❌ 未找到符合条件的笔记。")

if __name__ == "__main__":
main()
Python6 min read

  • 批量转换视频

  • 有进度条

  • 其他格式的视频都支持

  • 转换好的视频用 "原视频名 - converted.mp4" 表示

  • pip install ffmpeg_progress_yield tqdm

gpu 加速检查

ffmpeg -encoders | findstr nvenc
ffmpeg -decoders | findstr cuda

完整代码

import os
from tqdm import tqdm
from ffmpeg_progress_yield import FfmpegProgress

dir_path = r"D:\Downloads\tab_recorder"

def convert_webm_to_mp4(filename):
"""转换单个 WEBM 文件为 MP4,返回转换是否成功"""

webm_path = os.path.join(dir_path, filename)
mp4_path = os.path.splitext(webm_path)[0] + " - converted.mp4"

# 检查是否已存在有效 MP4 文件
if os.path.exists(mp4_path) and os.path.getsize(mp4_path) > 0:
print(f"已跳过:{mp4_path} (已存在且有效)")
return

# fmt: off
# 构建 FFmpeg 命令
cmd = [
"ffmpeg",
"-hwaccel", "cuda",
"-i", webm_path,
"-c:v", "h264_nvenc", # "libx264",
"-preset", "p7", # NVENC 支持的高质量预设(替代 veryslow)
"-tune", "hq", # 高质量调优
# "-vf", "scale=iw:ih", # 保持原分辨率 不写就继承
# "-r", "30", # 设置帧率为 30fps 不写就继承
"-c:a", "aac", # 音频编码(确保兼容性)
"-ac", "2", # 声道数设置为 2(立体声)
"-ar", "48000", # 音频采样频率设置为 48000Hz
mp4_path,
]
# fmt: on

# 使用进度条执行转换
with FfmpegProgress(cmd) as ff:
with tqdm(total=100, desc=f"{filename} 转换进度", unit="%") as pbar:
for progress in ff.run_command_with_progress():
pbar.update(progress - pbar.n)

def main():
# 获取目录下所有 WEBM 文件
webm_files = [f for f in os.listdir(dir_path) if f.lower().endswith(".webm")]

# 遍历处理每个文件
for filename in tqdm(webm_files, desc="总进度", unit="个"):
# 执行转换
convert_webm_to_mp4(filename)

if __name__ == "__main__":
main()

项目2 min read

清理重复的日历事件

这个 Python 脚本旨在帮助你自动清理 Microsoft 365 账户中重复的日历事件。它通过并发请求的方式高效地识别并删除多余的日程,从而保持你的日历整洁有序。

主要功能:

  • 高效查重:脚本会获取指定月份的所有日历事件,并识别出那些在同一分钟内创建、且主题完全相同的重复事件。

  • 并发删除:它利用异步编程(asyncioaiohttp),以并发的方式批量发送删除请求,大大加快了处理速度。

  • 智能重试:脚本内置了针对 API 限流(429 错误)和令牌过期(401 错误)的重试机制,确保操作的稳定性和成功率。

  • ACCESS_TOKEN 需要自己弄,过期时间很短


import asyncio
import datetime
from collections import defaultdict

import aiohttp

# 您的访问令牌。请替换为实际获取到的 access token。
# 注意:在实际应用中,不应将令牌硬编码在这里,应动态获取。
ACCESS_TOKEN = ""

# Microsoft Graph API 的基本 URL
GRAPH_API_URL = "https://graph.microsoft.com/v1.0"

# 全局的访问令牌,在需要时可以更新
global_access_token = ACCESS_TOKEN

# 请求头,包含访问令牌
def get_headers():
return {
"Authorization": f"Bearer {global_access_token}",
"Content-Type": "application/json",
}

# 模拟令牌刷新函数,替换为你实际的获取令牌逻辑
async def refresh_access_token():
"""
模拟刷新访问令牌。
在实际应用中,这里应调用 _get_ms_token() 或其他刷新令牌的函数。
"""
print("访问令牌可能已过期,正在尝试刷新。..")
# 你的 _get_ms_token() 函数是同步的,为了在异步中使用,需要用 run_in_executor
# 但由于没有你的真实认证逻辑,这里仅用一个示例字符串代替
new_token = "新_访问_令牌" # _get_ms_token()
global global_access_token
global_access_token = new_token
print("令牌已刷新。")

async def get_calendar_events_by_month(session, start_date: datetime.datetime, end_date: datetime.datetime):
"""
按月份查询日历事件,并处理分页以获取所有结果。
"""
all_events = []
endpoint = f"{GRAPH_API_URL}/me/calendarview"
params = {
"startdatetime": start_date.isoformat(),
"enddatetime": end_date.isoformat(),
"$select": "subject,start,end,id",
}
url = endpoint

while url:
try:
headers = get_headers()
if url == endpoint:
async with session.get(url, headers=headers, params=params) as response:
response.raise_for_status()
data = await response.json()
else:
async with session.get(url, headers=headers) as response:
response.raise_for_status()
data = await response.json()

all_events.extend(data.get("value", []))
url = data.get("@odata.nextLink")
if url:
print(f"发现下一页数据,正在获取。.. {len(all_events)} 个事件")

except aiohttp.ClientResponseError as err:
print(f"查询日历事件失败:{err}")
return []
return all_events

def find_duplicate_events(events: list):
"""
识别同一分钟内主题完全相同的重复事件,并返回事件 ID、名称和日期。
"""
event_map = defaultdict(list)
for event in events:
start_time_str = event["start"]["dateTime"]
# 获取事件开始时间并精确到分钟
start_minute = start_time_str[:16]
# 获取事件日期
start_date = start_time_str.split("T")[0]
subject = event["subject"]
event_id = event["id"]
# 查重键只包括开始时间和主题
event_map[(start_minute, subject)].append({
"id": event_id,
"subject": subject,
"date": start_date
})

# 修改返回格式,包含所有需要的信息
duplicate_events = {key: ids for key, ids in event_map.items() if len(ids) > 1}
return duplicate_events

async def delete_event(session, event_info: dict, retry_count=3):
"""
异步地删除单个日历事件,并处理重试。

Args:
session: aiohttp.ClientSession 对象。
event_info: 包含 'id', 'subject', 'date' 的字典。
retry_count: 重试次数。
"""
event_id = event_info['id']
subject = event_info['subject']
event_date = event_info['date']
endpoint = f"{GRAPH_API_URL}/me/events/{event_id}"
headers = get_headers()

for attempt in range(retry_count):
try:
async with session.delete(endpoint, headers=headers) as response:
response.raise_for_status()
# 打印更详细的成功信息
print(f"事件 '{subject}' (ID: {event_id}, 日期:{event_date}) 删除成功。")
return True
except aiohttp.ClientResponseError as err:
# 处理 429 和 401 错误
if err.status == 429:
retry_after = int(err.headers.get("Retry-After", 5))
print(f"收到 429 错误,等待 {retry_after} 秒后重试。.. (第 {attempt + 1}/{retry_count} 次尝试)")
await asyncio.sleep(retry_after)
elif err.status == 401:
print(f"收到 401 错误,令牌可能已过期。正在刷新令牌。.. (第 {attempt + 1}/{retry_count} 次尝试)")
await refresh_access_token()
headers = get_headers() # 使用新令牌更新头部
else:
print(f"删除事件 '{subject}' (ID: {event_id}, 日期:{event_date}) 失败:{err}")
return False

print(f"删除事件 '{subject}' (ID: {event_id}, 日期:{event_date}) 失败,重试 {retry_count} 次后仍未成功。")
return False

async def async_main_process():
"""
主流程函数,按月份循环执行“查询 -> 识别 -> 删除”操作。
"""
year = 2024
months_to_process = list(range(1, 12))

async with aiohttp.ClientSession() as session:
for month in months_to_process:
start_of_month = datetime.datetime(year, month, 1, tzinfo=datetime.timezone.utc)
if month == 12:
end_of_month = datetime.datetime(year + 1, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(
seconds=1)
else:
end_of_month = datetime.datetime(year, month + 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(
seconds=1)

print(f"\n--- 正在处理 {year}{month} 月 ---")

events_data = await get_calendar_events_by_month(session, start_of_month, end_of_month)

if not events_data:
print("当月没有找到事件或查询失败。")
continue

print(f"找到 {len(events_data)} 个事件。")
duplicate_events_map = find_duplicate_events(events_data)

if not duplicate_events_map:
print("没有发现重复事件。")
continue

print(f"找到 {len(duplicate_events_map)} 组重复事件。")

all_delete_tasks = []
for (minute, subject), event_list in duplicate_events_map.items():
print(f"发现重复事件:主题='{subject}', 时间='{minute}', 共 {len(event_list)} 个。")
# 保留第一个,删除后面的所有重复项
events_to_delete = event_list[1:]
print(f"将删除 {len(events_to_delete)} 个事件。")

for event_info in events_to_delete:
all_delete_tasks.append(delete_event(session, event_info))

print(f"\n--- 本月总计将并发删除 {len(all_delete_tasks)} 个事件 ---")

if all_delete_tasks:
await asyncio.gather(*all_delete_tasks)
else:
print("没有找到需要删除的事件。")

if __name__ == "__main__":
asyncio.run(async_main_process())

项目4 min read

  • 用于盗版游戏和未知安全工具的批量断网设置
@Echo Off
chcp 936
SetLocal EnableDelayedExpansion

echo 正在以当前权限运行防火墙规则创建。..
echo 如果看不到完整规则,请尝试以管理员身份运行
echo:

Set "Cmnd=netsh advfirewall firewall add rule action=block"

Set "TargetDir=E:\Games\CrackedGame"
For /R "%TargetDir%" %%a In (*.exe) Do (
For %%b In (in out) Do (
set "ruleName=blocked %%~fa %%b"
netsh advfirewall firewall show rule name="!ruleName!" >nul 2>&1
if !errorlevel! equ 0 (
echo 跳过已存在的规则【%%~fa】方向:%%b
) else (
echo 创建禁止 %%b 规则【%%~fa】
%Cmnd% name="!ruleName!" dir=%%b program="%%a"
if !errorlevel! equ 0 (
echo -- 成功创建规则:!ruleName!
) else (
echo -- 创建规则失败,请检查权限
)
)
)
)

echo:
echo 处理完成
echo ----------------------------
pause

管理员方式启动

  1. 创建快捷方式:按住Alt键拖动bat文件到目标位置
  2. 设置管理员运行
    • 右键快捷方式→属性→快捷方式选项卡
    • 点击“高级”,勾选“用管理员身份运行”
    • 点击“确定”
PythonOne min read

查看 API

from gradio_client import Client

# 连接到已部署的 Gradio 应用
client = Client("http://localhost:9872") # 替换为你的应用地址

config = client.view_api()
print(config)

批量调用

  • ValueError: File D:\data\音频 is not in the upload folder and cannot be accessed.
    • 看一下源代码,读取文件必须在它的特定目录下
  • 要关 vpn 或者修改代理
import os
import shutil

import tqdm
from gradio_client import Client

reference_audio_path = (
r"D:\data\音频\2025年05月14日 参考语言文本,需要大概十秒钟。.m4a"
)
reference_audio_text = "参考语言文本,需要大概十秒钟。"
reference_audio_tmp_path = r"D:\CodeProjects\GPT-SoVITS-v4-20250422fix\TEMP\gradio\4c841149931f32360e312b31e81a5f81549e5584\reference_audio.m4a"
os.makedirs(os.path.dirname(reference_audio_tmp_path), exist_ok=True)
if os.path.exists(reference_audio_tmp_path):
os.remove(reference_audio_tmp_path) # 删除文件
shutil.copy2(reference_audio_path, reference_audio_tmp_path) # 源路径和目标路径均为字符串

client = Client("http://localhost:9872")
output_dir = "output_audios"
os.makedirs(output_dir, exist_ok=True)

# 构造符合 Gradio 要求的文件字典(包含"path")
ref_wav_path = {
"name": reference_audio_path,
# 文件名(可选,用于显示)
"path": reference_audio_tmp_path
# 必需的文件路径
}

prompt_language = "中文"
text_language = "中文"
how_to_cut = "按中文句号。切"
sample_steps = 8

texts = [
"湘江北去,橘子洲头。看万山红遍,层林尽染;",
"漫江碧透,百舸争流。鹰击长空,鱼翔浅底,万类霜天竞自由。",
"怅寥廓,问苍茫大地,谁主沉浮?",
"携来百侣曾游。忆往昔峥嵘岁月稠。",
"恰同学少年,风华正茂;书生意气,挥斥方遒。",
"指点江山,激扬文字,粪土当年万户侯。",
"曾记否,到中流击水,浪遏飞舟?",
]

for i, text in tqdm.tqdm(enumerate(texts)):
print(f"生成第 {i + 1} 条音频...")

result = client.predict(
ref_wav_path, # 传递包含"path"的文件字典
reference_audio_text,
prompt_language,
text,
text_language,
how_to_cut,
15, # top_k
1.0, # top_p
1.0, # temperature
False, # ref_free
1.0, # speed
False, # if_freeze
None, # inp_refs(无多文件)
sample_steps,
False, # if_sr
0.3, # pause_second
api_name="/get_tts_wav",
)
print(result)
# copy file
ori_file = result
target_file = os.path.join(output_dir, f"{i} {text}.wav")
shutil.copy2(result, target_file) # 源路径和目标路径均为字符串
print(target_file)
项目2 min read

import os

target_dir = r"D:\CodeProjects\WG" # 目标目录(可改为 '.' 表示当前目录)

# 存储符合条件的文件路径
matching_files = []

# 递归查找符合条件的文件
for root, dirs, files in os.walk(target_dir):
for filename in files:
if filename.endswith(".md"):
file_path = os.path.join(root, filename)
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read().strip()
if content.startswith("#") and "\n" not in content:
matching_files.append(file_path)
print(f"找到符合条件的文件:{file_path} - 内容: {content}")
except Exception as e:
print(f"读取 {file_path} 失败: {e}")

# 处理删除操作
if matching_files:
choice = input("\n输入 1 确认删除上述文件,输入其他数字取消:")
if choice == "1":
for file_path in matching_files:
try:
os.remove(file_path) # 直接删除文件
print(f"✅ 已删除:{file_path}")
except Exception as e:
print(f"❌ 删除失败 {file_path}: {e}")
print("\n操作完成,文件已永久删除。")
else:
print("操作已取消。")
else:
print("未找到符合条件的文件。")
PythonOne min read

大型语言模型 (LLM) 很强大,但与外部数据和工具交互不便。模型上下文协议 (MCP) 旨在解决此问题,让 AI 从“解答者”变为“执行者”。本文是 MCP 的快速入门笔记。

参考

MCP 解决了啥痛点? 🤔

  • 传统痛点:

    • LLM 无法感知实时数据、操作外部工具。
    • 手动为 LLM 准备上下文,效率低,自动化程度差。
    • 不同 LLM 平台的 Function Calling 实现不兼容。
  • MCP 方案:

    • MCP 提供标准接口(像 AI 的“USB-C”),让应用和 LLM 标准化地交换上下文(数据与工具调用)。
  • 图解 MCP

  • MCP 使 LLM 能按需、安全、灵活地使用外部工具和数据。

快速上手 MCP:Github MCP 示例 🚀

以孟哥博客中 Github MCP + Cursor (AI IDE) 为例:

  • Github 令牌 🔑

  • 路径:Github -> Settings -> Developer settings -> Personal access tokens -> Tokens (classic).

  • 权限:授予 repoworkflow

  • 注意: 妥善保管 Token。

  • 配置 MCP 服务器 (Cursor) ⚙️

    • 找到应用的 MCP 配置文件 (如 .mcp/config.json)。

    • 将 Github Access Token 添加进去。

    • 示例配置:

    {
    "inputs": [
    {
    "type": "promptString",
    "id": "github_token",
    "description": "GitHub Personal Access Token",
    "password": true
    }
    ],
    "servers": {
    "GitHub": {
    "command": "npx",
    "args": [
    "-y",
    "@modelcontextprotocol/server-github"
    ],
    "env": {
    "GITHUB_PERSONAL_ACCESS_TOKEN": "${input:github_token}"
    }
    }
    }
    }
  • 验证与使用 🗣️

    • 重启应用 (如 Cursor),检查 MCP 连接状态。
    • 成功后,即可用自然语言操作代码库。例如:“总结 feature-xyz 分支的提交”。
    • 测试问题:查找 我 github 上 crawler-utils 项目 下有什么内容

报错

Connection state: Error spawn npx ENOENT

{
// 💡 Inputs are prompted on first server start, then stored securely by VS Code.
"inputs": [
{
"type": "promptString",
"id": "perplexity-key",
"description": "Perplexity API Key",
"password": true
}
],
"servers": {
// https://github.com/ppl-ai/modelcontextprotocol/
"Perplexity": {
"type": "stdio",
"command": "npx",
"args": ["-y", "server-perplexity-ask"],
"env": {
"PERPLEXITY_API_KEY": "${input:perplexity-key}"
}
}
}
}
  • 本地启动不报错 npx -y @modelcontextprotocol/server-github
2025-05-15 17:59:37.349 [info] Connection state: Starting
2025-05-15 17:59:37.354 [info] Starting server from LocalProcess extension host
2025-05-15 17:59:37.366 [info] Connection state: Starting
2025-05-15 17:59:37.366 [info] Connection state: Error spawn npx ENOENT
  • 改成
{  
"servers": {
"github": {
"command": "cmd",
"args": ["/c","npx","-y", "@modelcontextprotocol/server-github"],
"env": {
"GITHUB_PERSONAL_ACCESS_TOKEN": "${input:github_token}"
}
}
}
}

报错 Server exited before responding to initialize request

2025-05-15 18:02:47.180 [info] Connection state: Running
2025-05-15 18:02:47.196 [warning] [server stderr] 'npx' �����ڲ����ⲿ���Ҳ���ǿ����еij���
2025-05-15 18:02:47.196 [warning] [server stderr] �����������
2025-05-15 18:02:47.200 [info] Connection state: Error Process exited with code 1
2025-05-15 18:02:47.200 [error] Server exited before responding to `initialize` request.
  • 首先直接搜搜不到结果
  • 然后本地启动也没问题
  • 然后想尝试看看错误日志是什么
    • 应该是 GBK 编码
    • 但是输出的时候应该已经丢失内容了
    • C:\Users\z\AppData\Roaming\Code\logs\20250515T181527\window1 下也不能看到原内容
  • 在 Trae 上有直接的 MCP 市场,可以直接配置成功,就放弃了
    • 等别人出现同样的问题把
编程4 min read

  • 本文记录了为解决 Clash 订阅更新问题所进行的一系列尝试和最终实现的自动化方案。
  • 主要目标是克服订阅链接中 token 频繁变化带来的不便,并最终实现近乎全自动化的 Clash 订阅更新。

背景与痛点 🤔

  • 使用某机场提供的 Clash 订阅服务,该服务通过订阅链接提供配置信息。

  • 需要在手机、电脑和服务器上使用 Clash。

  • 订阅链接中的 token 会定期更新,每次都需要手动更新所有设备的配置,令人苦不堪言 😫。

  • 核心需求:

    • 实现多设备共享 Clash 配置。
    • 终极目标: 尽可能减少手动操作,实现全自动更新。

初始方案与架构

1. 局域网共享 (临时方案) 🔗

  • 最初,为了在手机上使用 Clash,利用电脑作为中转。
  • 电脑运行 Clash 后,手机通过局域网连接到电脑的 Clash 端口。
  • 再由电脑代理流量。
  • 问题: 电脑需要保持开机状态,关机了,手机也没网了。自动化程度:低

2. 引入服务器 🖥️

  • 为了更稳定地使用,在服务器上部署了 Clash for Linux。
  • 所有设备都可以通过系统代理连接到服务器上的 Clash,再由服务器连接到机场。
  • 小服务器 24 小时在线~
  • 问题:
    • 仍然需要手动更新所有设备的 Clash 配置。
    • 小服务定时脚本更新 Clash Docker,但无法应对订阅链接的变动。
    • 还需要手动重启 Docker。
    • 自动化程度:中(服务器 Clash 更新)。

解决 Token 动态变化:Cloudflare Worker 的尝试与转向 ☁️

  • 有了 Cloudflare 能解决本地网络访问不到的问题

  • 使用预设的固定 token 替换请求中的 token 参数,

    • 不妨碍其他小伙伴使用这个服务。
  • 并修改 Clash 配置文件的部分内容。

  • 在编写 Cloudflare Worker 脚本的时候就想过包括登录,

    • 但是碰到了 ja3 指纹的问题,
    • 所以放弃,这个东西用 Python 也很难解决。

Cloudflare Worker 的主要功能:

  • 接收客户端发出的订阅请求,避免客户端访问不到机场的问题。
  • 如果是特定的 key 请求,则将其替换为预设的 token,减少多端改动。
  • 修改从机场获取的 Clash 配置文件内容,例如:
    • 设置 interval180 秒,提高更新频率。
    • 添加 tolerance: 2000,增强更新的稳定性。
  • 伪装 User-Agent,避免订阅接口的拦截。
  • 将修改后的 Clash 配置文件返回给客户端。

Cloudflare Worker 脚本:

export default {
async fetch(request, env, ctx) {
const url = new URL(request.url);
var { pathname, search } = url;
if (search === "?token=KEYTOKEN") {
search = "?token=ASpecialToken";
}

const base_url = `https://bbb.aaa.com/api/v1/client/subscribe${search}`;
const modifiedHeaders = new Headers(request.headers);
modifiedHeaders.set('User-Agent', 'ClashForWindows/0.15.10');
const modifiedRequest = new Request(request, { headers: modifiedHeaders });

const image_response = await fetch(base_url, modifiedRequest);
const body = await image_response.text();
const modifiedYaml = body.replace(/(interval:)\s*\d+/g, '$1 180');
const modifiedYamlWithTolerance = body.replace(/(interval: 180)/g, '$1, tolerance: 2000');

return new Response(modifiedYamlWithTolerance, {
status: image_response.status,
statusText: image_response.statusText,
headers: image_response.headers,
});
},
};

优势:

  • 无需在客户端设备上直接配置机场订阅链接。
  • 只需更新 Worker 上的 token 即可同步所有设备的配置。
  • 隐藏了真实的订阅链接,提高了隐私性。

不足:

  • 仍然需要手动获取最新的有效 token 并更新到 Worker。
  • 还必须手动执行以下操作来完成整个更新流程:
    • 打开网页
    • 登录
    • 获取 token
    • 打开 Cloudflare 网站
    • ⭐ 等待 Cloudflare 加载
    • 更新 token
    • 打开 ssh 软件 tabby
    • 连接到 Linux 服务器。
    • 输入 运行脚本的命令
    • 输入 sudo 密码
    • 关闭 SSH 软件
  • 好麻烦 😫 自动化程度:低(Token 更新)。

自动化获取 Token:Selenium 的应用 ⚙️

  • 为了自动化获取最新的 token,
  • 编写了一个 Python 脚本,
  • 使用 Selenium 模拟浏览器登录机场并提取包含 token 的订阅链接。
  • 通过 Cloudflare API 更新 Worker 的环境变量
  • 最后通过 SSH 连接到服务器,重启 Docker 服务。
  • 能自动一点是一点 👍

遇到的小问题:

  • ChromeDriver 重启后需要重新下载,且下载慢

缓存 ChromeDriver 的 Python 代码:

def get_selenium_driver(headless=False):
driver_path = os.path.join(conf.SELENIUM_DRIVER_DOWNLOAD_DIR, "chromedriver.exe")
move_driver = False
if os.path.exists(driver_path):
service = Service(executable_path=driver_path)
else:
move_driver = True
service = None
user_cookies = os.path.join(
os.path.expanduser("~"), r"AppData\Local\Google\Chrome\User Data"
)
option = webdriver.ChromeOptions()
option.add_argument("--no-sandbox")
option.add_argument("--disable-dev-shm-usage")
option.add_argument("--user-data-dir={}".format(user_cookies))
if headless:
option.add_argument("--headless")

chrom_driver = webdriver.Chrome(options=option, service=service)
if move_driver:
shutil.copy2(chrom_driver.service.path, conf.SELENIUM_DRIVER_DOWNLOAD_DIR)
return chrom_driver

自动化更新 Cloudflare Worker Token 🔄

  • xpath 部分 略过
  • 获取到最新的 token 后,需要将其更新到 Cloudflare Worker 的环境变量中。

遇到的问题:

  • Cloudflare API 限制:
    • Cloudflare 的 HTTP API 无法直接更新 Worker 的环境变量。
    • 要用他们自己的工具 wrangler
    • wrangler 是 Cloudflare 提供的命令行工具,还需要 nodejs。
  • fnm 执行问题:
    • 使用 FNM 管理 Node.js 版本。
    • 但直接通过 Python 执行 fnm exec npx wrangler secret put 命令更新 token 失败。
    • 好像是环境变量问题。

解决方案:

  • 直接调用 Node.js 解释器来执行 wrangler 命令,从而更新 Worker 的环境变量。

Python 代码:

def update_cloudflare_worker_env(token):
api_token = os.environ["CLOUDFLARE_API_TOKEN"]
script_name = os.environ["script_name"]
env_name = os.environ["env_name"]
node_exe = os.environ["node_exe"] # where.exe node
wrangler_path = os.environ["wrangler_path"] # 安装 wrangler 后在当前目录下 找 wrangler.js
os.environ["CLOUDFLARE_API_TOKEN"] = api_token
command = f"echo {token} | {node_exe} {wrangler_path} secret put {env_name} --name {script_name}"
result = subprocess.run(command, shell=True, capture_output=True, text=True, encoding="utf-8")
print("Delete Output:", result.stdout)
if result.stderr:
print("Error:", result.stderr)

自动化更新 Clash 配置并重启服务 🛠️

  • 最后一步是在服务器上自动更新 Clash 的配置文件并重启 Clash 服务。编写了一个 update.sh 脚本来完成这些操作。

  • 遇到的问题:sudo 权限

  • 执行 update.sh 脚本需要 sudo 权限,使用 paramiko 库通过 SSH 连接到服务器并执行带 sudo 权限的命令。

Python 代码:

def run_restart_sh():
command = "sudo bash /home/root/server/update.sh "
l_password = os.environ["server_passwd"]
l_host = os.environ["server_ip"]
l_user = os.environ["server_username"]
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(l_host, username=l_user, password=l_password)
transport = ssh.get_transport()
session = transport.open_session()
session.set_combine_stderr(True)
session.get_pty()
session.exec_command(f"{command}")
stdin = session.makefile("wb", -1)
stdout = session.makefile("rb", -1)
stdin.write(l_password + "\n")
stdin.flush()

for line in stdout.read().decode("utf-8").splitlines():
print(f"host: {l_host}: {line}")
ssh.close()

总结:自动化升级,解放双手 🎉

  • 通过以上步骤,最终实现了一键运行,所有设备都能自动更新 Clash 订阅的操作。
  • 虽然当前的方案需要在电脑上运行脚本触发更新,但已经极大地减少了手动操作,提升了用户体验。
  • 要是有过 Ja3 和谷歌验证的方案。可以考虑将整个流程部署到服务器上,并设置定时任务。
编程7 min read

感觉 docker 版本不支持 supervisor 和 add-ons 功能怪怪的。也没啥特别好玩的地方,同时也不自持触发,米家本身设置的事件。而且这样子安全问题感觉也增加了,没太大必要折腾,可以关注下跟新日志,等功能完善了再玩。

home-assistant docker 安装

版本区别

  • home-assistant/docker-compose.yml
version: '3'
services:
homeassistant:
container_name: home-assistant
image: ghcr.io/home-assistant/home-assistant:latest
volumes:
- ./config:/config # 持久化配置文件,请替换为实际路径
- /etc/localtime:/etc/localtime:ro # 同步主机时区
environment:
- TZ=Asia/Shanghai # 设置时区,根据需要修改
ports:
- "8123:8123" # 映射端口
restart: unless-stopped
network_mode: host # 使用主机网络模式

ha_xiaomi_home 插件安装

Download and copy custom_components/xiaomi_home folder to config/custom_components folder in your Home Assistant.

wget -e http_proxy=http://localhost:9999 -e https_proxy=http://localhost:9999 https://github.com/XiaoMi/ha_xiaomi_home/archive/refs/heads/main.zip

# 解压 ZIP 文件
unzip main.zip

# 将插件文件夹移动到 custom_components 目录
sudo cp -r ha_xiaomi_home-main/custom_components ./config/

# 清理下载的文件
sudo rm -rf ha_xiaomi_home-main main.zip

# 递归设置 custom_components 目录的权限
# sudo chown -R homeassistant:homeassistant custom_components/xiaomi_home

# 使用 systemd 管理的 Home Assistant
sudo docker compose restart

ha 里安装 node-red 插件

取消 小米授权

  • 访问 小米云服务
    • 绑定授权
    • Xiaomi Home (Home Assistant Integration)
    • 取消授权
随笔2 min read


相关

  • 想法起源 别人的博客, 看起来很酷,还能显示当前人数。
  • 在 docusaurus 或者 markdown 中博客展示 google analysis 数据
  • 目前好像没有相关项目 可以做个类似 display card 的东西
  • 使用默认的 google analytics 查看数据
  • lookerstudio 可以调用 google analytics 并展示,但是自带的模板库没有使用最新的 API。
    • 报告地址
    • embed
    • 右上角分享 - 复制代码后 需要删除部分代码
    • 暂时不能实现的功能
      • 不能显示当前再看的人数
<!-- 代码参考 -->
<iframe width="800" height="600" src="https://lookerstudio.google.com/embed/reporting/6054b482-a054-42f9-870d-3fe15faeb78f/page/IpdTE">

编程One min read