Skip to main content

5 posts tagged with "项目"

View All Tags

  • 这是网盘爬取的部分记录,很多坑,分享一下避免大家采坑。
  • 后续发现网盘有网页版本,在安装能 root 的 Android 模拟器后,放弃通过 app 爬。

Fiddler

教程

  • 电脑管家下的 Fiddler 4.0
    • Fiddler Anywhere 只找到苹果的破解版
  • 允许远程连接
    • 更改 fiddler 端口后要重启软件
  • 手机设置
    • 证书安装
    • 卸载方式
      • 搜索 加密与凭据
      • 用户凭据 找到自己命名的凭据 (fiddler) 点击删除

打开网盘后 无连接

走 fiddler 后,app 打开显示无网络。用 adb 查看日志,找到如下错误:

06-06 19:09:03.624 28223 28223 W System.err: javax.net.ssl.SSLHandshakeException: java.security.cert.CertPathValidatorException: Trust anchor for certification path not found.

是因为,fiddler 装的证书不是根证书,app 不认可。 改为弄根证书需要手机 root,懒得弄了。

装安卓虚拟机

考虑使用安卓模拟器

  • 夜神模拟器
    • hyper-v 冲突
  • bluestacks
  • 天天模拟器
    • 启动不了
    • 安装时蓝屏

卸载某个模拟器后 dll 乱了

无效的 wechatwin.dll 文件 errcode:126, 点击“确定”下载最新版本

  • 微信打不开 怀疑是某个模拟器卸载的时候 把系统的 dll 一起
  • 修复方法参考

按 Win + S 键搜索栏输入 CMD,以管理员身份打开 “命令提示符” 输入命令:

sfc /SCANNOW

等待扫描结束:

Dism /Online /Cleanup-Image /ScanHealth

等进度条走完,再次打开微信,看是否能够正常打开。如果能正常打开,重启电脑再看是否能打开微信,如果重启之后还能正常打开微信等其它软件,说明此问题就修复了。

Hyper-V + Visual Studio 跑 Android 模拟器

参考 用 Xamarin 的模拟器。

安装 Visual Studio 2022 一样可以

  • 打开 SDK 管理器后 提示:
    • Loading SDK component information failed. Please retry
    • 加载 SDK 组件信息失败 请重试
    • must not be an empty string forPath

通过点击:工具 - 选项 - 搜索 Android - Xamarin Android 设置 点击确定后就能找找到位置了。

下载 SDK 需要全局翻墙。

  • 爬完后注意存档位置 方便后续删除
    • C:\Users\xxx\AppData\Local\Xamarin\Mono for Android\Archives

如此,终于有一个能用的,和 hyper-v 不冲突的 Android 模拟器了。

到这里后,发现有网页版本,满足需求,放弃了 app 爬取。


询问大佬们的解决方案

项目3 min read

lz18届毕业生一枚,工作有段时间了,计科专业。 上班的时候有很多想法,但没有时间实现。所以组个纯网络的兴趣小组,想把想法变现。

社团性质:

  • 纯网络兴趣小组(也就是没有工作室啥的,靠自己)
  • 纯公益社团(项目公益,不向社员收费同时也不赚钱,有能力可以靠自己接case)
  • python语言向(因为lz只会py编程语言,所以只能给这个支持)

小组成员事物内容:

  • 学习编程(偏python,因为lz只能给这方面的指导。。。)
  • 实现各种各样的idea(流程一般是:提出想法,大家讨论可行性,组队,实现想法)

小组需要:

  • 对编程有兴趣的人(估摸着应该靠自学,编程没兴趣学不下去的)
  • 有一定的经济基础(自己的电脑得跑得起编辑器把,租个服务器或买个树莓派啥的,但不会有小组收钱的行为)
  • 有时间(课程不紧,闲暇时间比较多)
  • 能自己写总结(分享型小组嘛)

加分项

  • 有一定编程基础
  • 计算机专业大二大三
  • 有一定的英语能力(编程,英语还是很重要的)

相信有些人觉得在学校里学的东西出来难找工作,我觉得做些开源项目会对以后找ITl类工作or实习有帮助,不过也不绝对,还是得靠自己努力。

Q群号: 870632533 欢迎转给身边人

项目2 min read

