Skip to main content

8 posts tagged with "项目"

View All Tags

  • 批量转换视频

  • 有进度条

  • 其他格式的视频都支持

  • 转换好的视频用 "原视频名 - converted.mp4" 表示

  • pip install ffmpeg_progress_yield tqdm

gpu 加速检查

ffmpeg -encoders | findstr nvenc
ffmpeg -decoders | findstr cuda

完整代码

import os
from tqdm import tqdm
from ffmpeg_progress_yield import FfmpegProgress

dir_path = r"D:\Downloads\tab_recorder"

def convert_webm_to_mp4(filename):
"""转换单个 WEBM 文件为 MP4,返回转换是否成功"""

webm_path = os.path.join(dir_path, filename)
mp4_path = os.path.splitext(webm_path)[0] + " - converted.mp4"

# 检查是否已存在有效 MP4 文件
if os.path.exists(mp4_path) and os.path.getsize(mp4_path) > 0:
print(f"已跳过:{mp4_path} (已存在且有效)")
return

# fmt: off
# 构建 FFmpeg 命令
cmd = [
"ffmpeg",
"-hwaccel", "cuda",
"-i", webm_path,
"-c:v", "h264_nvenc", # "libx264",
"-preset", "p7", # NVENC 支持的高质量预设(替代 veryslow)
"-tune", "hq", # 高质量调优
# "-vf", "scale=iw:ih", # 保持原分辨率 不写就继承
# "-r", "30", # 设置帧率为 30fps 不写就继承
"-c:a", "aac", # 音频编码(确保兼容性)
"-ac", "2", # 声道数设置为 2(立体声)
"-ar", "48000", # 音频采样频率设置为 48000Hz
mp4_path,
]
# fmt: on

# 使用进度条执行转换
with FfmpegProgress(cmd) as ff:
with tqdm(total=100, desc=f"{filename} 转换进度", unit="%") as pbar:
for progress in ff.run_command_with_progress():
pbar.update(progress - pbar.n)

def main():
# 获取目录下所有 WEBM 文件
webm_files = [f for f in os.listdir(dir_path) if f.lower().endswith(".webm")]

# 遍历处理每个文件
for filename in tqdm(webm_files, desc="总进度", unit="个"):
# 执行转换
convert_webm_to_mp4(filename)

if __name__ == "__main__":
main()

项目2 min read

清理重复的日历事件

这个 Python 脚本旨在帮助你自动清理 Microsoft 365 账户中重复的日历事件。它通过并发请求的方式高效地识别并删除多余的日程,从而保持你的日历整洁有序。

主要功能:

  • 高效查重:脚本会获取指定月份的所有日历事件,并识别出那些在同一分钟内创建、且主题完全相同的重复事件。

  • 并发删除:它利用异步编程(asyncioaiohttp),以并发的方式批量发送删除请求,大大加快了处理速度。

  • 智能重试:脚本内置了针对 API 限流(429 错误)和令牌过期(401 错误)的重试机制,确保操作的稳定性和成功率。

  • ACCESS_TOKEN 需要自己弄,过期时间很短


import asyncio
import datetime
from collections import defaultdict

import aiohttp

# 您的访问令牌。请替换为实际获取到的 access token。
# 注意:在实际应用中,不应将令牌硬编码在这里,应动态获取。
ACCESS_TOKEN = ""

# Microsoft Graph API 的基本 URL
GRAPH_API_URL = "https://graph.microsoft.com/v1.0"

# 全局的访问令牌,在需要时可以更新
global_access_token = ACCESS_TOKEN

# 请求头,包含访问令牌
def get_headers():
return {
"Authorization": f"Bearer {global_access_token}",
"Content-Type": "application/json",
}

# 模拟令牌刷新函数,替换为你实际的获取令牌逻辑
async def refresh_access_token():
"""
模拟刷新访问令牌。
在实际应用中,这里应调用 _get_ms_token() 或其他刷新令牌的函数。
"""
print("访问令牌可能已过期,正在尝试刷新。..")
# 你的 _get_ms_token() 函数是同步的,为了在异步中使用,需要用 run_in_executor
# 但由于没有你的真实认证逻辑,这里仅用一个示例字符串代替
new_token = "新_访问_令牌" # _get_ms_token()
global global_access_token
global_access_token = new_token
print("令牌已刷新。")

async def get_calendar_events_by_month(session, start_date: datetime.datetime, end_date: datetime.datetime):
"""
按月份查询日历事件,并处理分页以获取所有结果。
"""
all_events = []
endpoint = f"{GRAPH_API_URL}/me/calendarview"
params = {
"startdatetime": start_date.isoformat(),
"enddatetime": end_date.isoformat(),
"$select": "subject,start,end,id",
}
url = endpoint

while url:
try:
headers = get_headers()
if url == endpoint:
async with session.get(url, headers=headers, params=params) as response:
response.raise_for_status()
data = await response.json()
else:
async with session.get(url, headers=headers) as response:
response.raise_for_status()
data = await response.json()

all_events.extend(data.get("value", []))
url = data.get("@odata.nextLink")
if url:
print(f"发现下一页数据,正在获取。.. {len(all_events)} 个事件")

except aiohttp.ClientResponseError as err:
print(f"查询日历事件失败:{err}")
return []
return all_events

def find_duplicate_events(events: list):
"""
识别同一分钟内主题完全相同的重复事件,并返回事件 ID、名称和日期。
"""
event_map = defaultdict(list)
for event in events:
start_time_str = event["start"]["dateTime"]
# 获取事件开始时间并精确到分钟
start_minute = start_time_str[:16]
# 获取事件日期
start_date = start_time_str.split("T")[0]
subject = event["subject"]
event_id = event["id"]
# 查重键只包括开始时间和主题
event_map[(start_minute, subject)].append({
"id": event_id,
"subject": subject,
"date": start_date
})