项目介绍

通过Chrome与selenium 将网易云私信中歌曲放到一个歌单中,方便实时收听新歌。 我们可以直接用已登录的Chrome(保存有网易云网站登录态),模拟用户操作,收藏新歌。

image.png


关键代码

  • 初始化selenium
def init_driver():    
executable_path = "chromedriver"
if not os.path.exists("chromedriver.exe"): # 无驱则需要下载
# 本地项目 复制驱动
_path = os.path.abspath(__file__)
if not "crawler_set" in _path:
root_path = _path[:_path.index("crawler_set") + len("crawler_set")]
executable_path = os.path.join(root_path, "driver", "chromedriver.exe")
else: # 单个文件 下载驱动
url = "https://raw.githubusercontent.com/AngusWG/crawler_set/master/driver/chromedriver.exe"
proxies = {
"http": "socks5://127.0.0.1:1080",
"https": "socks5://127.0.0.1:1080"
}
r = requests.get(url, proxies=proxies)
with open("chromedriver.exe", "wb") as code:
content_size = int(r.headers['content-length']) # 内容体总大小
for data in tqdm(iterable=r.iter_content(1024), total=content_size, unit="k", desc="下载驱动"):
code.write(data)

user_cookies = "".join([os.path.expanduser('~'), r"\AppData\Local\Google\Chrome\User Data"])

option = webdriver.ChromeOptions()
option.add_argument("--user-data-dir={}".format(user_cookies)) # 设置成用户自己的数据目录

try:
driver = webdriver.Chrome(executable_path, options=option)
driver.implicitly_wait(5)
return driver
except WebDriverException:
print("请先关掉所有的Chrome")
exit(-2)

  • 收藏歌曲
def save_song(url):    
print("[{}] start".format(url), end=" ")
driver.get(url)
driver.switch_to.frame("contentFrame")
title = driver.find_element_by_xpath('//div[contains(@class, "tit")]').text
for word in shielding_words:
if word in title:
print("{} 因 {} 已忽略".format(title, word))
return
driver.find_element_by_xpath('//*[contains(text(), "收藏")]').click()
driver.find_element_by_xpath('//*[contains(text(), "{}")]'.format(song_dir)).click()
print(title)


全部代码

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2019/12/19 9:36
# @author : zza
# @Email : 740713651@qq.com
# @File : 将网易云的私信音乐整理.py
"""
该脚本会保存每天新私信里最后几首歌曲到tmp_save_dir
1.在Chrome上登录自己的网易云帐号t
2.创建tmp_save_dir
然后运行脚本
"""
import os

import requests
from selenium import webdriver
from selenium.common.exceptions import WebDriverException, NoSuchElementException
from tqdm import tqdm

song_dir = "tmp_save_dir"
# 屏蔽关键词
shielding_words = ['伴奏']


def init_driver():
executable_path = "chromedriver"
if not os.path.exists("chromedriver.exe"): # 无驱则需要下载
# 本地项目 复制驱动
_path = os.path.abspath(__file__)
if not "crawler_set" in _path:
root_path = _path[:_path.index("crawler_set") + len("crawler_set")]
executable_path = os.path.join(root_path, "driver", "chromedriver.exe")
else: # 单个文件 下载驱动
url = "https://raw.githubusercontent.com/AngusWG/crawler_set/master/driver/chromedriver.exe"
proxies = {
"http": "socks5://127.0.0.1:1080",
"https": "socks5://127.0.0.1:1080"
}
r = requests.get(url, proxies=proxies)
with open("chromedriver.exe", "wb") as code:
content_size = int(r.headers['content-length']) # 内容体总大小
for data in tqdm(iterable=r.iter_content(1024), total=content_size, unit="k", desc="下载驱动"):
code.write(data)

user_cookies = "".join([os.path.expanduser('~'), r"\AppData\Local\Google\Chrome\User Data"])

option = webdriver.ChromeOptions()
option.add_argument("--user-data-dir={}".format(user_cookies)) # 设置成用户自己的数据目录

try:
driver = webdriver.Chrome(executable_path, options=option)
driver.implicitly_wait(5)
return driver
except WebDriverException:
print("请先关掉所有的Chrome")
exit(-2)


def get_private_detail():
# 点击私信后
driver.get("https://music.163.com/#/msg/m/private")
driver.switch_to.frame("contentFrame")
new_msg_items = driver.find_elements_by_xpath('//i[@class="u-bub"]/b[@class="f-alpha"]/..//parent::*//a')
private_detail_url_dict = dict()
for i in new_msg_items:
_, singer_id = i.get_attribute("href").split("?")
uri = "https://music.163.com/#/msg/m/private_detail?" + singer_id
msg_num = int(i.find_element_by_xpath("../i/em").text)
private_detail_url_dict[uri] = msg_num
return private_detail_url_dict


def get_song_url_from_album_set(url):
song_set = set()
# 歌曲页面保存
driver.get(url)
driver.switch_to.frame("contentFrame")
url_list = driver.find_elements_by_xpath('//a[contains(@href, "/song?id")]')
for item in url_list:
if "伴奏" in item.text:
break
_, song_id = item.get_attribute("href").split("?id=")
song_set.add("https://music.163.com/#/song?id=" + song_id)
song_name = item.find_element_by_xpath("./b").get_attribute("title")
print(song_name, end=" ")
return song_set


def get_song_url_from_private_detail(url, msg_num):
album_set = set()
song_set = set()
# 歌曲页面保存
driver.get(url)
driver.switch_to.frame("contentFrame")
url_list = driver.find_elements_by_xpath('//div[contains(@class,"itemleft")]')[-msg_num:]
for item in url_list:
try:
i = item.find_element_by_xpath(
'.//a[contains(@href,"album?id") or contains(@href, "song?id")]').get_attribute("href")
_, _id = i.split("?id=")
if "song?id" in i:
song_set.add("https://music.163.com/#/song?id=" + _id)
else: # "album?id"
album_set.add("https://music.163.com/#/album?id=" + _id)
except NoSuchElementException:
pass

for album_url in album_set:
song_set.update(get_song_url_from_album_set(album_url))
return song_set


def save_song(url):
print("[{}] start".format(url), end=" ")
driver.get(url)
driver.switch_to.frame("contentFrame")
title = driver.find_element_by_xpath('//div[contains(@class, "tit")]').text
for word in shielding_words:
if word in title:
print("{} 因 {} 已忽略".format(title, word))
return
driver.find_element_by_xpath('//*[contains(text(), "收藏")]').click()
driver.find_element_by_xpath('//*[contains(text(), "{}")]'.format(song_dir)).click()
print(title)


driver = init_driver()
if not os.path.exists("tmp.txt"):
# 获取私信用户列表
private_detail_url_dict = get_private_detail()
print("private_detail_url_set len={}".format(len(private_detail_url_dict)))
# 获取歌曲id
song_url_set = set()
for private_detail_url, msg_num in private_detail_url_dict.items():
song_url_set.update(get_song_url_from_private_detail(private_detail_url, msg_num))
print("song_url_set len={}".format(len(song_url_set)))
with open("tmp.txt", "w", encoding="utf8") as f:
f.write("\n".join(song_url_set))
else:
with open("tmp.txt", "r", encoding="utf8") as f:
data = f.read()
song_url_set = data.split("\n") if data else []
# 保存歌曲
for song_url in song_url_set:
save_song(song_url)
os.remove("tmp.txt")
driver.close()

项目3 min read

写在前面的话

  • 通过 米筐 网站提供的量化平台进行的测试。
  • 做这个研究的初衷是笔者有一点意象投资股市,有几个不同的投资方案(策略),我想先拿 19 年的数据跑一跑我的方案(策略),如果亏钱了,就不玩股市,老老实实玩基金了。


关于如何读这个图

  • 首先能看到时间,我的数据是从2019 年 01 -- 2020 年 01,
  • 红色的线代表账户市值(把股票持仓也换算成钱)的变化。
  • 蓝色的线代表基准(沪深三百)的市值变化。
  • 其次最重要的指标,回测收益,表示的是我本金到时间结束,涨了多少,比如我有 5w,那么这个图表示我年末结束涨到了 5*(0.09+1) = 5.45(万元)
  • 然后平均一年能有 13.5%的收益。
  • 然后看基准收益, 这个我一般设置的是沪深三百,也就是大盘,这个图表示 19 年大盘涨了30+%,而我的策略只有9+%,其实是没有跑赢大盘的 (还不如买基金)。
  • 然后需要注意下一个指标"MaxDrawdown(最大回测)": 最大回撤是最常用的指标,描述了投资者可能面临的最大亏损。最大回撤的数值越小越好,越大说明风险越大。
  • Sharpe(夏普率):夏普比率若为正值,代表基金报酬率大于风险;若为负值,代表基金风险大于报酬率。因此,夏普比率越高,投资组合越佳。