# 修改返回格式,包含所有需要的信息
duplicate_events = {key: ids for key, ids in event_map.items() if len(ids) > 1}
return duplicate_events

async def delete_event(session, event_info: dict, retry_count=3):
"""
异步地删除单个日历事件,并处理重试。

Args:
session: aiohttp.ClientSession 对象。
event_info: 包含 'id', 'subject', 'date' 的字典。
retry_count: 重试次数。
"""
event_id = event_info['id']
subject = event_info['subject']
event_date = event_info['date']
endpoint = f"{GRAPH_API_URL}/me/events/{event_id}"
headers = get_headers()

for attempt in range(retry_count):
try:
async with session.delete(endpoint, headers=headers) as response:
response.raise_for_status()
# 打印更详细的成功信息
print(f"事件 '{subject}' (ID: {event_id}, 日期:{event_date}) 删除成功。")
return True
except aiohttp.ClientResponseError as err:
# 处理 429 和 401 错误
if err.status == 429:
retry_after = int(err.headers.get("Retry-After", 5))
print(f"收到 429 错误,等待 {retry_after} 秒后重试。.. (第 {attempt + 1}/{retry_count} 次尝试)")
await asyncio.sleep(retry_after)
elif err.status == 401:
print(f"收到 401 错误,令牌可能已过期。正在刷新令牌。.. (第 {attempt + 1}/{retry_count} 次尝试)")
await refresh_access_token()
headers = get_headers() # 使用新令牌更新头部
else:
print(f"删除事件 '{subject}' (ID: {event_id}, 日期:{event_date}) 失败:{err}")
return False

print(f"删除事件 '{subject}' (ID: {event_id}, 日期:{event_date}) 失败,重试 {retry_count} 次后仍未成功。")
return False

async def async_main_process():
"""
主流程函数,按月份循环执行“查询 -> 识别 -> 删除”操作。
"""
year = 2024
months_to_process = list(range(1, 12))