好了,下面开始正文。


检测低于 x 元的 股票 ,购买并持有 3 个礼拜后抛出

这个是一个普遍的购买思路,觉得底价股票一定会长,我短期持有然后等他高于某个价格就出售,我模拟一下这个情况,下面是代码:

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2020/11/18 14:32
# @author : zza
# @Email : 740713651@qq.com

import warnings

import pandas
import rqdatac

__config__ = {
"base": {
"start_date": "20190101",
"end_date": "20191231",
'frequency': '1d',
"accounts": {
"stock": 5000,
}
},
"mod": {
"sys_progress": {
"enabled": True,
"show": True
}, "sys_accounts": {
"enabled": True,
"dividend_reinvestment": True,
},
"sys_analyser": {
"enabled": True,
"plot": True,
'benchmark': '000300.XSHG',
},
}
}

def init(context):
context.lowest = 5
context.highest = 5.9
context.day_count = 0
context.holding_days = 10

def get_ticker(context):
all_ins = rqdatac.all_instruments("Stock").order_book_id.to_list()
price_df = rqdatac.get_price(all_ins, context.now, context.now, expect_df=True, fields=['close'])
price_df = price_df.reset_index(drop=False)
price_df = price_df[(context.lowest <= price_df.close) & (price_df.close <= context.highest)]

start_date = rqdatac.get_previous_trading_date(context.now, 2)
price_df_2 = rqdatac.get_price(price_df.order_book_id.to_list(), start_date, context.now, expect_df=True,
fields=['close']).reset_index(drop=False)
order_ticker = []
for order_book_id, data_df in price_df_2.groupby(price_df_2.order_book_id):
if (data_df.close.shift(fill_value=0) < data_df.close).all():
order_ticker.append(order_book_id)

return order_ticker

def before_trading(context):
context.day_count += 1
if (context.day_count - 1) % context.holding_days == 0:
context.order_ticker = get_ticker(context)

def handle_bar(context, bar_dict):
if (context.day_count - 1) % context.holding_days == 0:
with warnings.catch_warnings():
for item in context.order_ticker:
o = order_target_value(item, 1000)
if o:
print(o)

if (context.day_count - 1) % context.holding_days == (context.holding_days-1):
for item in context.portfolio.positions.keys():
order_target_value(item, 0)

def after_trading(context):
print("total_value {}".format(context.portfolio.total_value))
if (context.day_count - 1) % 15 == 0:
items = []
for k, v in context.portfolio.positions.items():
items.append({"order_book_id": k, "quantity": v.quantity})
print(pandas.DataFrame(items))

if __name__ == '__main__':
import rqalpha

rqalpha.run_func(init=init,
before_trading=before_trading,
handle_bar=handle_bar,
after_trading=after_trading,
config=__config__)
  • 持有 3 个交易日 image.png

  • 持有 10 个交易日

  • 持有 15 个交易日

结论:底价股票确实会随着大盘的涨而有一定的波动,但最终都是会赔钱的,且三个交易的日的时候,直接赔钱了。这个图可以给广大读者的保佑侥幸心里的朋友看看,一年忙到头,不如买国运(沪深三百)。


根据公众号 闷声发小财

这个是根据我一个朋友跑的算法跑出来的可能会涨的概率。笔者拿到了他一年的数据(没用爬虫),然后试试能不能涨,这么跑的原因是,我当初确实想按着他的这个公众号来买,不过既然有回放数据测试,我当然先测试一下,本金本来就不多,挥霍不了,不敢随便教学费。

策略如下:

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2020/11/22 16:33
# @author : zza
# @Email : 740713651@qq.com
# @File : 闷声发小财。py

import warnings

import numpy
import pandas
import rqdatac
from rqalpha.apis import *

__config__ = {
"base": {
"start_date": "20190101",
"end_date": "20191231",
'frequency': '1d',
"accounts": {
"stock": 5000,
},
"data_bundle_path": r"E:\data\bundle",
},
"extra": {
"log_level": "debug",
},
"mod": {
"sys_progress": {
"enabled": True,
"show": True
}, "sys_accounts": {
"enabled": True,
"dividend_reinvestment": True,
},
"sys_analyser": {
"enabled": True,
"plot": True,
'benchmark': '000300.XSHG',
},
},
}

def init(context):
context.df = pandas.read_csv("发小财。csv", dtype={"股票代码": numpy.str})
context.df['order_book_id'] = rqdatac.id_convert(context.df["股票代码"].to_list())
context.used_df = context.df[context.df["预测结果"] > 0.7]
context.sell_multiple = 1.08
context.holding_days = 1
context.buy_queen = []
context.sell_map = {}

def before_trading(context):
print("当日购买:{}".format(context.buy_queen))
print("当日持仓:{}".format(list(context.sell_map.keys())))

def handle_bar(context, bar_dict):
while context.buy_queen:
order_book_id = context.buy_queen.pop()
if context.stock_account.cash > 1000:
o = order_value(order_book_id, 1000)
if o:
trading_dt = get_next_trading_date(context.now, context.holding_days).to_pydatetime()
context.sell_map[order_book_id] = trading_dt
print("[{}] 购买成功".format(order_book_id))
else:
print("[{}] 资金不足,无法购买".format(order_book_id))
for order_id, dt in context.sell_map.copy().items():
a = dt.date() <= context.now.date()
b = bar_dict[order_id].close > get_position(order_id).avg_price # * context.sell_multiple
if a and b:
o = order_percent(order_id, -1)
if o:
del context.sell_map[order_id]
print("[{}] 卖出成功".format(order_id))

def dt_to_int(dt):
return dt.year * 10000 + dt.month * 100 + dt.day

def after_trading(context):
df = context.used_df[dt_to_int(context.now) == context.used_df.trading_dt]
if df.empty:
return
else:
for _, item in df.iterrows():
context.buy_queen.append(item["order_book_id"])

if __name__ == '__main__':
import rqalpha

rqalpha.run_func(init=init,
before_trading=before_trading,
handle_bar=handle_bar,
after_trading=after_trading,
config=__config__)

  • 持有一天就卖出 image.png

  • 持有 3 天并价格高于持仓价则卖出

image.png

结论:还好跑了回放测试,不然得教好多学费。


检测行业因子普遍 (90%) 一周内上涨的低价股票

这个也是我在上班的时候偶尔能想到的,我觉得可能也代表一分部分人的想法,**某个行业市场涨幅的时候,必定是全行业性质的,且会持续一段时间。**于是我就写了这个,根据股票行业暴露度(属于哪个行业),分类行业,如果都涨了,就买进,如果都跌了,就卖出。

  • 检测行业因子普遍 (90%) 一周内上涨的低价股票 购买并持有
  • 在行业下跌在 30% 左右卖出

代码如下:

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2020/11/22 16:33
# @author : zza
# @Email : 740713651@qq.com
# @File : 闷声发小财。py
from collections import defaultdict
from pprint import pprint

import numpy
import pandas
from rqalpha.apis import *

__config__ = {
"base": {
"start_date": "20190101",
"end_date": "20191231",
'frequency': '1d',
"accounts": {
"stock": 50000,
},
"data_bundle_path": r"E:\data\bundle",
},
"extra": {
"log_level": "debug",
},
"mod": {
"sys_progress": {
"enabled": True,
"show": True
}, "sys_accounts": {
"enabled": True,
"dividend_reinvestment": True,
},
"sys_analyser": {
"enabled": True,
"plot": True,
'benchmark': '000300.XSHG',
},
},
}

def get_holding_ticker():
# 请去米筐购买或申请试用 RQData : www.ricequant.com
rqdatac.init()
df = rqdatac.get_factor_exposure(rqdatac.all_instruments("Stock").order_book_id, "20190101", "20191231")
df.reset_index().sort_values("date").to_csv("factor_exposure.csv", index=False)