async with aiohttp.ClientSession() as session:
for month in months_to_process:
start_of_month = datetime.datetime(year, month, 1, tzinfo=datetime.timezone.utc)
if month == 12:
end_of_month = datetime.datetime(year + 1, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(
seconds=1)
else:
end_of_month = datetime.datetime(year, month + 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(
seconds=1)

print(f"\n--- 正在处理 {year}{month} 月 ---")

events_data = await get_calendar_events_by_month(session, start_of_month, end_of_month)

if not events_data:
print("当月没有找到事件或查询失败。")
continue

print(f"找到 {len(events_data)} 个事件。")
duplicate_events_map = find_duplicate_events(events_data)

if not duplicate_events_map:
print("没有发现重复事件。")
continue

print(f"找到 {len(duplicate_events_map)} 组重复事件。")

all_delete_tasks = []
for (minute, subject), event_list in duplicate_events_map.items():
print(f"发现重复事件:主题='{subject}', 时间='{minute}', 共 {len(event_list)} 个。")
# 保留第一个,删除后面的所有重复项
events_to_delete = event_list[1:]
print(f"将删除 {len(events_to_delete)} 个事件。")

for event_info in events_to_delete:
all_delete_tasks.append(delete_event(session, event_info))

print(f"\n--- 本月总计将并发删除 {len(all_delete_tasks)} 个事件 ---")

if all_delete_tasks:
await asyncio.gather(*all_delete_tasks)
else:
print("没有找到需要删除的事件。")

if __name__ == "__main__":
asyncio.run(async_main_process())

项目4 min read

查看 API

from gradio_client import Client

# 连接到已部署的 Gradio 应用
client = Client("http://localhost:9872") # 替换为你的应用地址

config = client.view_api()
print(config)

批量调用

  • ValueError: File D:\data\音频 is not in the upload folder and cannot be accessed.
    • 看一下源代码,读取文件必须在它的特定目录下
  • 要关 vpn 或者修改代理
import os
import shutil

import tqdm
from gradio_client import Client

reference_audio_path = (
r"D:\data\音频\2025年05月14日 参考语言文本,需要大概十秒钟。.m4a"
)
reference_audio_text = "参考语言文本,需要大概十秒钟。"
reference_audio_tmp_path = r"D:\CodeProjects\GPT-SoVITS-v4-20250422fix\TEMP\gradio\4c841149931f32360e312b31e81a5f81549e5584\reference_audio.m4a"
os.makedirs(os.path.dirname(reference_audio_tmp_path), exist_ok=True)
if os.path.exists(reference_audio_tmp_path):
os.remove(reference_audio_tmp_path) # 删除文件
shutil.copy2(reference_audio_path, reference_audio_tmp_path) # 源路径和目标路径均为字符串

client = Client("http://localhost:9872")
output_dir = "output_audios"
os.makedirs(output_dir, exist_ok=True)

# 构造符合 Gradio 要求的文件字典(包含"path")
ref_wav_path = {
"name": reference_audio_path,
# 文件名(可选,用于显示)
"path": reference_audio_tmp_path
# 必需的文件路径
}

prompt_language = "中文"
text_language = "中文"
how_to_cut = "按中文句号。切"
sample_steps = 8

texts = [
"湘江北去,橘子洲头。看万山红遍,层林尽染;",
"漫江碧透,百舸争流。鹰击长空,鱼翔浅底,万类霜天竞自由。",
"怅寥廓,问苍茫大地,谁主沉浮?",
"携来百侣曾游。忆往昔峥嵘岁月稠。",
"恰同学少年,风华正茂;书生意气,挥斥方遒。",
"指点江山,激扬文字,粪土当年万户侯。",
"曾记否,到中流击水,浪遏飞舟?",
]

for i, text in tqdm.tqdm(enumerate(texts)):
print(f"生成第 {i + 1} 条音频...")

result = client.predict(
ref_wav_path, # 传递包含"path"的文件字典
reference_audio_text,
prompt_language,
text,
text_language,
how_to_cut,
15, # top_k
1.0, # top_p
1.0, # temperature
False, # ref_free
1.0, # speed
False, # if_freeze
None, # inp_refs(无多文件)
sample_steps,
False, # if_sr
0.3, # pause_second
api_name="/get_tts_wav",
)
print(result)
# copy file
ori_file = result
target_file = os.path.join(output_dir, f"{i} {text}.wav")
shutil.copy2(result, target_file) # 源路径和目标路径均为字符串
print(target_file)
项目2 min read

  • 这是网盘爬取的部分记录,很多坑,分享一下避免大家采坑。
  • 后续发现网盘有网页版本,在安装能 root 的 Android 模拟器后,放弃通过 app 爬。

Fiddler

教程

  • 电脑管家下的 Fiddler 4.0
    • Fiddler Anywhere 只找到苹果的破解版
  • 允许远程连接
    • 更改 fiddler 端口后要重启软件
  • 手机设置
    • 证书安装
    • 卸载方式
      • 搜索 加密与凭据
      • 用户凭据 找到自己命名的凭据 (fiddler) 点击删除

打开网盘后 无连接

走 fiddler 后,app 打开显示无网络。用 adb 查看日志,找到如下错误:

06-06 19:09:03.624 28223 28223 W System.err: javax.net.ssl.SSLHandshakeException: java.security.cert.CertPathValidatorException: Trust anchor for certification path not found.

是因为,fiddler 装的证书不是根证书,app 不认可。 改为弄根证书需要手机 root,懒得弄了。

装安卓虚拟机

考虑使用安卓模拟器

  • 夜神模拟器
    • hyper-v 冲突
  • bluestacks
  • 天天模拟器
    • 启动不了
    • 安装时蓝屏

卸载某个模拟器后 dll 乱了

无效的 wechatwin.dll 文件 errcode:126, 点击“确定”下载最新版本

  • 微信打不开 怀疑是某个模拟器卸载的时候 把系统的 dll 一起
  • 修复方法参考

按 Win + S 键搜索栏输入 CMD,以管理员身份打开 “命令提示符” 输入命令:

sfc /SCANNOW

等待扫描结束:

Dism /Online /Cleanup-Image /ScanHealth

等进度条走完,再次打开微信,看是否能够正常打开。如果能正常打开,重启电脑再看是否能打开微信,如果重启之后还能正常打开微信等其它软件,说明此问题就修复了。

Hyper-V + Visual Studio 跑 Android 模拟器

参考 用 Xamarin 的模拟器。

安装 Visual Studio 2022 一样可以

  • 打开 SDK 管理器后 提示:
    • Loading SDK component information failed. Please retry
    • 加载 SDK 组件信息失败 请重试
    • must not be an empty string forPath

通过点击:工具 - 选项 - 搜索 Android - Xamarin Android 设置 点击确定后就能找找到位置了。

下载 SDK 需要全局翻墙。

  • 爬完后注意存档位置 方便后续删除
    • C:\Users\xxx\AppData\Local\Xamarin\Mono for Android\Archives

如此,终于有一个能用的,和 hyper-v 不冲突的 Android 模拟器了。

到这里后,发现有网页版本,满足需求,放弃了 app 爬取。


询问大佬们的解决方案

项目3 min read

lz18届毕业生一枚,工作有段时间了,计科专业。 上班的时候有很多想法,但没有时间实现。所以组个纯网络的兴趣小组,想把想法变现。

社团性质:

  • 纯网络兴趣小组(也就是没有工作室啥的,靠自己)
  • 纯公益社团(项目公益,不向社员收费同时也不赚钱,有能力可以靠自己接case)
  • python语言向(因为lz只会py编程语言,所以只能给这个支持)

小组成员事物内容:

  • 学习编程(偏python,因为lz只能给这方面的指导。。。)
  • 实现各种各样的idea(流程一般是:提出想法,大家讨论可行性,组队,实现想法)

小组需要:

  • 对编程有兴趣的人(估摸着应该靠自学,编程没兴趣学不下去的)
  • 有一定的经济基础(自己的电脑得跑得起编辑器把,租个服务器或买个树莓派啥的,但不会有小组收钱的行为)
  • 有时间(课程不紧,闲暇时间比较多)
  • 能自己写总结(分享型小组嘛)

加分项

  • 有一定编程基础
  • 计算机专业大二大三
  • 有一定的英语能力(编程,英语还是很重要的)

相信有些人觉得在学校里学的东西出来难找工作,我觉得做些开源项目会对以后找ITl类工作or实习有帮助,不过也不绝对,还是得靠自己努力。

Q群号: 870632533 欢迎转给身边人

项目2 min read

项目介绍

通过Chrome与selenium 将网易云私信中歌曲放到一个歌单中,方便实时收听新歌。 我们可以直接用已登录的Chrome(保存有网易云网站登录态),模拟用户操作,收藏新歌。

image.png


关键代码

  • 初始化selenium
def init_driver():    
executable_path = "chromedriver"
if not os.path.exists("chromedriver.exe"): # 无驱则需要下载
# 本地项目 复制驱动
_path = os.path.abspath(__file__)
if not "crawler_set" in _path:
root_path = _path[:_path.index("crawler_set") + len("crawler_set")]
executable_path = os.path.join(root_path, "driver", "chromedriver.exe")
else: # 单个文件 下载驱动
url = "https://raw.githubusercontent.com/AngusWG/crawler_set/master/driver/chromedriver.exe"
proxies = {
"http": "socks5://127.0.0.1:1080",
"https": "socks5://127.0.0.1:1080"
}
r = requests.get(url, proxies=proxies)
with open("chromedriver.exe", "wb") as code:
content_size = int(r.headers['content-length']) # 内容体总大小
for data in tqdm(iterable=r.iter_content(1024), total=content_size, unit="k", desc="下载驱动"):
code.write(data)

user_cookies = "".join([os.path.expanduser('~'), r"\AppData\Local\Google\Chrome\User Data"])

option = webdriver.ChromeOptions()
option.add_argument("--user-data-dir={}".format(user_cookies)) # 设置成用户自己的数据目录

try:
driver = webdriver.Chrome(executable_path, options=option)
driver.implicitly_wait(5)
return driver
except WebDriverException:
print("请先关掉所有的Chrome")
exit(-2)

  • 收藏歌曲
def save_song(url):    
print("[{}] start".format(url), end=" ")
driver.get(url)
driver.switch_to.frame("contentFrame")
title = driver.find_element_by_xpath('//div[contains(@class, "tit")]').text
for word in shielding_words:
if word in title:
print("{} 因 {} 已忽略".format(title, word))
return
driver.find_element_by_xpath('//*[contains(text(), "收藏")]').click()
driver.find_element_by_xpath('//*[contains(text(), "{}")]'.format(song_dir)).click()
print(title)


全部代码

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2019/12/19 9:36
# @author : zza
# @Email : 740713651@qq.com
# @File : 将网易云的私信音乐整理.py
"""
该脚本会保存每天新私信里最后几首歌曲到tmp_save_dir
1.在Chrome上登录自己的网易云帐号t
2.创建tmp_save_dir
然后运行脚本
"""
import os

import requests
from selenium import webdriver
from selenium.common.exceptions import WebDriverException, NoSuchElementException
from tqdm import tqdm

song_dir = "tmp_save_dir"
# 屏蔽关键词
shielding_words = ['伴奏']


def init_driver():
executable_path = "chromedriver"
if not os.path.exists("chromedriver.exe"): # 无驱则需要下载
# 本地项目 复制驱动
_path = os.path.abspath(__file__)
if not "crawler_set" in _path:
root_path = _path[:_path.index("crawler_set") + len("crawler_set")]
executable_path = os.path.join(root_path, "driver", "chromedriver.exe")
else: # 单个文件 下载驱动
url = "https://raw.githubusercontent.com/AngusWG/crawler_set/master/driver/chromedriver.exe"
proxies = {
"http": "socks5://127.0.0.1:1080",
"https": "socks5://127.0.0.1:1080"
}
r = requests.get(url, proxies=proxies)
with open("chromedriver.exe", "wb") as code:
content_size = int(r.headers['content-length']) # 内容体总大小
for data in tqdm(iterable=r.iter_content(1024), total=content_size, unit="k", desc="下载驱动"):
code.write(data)

user_cookies = "".join([os.path.expanduser('~'), r"\AppData\Local\Google\Chrome\User Data"])

option = webdriver.ChromeOptions()
option.add_argument("--user-data-dir={}".format(user_cookies)) # 设置成用户自己的数据目录

try:
driver = webdriver.Chrome(executable_path, options=option)
driver.implicitly_wait(5)
return driver
except WebDriverException:
print("请先关掉所有的Chrome")
exit(-2)


def get_private_detail():
# 点击私信后
driver.get("https://music.163.com/#/msg/m/private")
driver.switch_to.frame("contentFrame")
new_msg_items = driver.find_elements_by_xpath('//i[@class="u-bub"]/b[@class="f-alpha"]/..//parent::*//a')
private_detail_url_dict = dict()
for i in new_msg_items:
_, singer_id = i.get_attribute("href").split("?")
uri = "https://music.163.com/#/msg/m/private_detail?" + singer_id
msg_num = int(i.find_element_by_xpath("../i/em").text)
private_detail_url_dict[uri] = msg_num
return private_detail_url_dict


def get_song_url_from_album_set(url):
song_set = set()
# 歌曲页面保存
driver.get(url)
driver.switch_to.frame("contentFrame")
url_list = driver.find_elements_by_xpath('//a[contains(@href, "/song?id")]')
for item in url_list:
if "伴奏" in item.text:
break
_, song_id = item.get_attribute("href").split("?id=")
song_set.add("https://music.163.com/#/song?id=" + song_id)
song_name = item.find_element_by_xpath("./b").get_attribute("title")
print(song_name, end=" ")
return song_set


def get_song_url_from_private_detail(url, msg_num):
album_set = set()
song_set = set()
# 歌曲页面保存
driver.get(url)
driver.switch_to.frame("contentFrame")
url_list = driver.find_elements_by_xpath('//div[contains(@class,"itemleft")]')[-msg_num:]
for item in url_list:
try:
i = item.find_element_by_xpath(
'.//a[contains(@href,"album?id") or contains(@href, "song?id")]').get_attribute("href")
_, _id = i.split("?id=")
if "song?id" in i:
song_set.add("https://music.163.com/#/song?id=" + _id)
else: # "album?id"
album_set.add("https://music.163.com/#/album?id=" + _id)
except NoSuchElementException:
pass

for album_url in album_set:
song_set.update(get_song_url_from_album_set(album_url))
return song_set


def save_song(url):
print("[{}] start".format(url), end=" ")
driver.get(url)
driver.switch_to.frame("contentFrame")
title = driver.find_element_by_xpath('//div[contains(@class, "tit")]').text
for word in shielding_words:
if word in title:
print("{} 因 {} 已忽略".format(title, word))
return
driver.find_element_by_xpath('//*[contains(text(), "收藏")]').click()
driver.find_element_by_xpath('//*[contains(text(), "{}")]'.format(song_dir)).click()
print(title)


driver = init_driver()
if not os.path.exists("tmp.txt"):
# 获取私信用户列表
private_detail_url_dict = get_private_detail()
print("private_detail_url_set len={}".format(len(private_detail_url_dict)))
# 获取歌曲id
song_url_set = set()
for private_detail_url, msg_num in private_detail_url_dict.items():
song_url_set.update(get_song_url_from_private_detail(private_detail_url, msg_num))
print("song_url_set len={}".format(len(song_url_set)))
with open("tmp.txt", "w", encoding="utf8") as f:
f.write("\n".join(song_url_set))
else:
with open("tmp.txt", "r", encoding="utf8") as f:
data = f.read()
song_url_set = data.split("\n") if data else []
# 保存歌曲
for song_url in song_url_set:
save_song(song_url)
os.remove("tmp.txt")
driver.close()

项目3 min read

写在前面的话

  • 通过 米筐 网站提供的量化平台进行的测试。
  • 做这个研究的初衷是笔者有一点意象投资股市,有几个不同的投资方案(策略),我想先拿 19 年的数据跑一跑我的方案(策略),如果亏钱了,就不玩股市,老老实实玩基金了。


关于如何读这个图

  • 首先能看到时间,我的数据是从2019 年 01 -- 2020 年 01,
  • 红色的线代表账户市值(把股票持仓也换算成钱)的变化。
  • 蓝色的线代表基准(沪深三百)的市值变化。
  • 其次最重要的指标,回测收益,表示的是我本金到时间结束,涨了多少,比如我有 5w,那么这个图表示我年末结束涨到了 5*(0.09+1) = 5.45(万元)
  • 然后平均一年能有 13.5%的收益。
  • 然后看基准收益, 这个我一般设置的是沪深三百,也就是大盘,这个图表示 19 年大盘涨了30+%,而我的策略只有9+%,其实是没有跑赢大盘的 (还不如买基金)。
  • 然后需要注意下一个指标"MaxDrawdown(最大回测)": 最大回撤是最常用的指标,描述了投资者可能面临的最大亏损。最大回撤的数值越小越好,越大说明风险越大。
  • Sharpe(夏普率):夏普比率若为正值,代表基金报酬率大于风险;若为负值,代表基金风险大于报酬率。因此,夏普比率越高,投资组合越佳。

好了,下面开始正文。


检测低于 x 元的 股票 ,购买并持有 3 个礼拜后抛出

这个是一个普遍的购买思路,觉得底价股票一定会长,我短期持有然后等他高于某个价格就出售,我模拟一下这个情况,下面是代码:

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2020/11/18 14:32
# @author : zza
# @Email : 740713651@qq.com

import warnings

import pandas
import rqdatac

__config__ = {
"base": {
"start_date": "20190101",
"end_date": "20191231",
'frequency': '1d',
"accounts": {
"stock": 5000,
}
},
"mod": {
"sys_progress": {
"enabled": True,
"show": True
}, "sys_accounts": {
"enabled": True,
"dividend_reinvestment": True,
},
"sys_analyser": {
"enabled": True,
"plot": True,
'benchmark': '000300.XSHG',
},
}
}

def init(context):
context.lowest = 5
context.highest = 5.9
context.day_count = 0
context.holding_days = 10

def get_ticker(context):
all_ins = rqdatac.all_instruments("Stock").order_book_id.to_list()
price_df = rqdatac.get_price(all_ins, context.now, context.now, expect_df=True, fields=['close'])
price_df = price_df.reset_index(drop=False)
price_df = price_df[(context.lowest <= price_df.close) & (price_df.close <= context.highest)]

start_date = rqdatac.get_previous_trading_date(context.now, 2)
price_df_2 = rqdatac.get_price(price_df.order_book_id.to_list(), start_date, context.now, expect_df=True,
fields=['close']).reset_index(drop=False)
order_ticker = []
for order_book_id, data_df in price_df_2.groupby(price_df_2.order_book_id):
if (data_df.close.shift(fill_value=0) < data_df.close).all():
order_ticker.append(order_book_id)

return order_ticker

def before_trading(context):
context.day_count += 1
if (context.day_count - 1) % context.holding_days == 0:
context.order_ticker = get_ticker(context)

def handle_bar(context, bar_dict):
if (context.day_count - 1) % context.holding_days == 0:
with warnings.catch_warnings():
for item in context.order_ticker:
o = order_target_value(item, 1000)
if o:
print(o)

if (context.day_count - 1) % context.holding_days == (context.holding_days-1):
for item in context.portfolio.positions.keys():
order_target_value(item, 0)

def after_trading(context):
print("total_value {}".format(context.portfolio.total_value))
if (context.day_count - 1) % 15 == 0:
items = []
for k, v in context.portfolio.positions.items():
items.append({"order_book_id": k, "quantity": v.quantity})
print(pandas.DataFrame(items))

if __name__ == '__main__':
import rqalpha

rqalpha.run_func(init=init,
before_trading=before_trading,
handle_bar=handle_bar,
after_trading=after_trading,
config=__config__)
  • 持有 3 个交易日 image.png

  • 持有 10 个交易日

  • 持有 15 个交易日

结论:底价股票确实会随着大盘的涨而有一定的波动,但最终都是会赔钱的,且三个交易的日的时候,直接赔钱了。这个图可以给广大读者的保佑侥幸心里的朋友看看,一年忙到头,不如买国运(沪深三百)。


根据公众号 闷声发小财

这个是根据我一个朋友跑的算法跑出来的可能会涨的概率。笔者拿到了他一年的数据(没用爬虫),然后试试能不能涨,这么跑的原因是,我当初确实想按着他的这个公众号来买,不过既然有回放数据测试,我当然先测试一下,本金本来就不多,挥霍不了,不敢随便教学费。

策略如下:

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2020/11/22 16:33
# @author : zza
# @Email : 740713651@qq.com
# @File : 闷声发小财。py

import warnings

import numpy
import pandas
import rqdatac
from rqalpha.apis import *

__config__ = {
"base": {
"start_date": "20190101",
"end_date": "20191231",
'frequency': '1d',
"accounts": {
"stock": 5000,
},
"data_bundle_path": r"E:\data\bundle",
},
"extra": {
"log_level": "debug",
},
"mod": {
"sys_progress": {
"enabled": True,
"show": True
}, "sys_accounts": {
"enabled": True,
"dividend_reinvestment": True,
},
"sys_analyser": {
"enabled": True,
"plot": True,
'benchmark': '000300.XSHG',
},
},
}

def init(context):
context.df = pandas.read_csv("发小财。csv", dtype={"股票代码": numpy.str})
context.df['order_book_id'] = rqdatac.id_convert(context.df["股票代码"].to_list())
context.used_df = context.df[context.df["预测结果"] > 0.7]
context.sell_multiple = 1.08
context.holding_days = 1
context.buy_queen = []
context.sell_map = {}

def before_trading(context):
print("当日购买:{}".format(context.buy_queen))
print("当日持仓:{}".format(list(context.sell_map.keys())))

def handle_bar(context, bar_dict):
while context.buy_queen:
order_book_id = context.buy_queen.pop()
if context.stock_account.cash > 1000:
o = order_value(order_book_id, 1000)
if o:
trading_dt = get_next_trading_date(context.now, context.holding_days).to_pydatetime()
context.sell_map[order_book_id] = trading_dt
print("[{}] 购买成功".format(order_book_id))
else:
print("[{}] 资金不足,无法购买".format(order_book_id))
for order_id, dt in context.sell_map.copy().items():
a = dt.date() <= context.now.date()
b = bar_dict[order_id].close > get_position(order_id).avg_price # * context.sell_multiple
if a and b:
o = order_percent(order_id, -1)
if o:
del context.sell_map[order_id]
print("[{}] 卖出成功".format(order_id))

def dt_to_int(dt):
return dt.year * 10000 + dt.month * 100 + dt.day

def after_trading(context):
df = context.used_df[dt_to_int(context.now) == context.used_df.trading_dt]
if df.empty:
return
else:
for _, item in df.iterrows():
context.buy_queen.append(item["order_book_id"])

if __name__ == '__main__':
import rqalpha

rqalpha.run_func(init=init,
before_trading=before_trading,
handle_bar=handle_bar,
after_trading=after_trading,
config=__config__)

  • 持有一天就卖出 image.png

  • 持有 3 天并价格高于持仓价则卖出

image.png

结论:还好跑了回放测试,不然得教好多学费。


检测行业因子普遍 (90%) 一周内上涨的低价股票

这个也是我在上班的时候偶尔能想到的,我觉得可能也代表一分部分人的想法,**某个行业市场涨幅的时候,必定是全行业性质的,且会持续一段时间。**于是我就写了这个,根据股票行业暴露度(属于哪个行业),分类行业,如果都涨了,就买进,如果都跌了,就卖出。

  • 检测行业因子普遍 (90%) 一周内上涨的低价股票 购买并持有
  • 在行业下跌在 30% 左右卖出

代码如下:

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2020/11/22 16:33
# @author : zza
# @Email : 740713651@qq.com
# @File : 闷声发小财。py
from collections import defaultdict
from pprint import pprint

import numpy
import pandas
from rqalpha.apis import *

__config__ = {
"base": {
"start_date": "20190101",
"end_date": "20191231",
'frequency': '1d',
"accounts": {
"stock": 50000,
},
"data_bundle_path": r"E:\data\bundle",
},
"extra": {
"log_level": "debug",
},
"mod": {
"sys_progress": {
"enabled": True,
"show": True
}, "sys_accounts": {
"enabled": True,
"dividend_reinvestment": True,
},
"sys_analyser": {
"enabled": True,
"plot": True,
'benchmark': '000300.XSHG',
},
},
}

def get_holding_ticker():
# 请去米筐购买或申请试用 RQData : www.ricequant.com
rqdatac.init()
df = rqdatac.get_factor_exposure(rqdatac.all_instruments("Stock").order_book_id, "20190101", "20191231")
df.reset_index().sort_values("date").to_csv("factor_exposure.csv", index=False)

def init(context):
df = pandas.read_csv("factor_exposure.csv")
all_order_book_id = all_instruments().order_book_id.to_list()
context.df = df[df["order_book_id"].isin(all_order_book_id)]
context.buy_map = defaultdict(list)
context.holding_map = defaultdict(list)
context.sell_factors = []
context.day_count = 0
context.sell_multiple = 1.2

def before_trading(context):
pprint("当日购买行业:{}".format(context.buy_map))
print("当日持仓:{}".format([i.order_book_id for i in get_positions()]))
print("年化:{}".format(context.portfolio.total_returns))

def handle_bar(context, bar_dict):
# buy
for factors, order_book_ids in context.buy_map.items():
if context.stock_account.cash < 2000:
break
for order_book_id in order_book_ids.copy():
o = order_value(order_book_id, 2000)
if o:
context.holding_map[factors].append(order_book_id)
order_book_ids.remove(order_book_id)
# sell
for factors in context.sell_factors:
for order_book_id in context.holding_map[factors].copy():
# a = bar_dict[order_book_id].close > get_position(order_book_id).avg_price * context.sell_multiple
a = True
if a:
o = order_percent(order_book_id, -1)
if o:
context.holding_map[factors].remove(order_book_id)

def dt_to_str(dt):
return "{}-{}-{}".format(dt.year, dt.month, dt.day)

def is_rise(order_book_id):
a, b = history_bars(order_book_id, 2, frequency="1d", fields='close')
return b > a

def after_trading(context):
context.day_count += 1
if context.day_count % 5 != 1:
return
df = context.df[context.now.date().isoformat() == context.df.date].copy()
df.loc[:, 'rose'] = df["order_book_id"].apply(is_rise)
df.loc[:, 'close'] = df["order_book_id"].apply(lambda x: history_bars(x, 1, frequency="1d", fields='close'))
context.sell_factors = []
context.buy_map = defaultdict(list)
for factors in df.columns[13:-2]:
_df = df[df[factors] == 1]
if _df['rose'].sum() / _df['order_book_id'].size > 0.9:
context.buy_map[factors] = _df[(_df["close"] > 5) & (_df["close"] < 6)].sort_values('close')[
'order_book_id'].to_list()[:2]
elif _df['rose'].sum() / _df['order_book_id'].size < 0.3:
context.sell_factors.append(factors)
del df

if __name__ == '__main__':
import rqalpha

rqalpha.run_func(init=init,
before_trading=before_trading,
handle_bar=handle_bar,
after_trading=after_trading,
config=__config__)

  • 各行业两只股票

emmm,不说了,都是磊。


普通 macd 均线策略

  • 这个是米筐官网的默认策略
from collections import defaultdict    
from pprint import pprint

import numpy
import pandas
from rqalpha.apis import *

__config__ = {
"base": {
"start_date": "20190101",
"end_date": "20191231",
'frequency': '1d',
"accounts": {
"stock": 50000,
},
"data_bundle_path": r"E:\data\bundle",
},
"extra": {
"log_level": "debug",
},
"mod": {
"sys_progress": {
"enabled": True,
"show": True
}, "sys_accounts": {
"enabled": True,
"dividend_reinvestment": True,
},
"sys_analyser": {
"enabled": True,
"plot": True,
'benchmark': '000300.XSHG',
},
},
}

import talib

def init(context):
context.s1 = '000300.XSHG'

context.SHORTPERIOD = 12
context.LONGPERIOD = 26
context.SMOOTHPERIOD = 9
context.OBSERVATION = 100

subscribe(context.s1)

def before_trading(context):
pass

def handle_bar(context, bar_dict):
closes = history_bars(context.s1, context.OBSERVATION, '1d', 'close')

diff, signal, _ = talib.MACD(closes)

if diff[-1] > signal[-1] and diff[-2] < signal[-2]:
order_target_percent(context.s1, 1)

if diff[-1] < signal[-1] and diff[-2] > signal[-2]:
order_target_percent(context.s1, 0)

def after_trading(context):
pass

if __name__ == '__main__':
import rqalpha

rqalpha.run_func(init=init,
before_trading=before_trading,
handle_bar=handle_bar,
after_trading=after_trading,
config=__config__)

image.png

这个是根据大佬的代码抄的。其实数据还行,虽然没跑赢沪深三百,但是确实赚钱了,贴一下 源地址, 大佬说有一部分巧合在里面。不多说了。

我结合下只买五块钱的情况

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2020/11/22 16:33
# @author : zza
# @Email : 740713651@qq.com
# @File : 闷声发小财。py
import numpy
import pandas
from rqalpha.apis import *

__config__ = {
"base": {
"start_date": "20190101",
"end_date": "20191231",
'frequency': '1d',
"accounts": {
"stock": 50000,
},
"data_bundle_path": r"E:\data\bundle",
},
"extra": {
"log_level": "debug",
},
"mod": {
"sys_progress": {
"enabled": True,
"show": True
}, "sys_accounts": {
"enabled": True,
"dividend_reinvestment": True,
},
"sys_analyser": {
"enabled": True,
"plot": True,
'benchmark': '000300.XSHG',
},
},
}

import talib

def init(context):
context.s1 = '000300.XSHG'

context.SHORTPERIOD = 12
context.LONGPERIOD = 26
context.SMOOTHPERIOD = 9
context.OBSERVATION = 100

subscribe(context.s1)
context.lowest = 5
context.highest = 6

context.order_ticker = []

def _history_bar_for_all_ins(x):
bar = history_bars(x, 1, "1d", fields=['close'])
if not bar:
return numpy.nan
if bar[0]:
return bar[0][0]
return numpy.nan

def get_ticker(context):
all_ins = all_instruments("Stock").order_book_id.to_list()
df = pandas.DataFrame(all_ins, columns=['order_book_id'])
df['close'] = df.order_book_id.apply(_history_bar_for_all_ins)
price_df = df[(context.lowest <= df.close) & (df.close <= context.highest)]

order_ticker = []
for order_book_id in price_df.order_book_id.to_list():
c = pandas.Series(i[0] for i in history_bars(order_book_id, 3, "1d", fields=['close']))
if (c.shift(fill_value=0) < c).all():
order_ticker.append(order_book_id)
return order_ticker

def before_trading(context):
print("年化:{}".format(context.portfolio.total_returns))

def handle_bar(context, bar_dict):
closes = history_bars(context.s1, context.OBSERVATION, '1d', 'close')

diff, signal, _ = talib.MACD(closes)

if diff[-1] > signal[-1] and diff[-2] < signal[-2] :
if context.stock_account.cash > 1000:
context.order_ticker = get_ticker(context)
for item in context.order_ticker:
o = order_target_value(item, 2000)
if o:
print(o)
print("context.order_ticker", context.order_ticker)

if diff[-1] < signal[-1] and diff[-2] > signal[-2]:
for item in context.portfolio.positions.keys():
order_target_percent(item, 0)

def after_trading(context):
context.order_ticker = []
if __name__ == '__main__':
import rqalpha

rqalpha.run_func(init=init,
before_trading=before_trading,
handle_bar=handle_bar,
after_trading=after_trading,
config=__config__)

image.png

还不如直接买华夏国运沪深三百。


朋友想法 A

试试朋友的想法

假如设置一个均值(一直在变),低于 0.8 倍均值(mean_coe),并且连续 fall_days 天,跌幅加起来达到 fall_x%,买入 高于买入价格,并且连续 rise_days 天,涨幅加起来达到 rise_x%,卖出

我开始简单的设置了几个参数,发现结果不是很理想,但是没亏,我就想可能是参数的问题,然后列出了个集合,想着再跑跑,代码如下:

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2020/11/22 16:33
# @author : zza
# @Email : 740713651@qq.com
import itertools

import numpy
import pandas
from rqalpha.apis import *

__config__ = {
"base": {
"start_date": "20190101",
"end_date": "20191231",
'frequency': '1d',
"accounts": {
"stock": 50000,
},
"data_bundle_path": r"E:\data\bundle",
},
"extra": {
"log_level": "debug",
},
"mod": {
"sys_progress": {
"enabled": True,
"show": True
}, "sys_accounts": {
"enabled": True,
"dividend_reinvestment": True,
},
"sys_analyser": {
"enabled": True,
"plot": True,
'benchmark': '000300.XSHG',
},
},
}

def init(context):
context.mean_coe = 0.8
context.mean_days = 244
context.rise_days = context.config.extra.rise_days
context.fall_days = context.config.extra.fall_days
context.rise_x = context.config.extra.rise_x
context.fall_x = context.config.extra.fall_x
context.order_ticker = [
"002111.XSHE", # 威海广泰
"002673.XSHE", # 西部证券
"601375.XSHG", # 中原证券
]

df = all_instruments('CS')
df = df[df.listed_date.apply(lambda x: x.year in [2015, 2016, 2017])]
context.order_ticker = context.order_ticker + df.order_book_id.to_list()

def before_trading(context):
print("年化:{}".format(context.portfolio.total_returns))
# 价格均值
context.mean_close = {}
for order_book_id in context.order_ticker:
bars = pandas.Series([i[0] for i in history_bars(order_book_id, context.mean_days, "1d", fields=['close'])])
context.mean_close[order_book_id] = bars.mean()

def handle_bar(context, bar_dict):
# for buy
if context.stock_account.cash >= 5000:

for order_book_id in context.order_ticker:
bar_close = bar_dict[order_book_id].close
if bar_close < (context.mean_close[order_book_id] * context.mean_coe):
_str = ""
bars = pandas.Series(
[i[0] for i in history_bars(order_book_id, context.fall_days, "1d", fields=['close'])])
_str += "\n" + f"[{order_book_id}] 价格 ({bar_close}) 低于一年平均价{context.mean_close[order_book_id]}"
if not (bars.shift(-1, fill_value=0) < bars).all():
# 判断连跌
continue
_str += "\n" + f"[{order_book_id}] 连跌 价格表现为{list(bars)}"
if not (bars[0] * context.fall_x > bar_close):
# 判断跌幅
# 非低于 0.8 倍均值(mean_coe)
continue
_str += "\n" + f"[{order_book_id}] 跌幅超过{context.fall_x}, {bars[0]} - {bar_close}"
o = order_value(order_book_id, 5000)
if o:
_str += "\n" + f"[{order_book_id}] 买入".format(order_book_id)
print(_str)
# for sell
for order_book_id in context.order_ticker:
position = context.portfolio.positions[order_book_id]
bar_close = bar_dict[order_book_id].close
if position is None or position.quantity <= 0:
continue
if position.avg_price < bar_close:
# 低于买入价格 不做操作
continue
_str = ""
bars = pandas.Series([i[0] for i in history_bars(order_book_id, context.fall_days, "1d", fields=['close'])])
if not (bars.shift(1, fill_value=0) < bars).all():
# 并且连续 rise_days 天
continue
_str += "\n" + f"[{order_book_id}] 连涨 价格表现为{list(bars)}"
if not (bars[0] * context.rise_x < bar_close):
# 涨幅加起来达到 rise_x%
continue
_str += "\n" + f"[{order_book_id}] 涨幅超过{context.rise_x}, {bars[0]} - {bar_close}"
o = order_target_percent(order_book_id, 0)
if o:
_str += "\n" + f"[{order_book_id}] 出售".format(order_book_id)
print(_str)

def after_trading(context):
pass

import rqalpha

rise_days = range(3, 10)
fall_days = range(3, 10)
rise_x = numpy.arange(1.06, 1.12, 0.01)
fall_x = numpy.arange(0.9, 0.94, 0.01)
config_map = itertools.product(rise_days, fall_days, rise_x, fall_x)

df = pandas.DataFrame(config_map, columns=["rise_days", "fall_days", "rise_x", "fall_x", ])
result_list = []
for _, item in df.iterrows():
__config__['extra']['rise_days'] = int(item.rise_days)
__config__['extra']['fall_days'] = int(item.fall_days)
__config__['extra']['rise_x'] = item.rise_x
__config__['extra']['fall_x'] = item.fall_x
result = rqalpha.run_func(init=init,
before_trading=before_trading,
handle_bar=handle_bar,
after_trading=after_trading,
config=__config__)
total_returns = result['sys_analyser']['summary']['total_value']
result_list.append(result)

其实跑出了几个超过大盘收益的结果。但是因为我的代码水平问题,报错了,结果也没保存,近期我会去吧这个跑完(1300 多的组合方式),有结果了我会贴上最优解的。


总结

我还是不在股票市场里当韭菜了 溜了溜了

项目10 min read