def init(context):
df = pandas.read_csv("factor_exposure.csv")
all_order_book_id = all_instruments().order_book_id.to_list()
context.df = df[df["order_book_id"].isin(all_order_book_id)]
context.buy_map = defaultdict(list)
context.holding_map = defaultdict(list)
context.sell_factors = []
context.day_count = 0
context.sell_multiple = 1.2

def before_trading(context):
pprint("当日购买行业:{}".format(context.buy_map))
print("当日持仓:{}".format([i.order_book_id for i in get_positions()]))
print("年化:{}".format(context.portfolio.total_returns))

def handle_bar(context, bar_dict):
# buy
for factors, order_book_ids in context.buy_map.items():
if context.stock_account.cash < 2000:
break
for order_book_id in order_book_ids.copy():
o = order_value(order_book_id, 2000)
if o:
context.holding_map[factors].append(order_book_id)
order_book_ids.remove(order_book_id)
# sell
for factors in context.sell_factors:
for order_book_id in context.holding_map[factors].copy():
# a = bar_dict[order_book_id].close > get_position(order_book_id).avg_price * context.sell_multiple
a = True
if a:
o = order_percent(order_book_id, -1)
if o:
context.holding_map[factors].remove(order_book_id)

def dt_to_str(dt):
return "{}-{}-{}".format(dt.year, dt.month, dt.day)

def is_rise(order_book_id):
a, b = history_bars(order_book_id, 2, frequency="1d", fields='close')
return b > a

def after_trading(context):
context.day_count += 1
if context.day_count % 5 != 1:
return
df = context.df[context.now.date().isoformat() == context.df.date].copy()
df.loc[:, 'rose'] = df["order_book_id"].apply(is_rise)
df.loc[:, 'close'] = df["order_book_id"].apply(lambda x: history_bars(x, 1, frequency="1d", fields='close'))
context.sell_factors = []
context.buy_map = defaultdict(list)
for factors in df.columns[13:-2]:
_df = df[df[factors] == 1]
if _df['rose'].sum() / _df['order_book_id'].size > 0.9:
context.buy_map[factors] = _df[(_df["close"] > 5) & (_df["close"] < 6)].sort_values('close')[
'order_book_id'].to_list()[:2]
elif _df['rose'].sum() / _df['order_book_id'].size < 0.3:
context.sell_factors.append(factors)
del df

if __name__ == '__main__':
import rqalpha

rqalpha.run_func(init=init,
before_trading=before_trading,
handle_bar=handle_bar,
after_trading=after_trading,
config=__config__)

  • 各行业两只股票

emmm,不说了,都是磊。


普通 macd 均线策略

  • 这个是米筐官网的默认策略
from collections import defaultdict    
from pprint import pprint

import numpy
import pandas
from rqalpha.apis import *

__config__ = {
"base": {
"start_date": "20190101",
"end_date": "20191231",
'frequency': '1d',
"accounts": {
"stock": 50000,
},
"data_bundle_path": r"E:\data\bundle",
},
"extra": {
"log_level": "debug",
},
"mod": {
"sys_progress": {
"enabled": True,
"show": True
}, "sys_accounts": {
"enabled": True,
"dividend_reinvestment": True,
},
"sys_analyser": {
"enabled": True,
"plot": True,
'benchmark': '000300.XSHG',
},
},
}

import talib

def init(context):
context.s1 = '000300.XSHG'

context.SHORTPERIOD = 12
context.LONGPERIOD = 26
context.SMOOTHPERIOD = 9
context.OBSERVATION = 100

subscribe(context.s1)

def before_trading(context):
pass

def handle_bar(context, bar_dict):
closes = history_bars(context.s1, context.OBSERVATION, '1d', 'close')

diff, signal, _ = talib.MACD(closes)

if diff[-1] > signal[-1] and diff[-2] < signal[-2]:
order_target_percent(context.s1, 1)

if diff[-1] < signal[-1] and diff[-2] > signal[-2]:
order_target_percent(context.s1, 0)

def after_trading(context):
pass

if __name__ == '__main__':
import rqalpha

rqalpha.run_func(init=init,
before_trading=before_trading,
handle_bar=handle_bar,
after_trading=after_trading,
config=__config__)

image.png

这个是根据大佬的代码抄的。其实数据还行,虽然没跑赢沪深三百,但是确实赚钱了,贴一下 源地址, 大佬说有一部分巧合在里面。不多说了。

我结合下只买五块钱的情况

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2020/11/22 16:33
# @author : zza
# @Email : 740713651@qq.com
# @File : 闷声发小财。py
import numpy
import pandas
from rqalpha.apis import *

__config__ = {
"base": {
"start_date": "20190101",
"end_date": "20191231",
'frequency': '1d',
"accounts": {
"stock": 50000,
},
"data_bundle_path": r"E:\data\bundle",
},
"extra": {
"log_level": "debug",
},
"mod": {
"sys_progress": {
"enabled": True,
"show": True
}, "sys_accounts": {
"enabled": True,
"dividend_reinvestment": True,
},
"sys_analyser": {
"enabled": True,
"plot": True,
'benchmark': '000300.XSHG',
},
},
}

import talib

def init(context):
context.s1 = '000300.XSHG'

context.SHORTPERIOD = 12
context.LONGPERIOD = 26
context.SMOOTHPERIOD = 9
context.OBSERVATION = 100

subscribe(context.s1)
context.lowest = 5
context.highest = 6

context.order_ticker = []

def _history_bar_for_all_ins(x):
bar = history_bars(x, 1, "1d", fields=['close'])
if not bar:
return numpy.nan
if bar[0]:
return bar[0][0]
return numpy.nan

def get_ticker(context):
all_ins = all_instruments("Stock").order_book_id.to_list()
df = pandas.DataFrame(all_ins, columns=['order_book_id'])
df['close'] = df.order_book_id.apply(_history_bar_for_all_ins)
price_df = df[(context.lowest <= df.close) & (df.close <= context.highest)]

order_ticker = []
for order_book_id in price_df.order_book_id.to_list():
c = pandas.Series(i[0] for i in history_bars(order_book_id, 3, "1d", fields=['close']))
if (c.shift(fill_value=0) < c).all():
order_ticker.append(order_book_id)
return order_ticker

def before_trading(context):
print("年化:{}".format(context.portfolio.total_returns))

def handle_bar(context, bar_dict):
closes = history_bars(context.s1, context.OBSERVATION, '1d', 'close')

diff, signal, _ = talib.MACD(closes)

if diff[-1] > signal[-1] and diff[-2] < signal[-2] :
if context.stock_account.cash > 1000:
context.order_ticker = get_ticker(context)
for item in context.order_ticker:
o = order_target_value(item, 2000)
if o:
print(o)
print("context.order_ticker", context.order_ticker)

if diff[-1] < signal[-1] and diff[-2] > signal[-2]:
for item in context.portfolio.positions.keys():
order_target_percent(item, 0)

def after_trading(context):
context.order_ticker = []
if __name__ == '__main__':
import rqalpha

rqalpha.run_func(init=init,
before_trading=before_trading,
handle_bar=handle_bar,
after_trading=after_trading,
config=__config__)

image.png

还不如直接买华夏国运沪深三百。


朋友想法 A

试试朋友的想法

假如设置一个均值(一直在变),低于 0.8 倍均值(mean_coe),并且连续 fall_days 天,跌幅加起来达到 fall_x%,买入 高于买入价格,并且连续 rise_days 天,涨幅加起来达到 rise_x%,卖出

我开始简单的设置了几个参数,发现结果不是很理想,但是没亏,我就想可能是参数的问题,然后列出了个集合,想着再跑跑,代码如下:

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2020/11/22 16:33
# @author : zza
# @Email : 740713651@qq.com
import itertools

import numpy
import pandas
from rqalpha.apis import *

__config__ = {
"base": {
"start_date": "20190101",
"end_date": "20191231",
'frequency': '1d',
"accounts": {
"stock": 50000,
},
"data_bundle_path": r"E:\data\bundle",
},
"extra": {
"log_level": "debug",
},
"mod": {
"sys_progress": {
"enabled": True,
"show": True
}, "sys_accounts": {
"enabled": True,
"dividend_reinvestment": True,
},
"sys_analyser": {
"enabled": True,
"plot": True,
'benchmark': '000300.XSHG',
},
},
}

def init(context):
context.mean_coe = 0.8
context.mean_days = 244
context.rise_days = context.config.extra.rise_days
context.fall_days = context.config.extra.fall_days
context.rise_x = context.config.extra.rise_x
context.fall_x = context.config.extra.fall_x
context.order_ticker = [
"002111.XSHE", # 威海广泰
"002673.XSHE", # 西部证券
"601375.XSHG", # 中原证券
]

df = all_instruments('CS')
df = df[df.listed_date.apply(lambda x: x.year in [2015, 2016, 2017])]
context.order_ticker = context.order_ticker + df.order_book_id.to_list()

def before_trading(context):
print("年化:{}".format(context.portfolio.total_returns))
# 价格均值
context.mean_close = {}
for order_book_id in context.order_ticker:
bars = pandas.Series([i[0] for i in history_bars(order_book_id, context.mean_days, "1d", fields=['close'])])
context.mean_close[order_book_id] = bars.mean()

def handle_bar(context, bar_dict):
# for buy
if context.stock_account.cash >= 5000:

for order_book_id in context.order_ticker:
bar_close = bar_dict[order_book_id].close
if bar_close < (context.mean_close[order_book_id] * context.mean_coe):
_str = ""
bars = pandas.Series(
[i[0] for i in history_bars(order_book_id, context.fall_days, "1d", fields=['close'])])
_str += "\n" + f"[{order_book_id}] 价格 ({bar_close}) 低于一年平均价{context.mean_close[order_book_id]}"
if not (bars.shift(-1, fill_value=0) < bars).all():
# 判断连跌
continue
_str += "\n" + f"[{order_book_id}] 连跌 价格表现为{list(bars)}"
if not (bars[0] * context.fall_x > bar_close):
# 判断跌幅
# 非低于 0.8 倍均值(mean_coe)
continue
_str += "\n" + f"[{order_book_id}] 跌幅超过{context.fall_x}, {bars[0]} - {bar_close}"
o = order_value(order_book_id, 5000)
if o:
_str += "\n" + f"[{order_book_id}] 买入".format(order_book_id)
print(_str)
# for sell
for order_book_id in context.order_ticker:
position = context.portfolio.positions[order_book_id]
bar_close = bar_dict[order_book_id].close
if position is None or position.quantity <= 0:
continue
if position.avg_price < bar_close:
# 低于买入价格 不做操作
continue
_str = ""
bars = pandas.Series([i[0] for i in history_bars(order_book_id, context.fall_days, "1d", fields=['close'])])
if not (bars.shift(1, fill_value=0) < bars).all():
# 并且连续 rise_days 天
continue
_str += "\n" + f"[{order_book_id}] 连涨 价格表现为{list(bars)}"
if not (bars[0] * context.rise_x < bar_close):
# 涨幅加起来达到 rise_x%
continue
_str += "\n" + f"[{order_book_id}] 涨幅超过{context.rise_x}, {bars[0]} - {bar_close}"
o = order_target_percent(order_book_id, 0)
if o:
_str += "\n" + f"[{order_book_id}] 出售".format(order_book_id)
print(_str)

def after_trading(context):
pass

import rqalpha

rise_days = range(3, 10)
fall_days = range(3, 10)
rise_x = numpy.arange(1.06, 1.12, 0.01)
fall_x = numpy.arange(0.9, 0.94, 0.01)
config_map = itertools.product(rise_days, fall_days, rise_x, fall_x)

df = pandas.DataFrame(config_map, columns=["rise_days", "fall_days", "rise_x", "fall_x", ])
result_list = []
for _, item in df.iterrows():
__config__['extra']['rise_days'] = int(item.rise_days)
__config__['extra']['fall_days'] = int(item.fall_days)
__config__['extra']['rise_x'] = item.rise_x
__config__['extra']['fall_x'] = item.fall_x
result = rqalpha.run_func(init=init,
before_trading=before_trading,
handle_bar=handle_bar,
after_trading=after_trading,
config=__config__)
total_returns = result['sys_analyser']['summary']['total_value']
result_list.append(result)

其实跑出了几个超过大盘收益的结果。但是因为我的代码水平问题,报错了,结果也没保存,近期我会去吧这个跑完(1300 多的组合方式),有结果了我会贴上最优解的。


总结

我还是不在股票市场里当韭菜了 溜了溜了

项目10 min read