Skip to main content

26 posts tagged with "Python"

View All Tags

  • 人比较懒 ,看不懂评论告诉我哈

vim view_markdown_index.py

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2020/3/26 15:37
# @author : zza
# @Email : 740713651@qq.com
# @File : readme_helper.py
import datetime
import os
from shutil import copyfile

import markdown
import markdown.extensions.fenced_code
from flask import flash, redirect, request, send_from_directory
from flask_admin import AdminIndexView, expose
from pygments.formatters.html import HtmlFormatter
from werkzeug.utils import secure_filename

formatter = HtmlFormatter(style="emacs", full=True, cssclass="codehilite")
css_string = formatter.get_style_defs()

class MarkdownIndexView(AdminIndexView):
    """Admin index view that shows README.md as the help page.

    Besides rendering the README, it exposes an upload endpoint that replaces
    README.md (keeping a timestamped backup) and an export endpoint that
    downloads the current README.md.
    """

    @expose()
    def index(self):
        """Render README.md to HTML and pass it to the index template.

        Adapted from
        https://github.com/solitudenote/gitkeeper/blob/d42f5990b05cf28cee12f20780e7794cd3579ead/app.py
        """
        # Read inside a context manager so the file handle is released on
        # every request instead of leaking until garbage collection.
        with open("README.md", "r", encoding="utf8") as readme_file:
            readme_text = readme_file.read()
        md_template_string = markdown.markdown(readme_text, extensions=["fenced_code", "codehilite"])
        # Prepend the pygments stylesheet so highlighted code blocks are styled.
        md_css_string = "<style>" + css_string + "</style>"
        md_template = md_css_string + md_template_string
        return self.render(self._template, readme_md=md_template, upload_readme_url="/admin/upload_form")

    @expose('/upload_form', methods=['POST'])
    def upload_file(self):
        """Replace README.md with the uploaded file, then redirect to /admin/.

        The upload is accepted only when the file is named exactly
        ``README.md``; the current README.md is first copied to a
        timestamped ``.bak`` file.
        """
        # check if the post request has the file part
        if 'file' not in request.files:
            flash('No file part')
            return redirect('/admin/')
        file = request.files['file']
        if file.filename == '':
            flash('No file selected for uploading')
            return redirect('/admin/')
        if file and file.filename == "README.md":
            # ":" is replaced because it is not allowed in Windows file names.
            bak_file = "README.md" + ".{}.bak".format(datetime.datetime.now().isoformat()).replace(":", "-")
            copyfile("README.md", bak_file)
            filename = secure_filename(file.filename)
            file.save(filename)
            flash('README.md 上传成功')
            return redirect('/admin/')
        else:
            flash('文件名必须为 README.md')
            return redirect('/admin/')

    @expose('/export')
    def export(self):
        """Send README.md from the current working directory as a download."""
        return send_from_directory(os.path.abspath("."), "README.md", as_attachment=True)

admin_index_view = MarkdownIndexView(name="主页", template="index.html")

vim index.html

{% extends 'admin/index.html' %}    

{% block page_body %}
{{ super() }}

<table class="markdown-table table-hover searchable">
<tr>
<td>
<a href="{{ get_url('.export') }}"
title="{{ _gettext('Export') }}">{{ _gettext('Export') + ' ' + export_type|upper }}</a>
</td>

<form id="form" method="post" action={{ upload_readme_url }} enctype="multipart/form-data">
<td>
<label class="input-file">
<a title="{{ _gettext('Import') }}"> {{ _gettext('Import') }} </a>
<input type="file" name="file" id="upload-file" autocomplete="off" hidden required>
</label>
</td>
</form>

</tr>
</table>
<br>
<hr/>
<script>

let __main = function () {
document.getElementById("upload-file").onchange = function () {
document.getElementById("form").submit();
};
}

__main()
</script>

{{ readme_md|safe }}

<style>
.markdown-table {
width: 200px;
float: right;

}

.markdown-table label {
font-weight: normal;
}

[hidden] {
display: none !important;
}

</style>
{% endblock %}

vim app.py

from flask import Flask
# Admin was used below without being imported, which raised NameError.
from flask_admin import Admin

from view_markdown_index import admin_index_view

app = Flask(__name__)
# Register the README-backed index view as the admin landing page.
admin_view = Admin(
    app,
    template_mode='bootstrap3',
    index_view=admin_index_view,
    base_template=r'layout.html',
    category_icon_classes={'Profiles': 'glyphicon glyphicon-wrench'},
)

if __name__ == '__main__':
    app.run()

Python2 min read

{% extends 'admin/base.html' %}    

{% block head_tail %}
{{ super() }}
<link href="{{ url_for('static', filename='layout.css') }}" rel="stylesheet">
<style>
.container {
width: 100%;
}
</style>
{% endblock %}
PythonOne min read

# https://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-xiii-i18n-and-l10n
# 请在rqlicense-server 目录下操作
# 生成翻译文件模版
pybabel extract -F babel.cfg -k _l -o messages.pot .
# 生成翻译文件 一般只需要 init 一次
pybabel init -i messages.pot -d rqlicense/translations -l en
# 更新翻译文件
pybabel update -i messages.pot -d rqlicense/translations -l en
# 编译
pybabel compile -d rqlicense/translations
  • 附赠 messages.po 谷歌翻译脚本
#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2019/12/13 15:25
# @author : zza
# @Email : 740713651@qq.com
# @File : 翻译messages.po文件.py
import re
from tqdm import tqdm
from googletrans import Translator

proxies = {"http": 'http://localhost:9999',
"https": 'https://localhost:9999'}
translate = Translator(proxies=proxies)


def service(messages_po_path):
    """Fill empty ``msgstr ""`` entries of a .po file with machine translations.

    Every line is copied to the output in its original position. A line that
    is exactly ``msgstr ""`` is replaced by the translation of the most
    recent single-line ``msgid "..."``. Entries whose msgid line is
    ``msgid ""`` (the header and multi-line msgids) are left untouched.

    The result is written next to the input, with the last ".po" in the path
    replaced by "bak.po" (``messages.po`` -> ``messagesbak.po``).

    :param messages_po_path: path of the messages.po file to translate
    """
    with open(messages_po_path, "r", encoding="utf8") as f:
        messages_lines = f.read().split("\n")

    result_lines = []
    msgid = ""
    for line in tqdm(messages_lines):
        if line.startswith("msgid "):
            # Only remember the msgid; the line itself is emitted below in
            # its original position. Deferring it until the msgstr line moved
            # continuation lines in front of it and dropped plural entries.
            msgid = line
        elif line == 'msgstr ""' and msgid != 'msgid ""':
            found = re.findall(r"msgid \"(.*)\"", msgid)
            if found:  # no usable msgid seen yet -> keep the line unchanged
                en_str = translate.translate(found[0]).text
                # Escape double quotes (without double-escaping ones that are
                # already escaped) so the msgstr stays valid .po syntax.
                en_str = en_str.replace('\\"', '"').replace('"', '\\"')
                line = 'msgstr "{}"'.format(en_str)
        result_lines.append(line)

    result_body = "\n".join(result_lines)
    # Replace only the last ".po" so directory names containing ".po" are
    # left alone; never fall back to overwriting the input file.
    head, sep, tail = messages_po_path.rpartition(".po")
    if sep:
        messages_po_to_path = head + "bak.po" + tail
    else:
        messages_po_to_path = messages_po_path + ".bak"
    with open(messages_po_to_path, "w", encoding="utf8") as f:
        f.write(result_body)


if __name__ == '__main__':
messages_po_path = r"D:\PycharmProjects\rqlicense\rqlicense-server\rqlicense\translations\en\LC_MESSAGES\messages.po"
service(messages_po_path)

PythonOne min read

logging 模块多进程解决方案 concurrent-log-handler 0.9.12

错误日志

PermissionError: [WinError 32] 另一个程序正在使用此文件,进程无法访问。: 'E:\\logs\\contest\\contest.log' -> 'E:\\logs\\contest\\contest.log.1'    

解决

  • bash
pip install concurrent-log-handler    
pip install pypiwin32
  • python
from logging import getLogger, DEBUG    
from concurrent_log_handler import ConcurrentRotatingFileHandler
import os

logger = getLogger()
# Use an absolute path to prevent file rotation trouble.
logfile = os.path.abspath("mylogfile.log")
# Rotate log after reaching 512K, keep 5 old copies.
rotateHandler = ConcurrentRotatingFileHandler(logfile, "a", 512*1024, 5)
logger.addHandler(rotateHandler)
logger.setLevel(DEBUG)

logger.info("Here is a very exciting log message, just for you")

  • 如果有很多日志 建议多弄几个 logger

其他参考

PythonOne min read

pip 命令

运行以下命令就好了

pip config set global.index-url http://pypi.douban.com/simple    
pip config set global.trusted-host pypi.douban.com
pip config set global.disable-pip-version-check true
  • 第三条用于关闭版本检查

部分操作系统可能有写入的位置不对的情况 于是乎有了下列脚本 前面配置已生效的同学可以不用管了

网上教程基本上都说 pip 的配置文件在用户目录下 %HOMEPATH%/.pip/pip,但实际上 pip config list 读取的不知道是哪个文件夹下的文件;使用 pip config set 会写入到 %HOMEPATH%\AppData\Roaming\pip\pip.ini;使用 --global、--user 等选项时写入的位置也不同,有待研究

操作代码

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2020/4/13 14:12
# @author : zza
# @Email : 740713651@qq.com
# @File : pip_douban_source.py
# https://python3-cookbook.readthedocs.io/zh_CN/latest/c13/p10_read_configuration_files.html
# https://www.jianshu.com/p/0cdd647bcc3e
"""
python -c "import requests;res = requests.get('http://cdn.ricequant.com/rqpro/pip_douban_source_v2.py');exec(res.text)"
"""

import os
import sys

from configparser import ConfigParser

# Write the douban mirror settings into the per-user pip config file
# (~/.pip/pip.ini on Windows, ~/.pip/pip.conf elsewhere), keeping any
# options that are already present in that file.
conf_dir = os.path.join(os.path.expanduser("~"), ".pip")
os.makedirs(conf_dir, exist_ok=True)
WINDOWS = (sys.platform.startswith("win") or (sys.platform == 'cli' and os.name == 'nt'))
CONFIG_BASENAME = 'pip.ini' if WINDOWS else 'pip.conf'
conf_path = os.path.join(conf_dir, CONFIG_BASENAME)

cfg = ConfigParser()
# ConfigParser.read() silently skips a missing file, so a first run starts
# from an empty configuration.
cfg.read(conf_path, encoding="utf8")

if not cfg.has_section('global'):
    cfg.add_section('global')

cfg.set('global', 'index-url', 'http://pypi.douban.com/simple')
cfg.set('global', 'trusted-host', 'pypi.douban.com')
cfg.set('global', 'timeout', "60")
cfg.set('global', 'disable-pip-version-check', "true")  # turn off pip's version check

if not WINDOWS:
    if not cfg.has_section('install'):
        cfg.add_section('install')
    # NOTE(review): use-mirrors / mirrors look like options that newer pip
    # releases no longer accept - confirm they are still wanted.
    cfg.set("install", "use-mirrors", "true")
    cfg.set("install", "mirrors", "https://pypi.douban.com/simple/")
    cfg.set("install", "trusted-host", "pypi.douban.com")

# The encoding name was misspelled as "utf8:"; use the same name as the read.
with open(conf_path, "w", encoding="utf8") as f:
    cfg.write(f)

print("save to {}".format(conf_path))

一行命令跑起来

  • 这个文件我放公司 cdn 上了
  • 需要用到 python exec 方法 如下:
python -c "import requests;res = requests.get('http://cdn.ricequant.com/rqpro/pip_douban_source_v2.py');exec(res.text)"    

Python2 min read

  • 按 ctrl+V 不能粘贴,ctrl+C 能复制
  • 选中文字后按 Backspace 不是删除,而是选定行往后继续+1
  • 原因:Pycharm 启动了 Vim 编辑模式
  • 解决方式:Tools -Vim Emulator 关闭就行了
  • 字典中,用 变量名.get(key1) 和 变量名[key1] 的区别
  • 变量名.get(key1) 如果 key 不存在,返回 None(也可以通过第二个参数指定默认值,例如 变量名.get(key1, 0))
  • 变量名 [key1] 没有则抛出 KeyError 异常
PythonOne min read

prometheus 监控 ,用gunicorn启动时。多进程内存不互通导致数据有问题。

参考

gunicorn启动方案

  • 安装prometheus_client pip install prometheus_client

  • 复制这个文件到你的项目中 vim monitoring.py

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2019/8/2 16:29
# @author : zza
# @Email : 740713651@qq.com
# @File : monitoring.py
"""
FROM https://github.com/ITISFoundation/osparc-simcore/blob/3e80ce451352c906f2876113dbb6ae33e8574be1/packages/service-library/src/servicelib/monitoring.py
&& https://github.com/ITISFoundation/osparc-simcore/blob/3e80ce451352c906f2876113dbb6ae33e8574be1/packages/service-library/src/servicelib/monitoring.py
"""
import time

from flask import request, current_app, Response
from prometheus_client import Counter, Histogram
from prometheus_client import multiprocess
from prometheus_client import generate_latest, CollectorRegistry, CONTENT_TYPE_LATEST, Gauge

# Example gauge.
IN_PROGRESS = Gauge("inprogress_requests", "help", multiprocess_mode='livesum')


# Expose metrics.
@IN_PROGRESS.track_inprogress()
def app(environ, start_response):
    """WSGI app that serves the metrics gathered by MultiProcessCollector.

    A new registry is created on each call and the multiprocess collector is
    attached to it before the payload is generated.
    """
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)
    payload = generate_latest(registry)
    headers = [
        ('Content-type', CONTENT_TYPE_LATEST),
        ('Content-Length', str(len(payload))),
    ]
    start_response('200 OK', headers)
    return iter([payload])


def setup_monitoring(app, app_name=None):
    """Attach Prometheus request metrics and a ``/metrics`` route to a Flask app.

    Registers before/after request hooks that track in-progress requests,
    latency and request counts, stores the metric objects in
    ``app.extensions["prometheus"]`` and adds a ``/metrics`` endpoint that
    serves the multiprocess-collected data.

    :param app: the Flask application to instrument
    :param app_name: value of the ``app_name`` label; defaults to ``app.name``
    """
    if app_name is None:
        app_name = app.name

    @app.before_request
    def start_timer():
        # Remember when the request started and mark it as in progress.
        request.start_time = time.time()
        in_progress = current_app.extensions["prometheus"]['REQUEST_IN_PROGRESS']
        in_progress.labels(app_name, request.endpoint, request.method).inc()

    @app.after_request
    def record_request_data(response):
        elapsed = time.time() - request.start_time
        endpoint = request.endpoint
        metrics_ext = current_app.extensions["prometheus"]
        metrics_ext['REQUEST_LATENCY'].labels(app_name, endpoint).observe(elapsed)
        metrics_ext['REQUEST_IN_PROGRESS'].labels(app_name, endpoint, request.method).dec()
        metrics_ext['REQUEST_COUNT'].labels(app_name, request.method, endpoint, response.status).inc()
        return response

    app.extensions["prometheus"] = {
        'app_name': app_name,
        'REQUEST_COUNT': Counter(
            'http_requests_total', 'Total Request Count',
            ['app_name', 'method', 'endpoint', 'http_status']
        ),
        # Latency of a request in seconds
        'REQUEST_LATENCY': Histogram(
            'http_request_latency_seconds', 'Request latency',
            ['app_name', 'endpoint']
        ),
        'REQUEST_IN_PROGRESS': Gauge(
            'http_requests_in_progress_total', 'Requests in progress',
            ['app_name', 'endpoint', 'method']
        ),
    }

    @app.route("/metrics")
    def metrics():
        # Build a fresh registry on every scrape and attach the
        # multiprocess collector to it.
        registry = CollectorRegistry()
        multiprocess.MultiProcessCollector(registry)
        return Response(generate_latest(registry), mimetype=CONTENT_TYPE_LATEST)

  • 在你代码中 导入文件并初始化
# from flask import Flask    
# app = Flask(__name__)
from persistd.monitoring import setup_monitoring
setup_monitoring(app, "app_name")
  • 设置 Gunicorn 配置文件 vim gunicorn.conf.py
from prometheus_client import multiprocess    
def child_exit(server, worker):
    """Gunicorn ``child_exit`` hook: report the exited worker's pid to prometheus_client."""
    dead_pid = worker.pid
    multiprocess.mark_process_dead(dead_pid)
  • 启动 Gunicorn 时 增加参数指向配置文件 -c gunicorn.conf.py

  • 设置环境变量:需要一个临时文件夹,且环境变量prometheus_multiproc_dir指向该文件夹(注意启动用户读写权限), 该文件夹用于存放prometheus数据。

rm -rf multiproc-tmp
mkdir multiproc-tmp
export prometheus_multiproc_dir=multiproc-tmp
# The config file created above is gunicorn.conf.py (not gunicorn_conf.py)
gunicorn -c gunicorn.conf.py -w 4 yourapp:app

附一个 asyncio 的 monitoring.py

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2019/9/5 16:36
# @author : zza
# @File : monitoring.py
"""
FROM:
https://github.com/cloud-cds/cds-stack/blob/4243cd9b2e878f16a251d05afb2d202d71e41dce/api/monitoring.py
https://github.com/DD-DeCaF/gene-to-reactions/blob/3af42110433edf8495810e6a95a516368464e179/src/gene_to_reactions/app.py

setup_monitoring(app, "app_name")
"""
import time
import asyncio
from aiohttp import web
from prometheus_client import multiprocess, generate_latest
from prometheus_client import CONTENT_TYPE_LATEST, CollectorRegistry, Histogram, Counter, Gauge


def prom_middleware(app_name):
    """Build an aiohttp middleware factory that records Prometheus metrics.

    The returned factory has the ``(app, handler)`` signature. The handler it
    produces reads the metric objects stored under ``request.app`` as
    ``REQUEST_IN_PROGRESS``, ``REQUEST_LATENCY`` and ``REQUEST_COUNT``.

    :param app_name: value used for the ``app_name`` label of every metric
    :return: coroutine function ``factory(app, handler)``
    """
    async def factory(app, handler):
        async def middleware_handler(request):
            request['start_time'] = time.time()
            in_progress = request.app['REQUEST_IN_PROGRESS'].labels(
                app_name, request.path, request.method)
            in_progress.inc()
            try:
                response = await handler(request)
            finally:
                # Decrement even when the handler raises; otherwise the
                # in-progress gauge grows with every failed request.
                in_progress.dec()
            resp_time = time.time() - request['start_time']
            request.app['REQUEST_LATENCY'].labels(app_name, request.path).observe(resp_time)
            request.app['REQUEST_COUNT'].labels(
                app_name, request.method, request.path, response.status).inc()
            return response

        return middleware_handler

    return factory


async def metrics(request):
    """aiohttp handler that returns the multiprocess-collected metrics."""
    # generate_latest is meant to receive a registry. Attach the multiprocess
    # collector to a fresh registry and pass that, instead of passing the
    # collector object itself.
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)
    resp = web.Response(body=generate_latest(registry))
    resp.content_type = CONTENT_TYPE_LATEST
    return resp


def setup_monitoring(app, app_name):
    """Instrument an aiohttp application with Prometheus request metrics.

    Stores the three metric objects on ``app``, inserts the metrics
    middleware at the front of ``app.middlewares`` and registers a GET
    ``/metrics`` route.

    :param app: the aiohttp application to instrument
    :param app_name: value of the ``app_name`` label
    """
    request_count = Counter(
        'requests_total', 'Total Request Count',
        ['app_name', 'method', 'endpoint', 'http_status']
    )
    request_latency = Histogram(
        'request_latency_seconds', 'Request latency',
        ['app_name', 'endpoint']
    )
    requests_in_progress = Gauge(
        'requests_in_progress_total', 'Requests in progress',
        ['app_name', 'endpoint', 'method']
    )

    app['REQUEST_COUNT'] = request_count
    app['REQUEST_LATENCY'] = request_latency
    app['REQUEST_IN_PROGRESS'] = requests_in_progress

    # Position 0 so this middleware is the first entry in the list.
    app.middlewares.insert(0, prom_middleware(app_name))
    app.router.add_get("/metrics", metrics)
Python2 min read

  • 跑程序的时候发现 _id 相同冲突
  • 程序中 mongo_id 的生成有 timestamp 决定
  • 于是测试:
import time    
for i in range(20):
print(time.time())

a = time.time()
for i in range(10**8):
time.time()
print(time.time() - a)

结果:

1546937536.8363261    
1546937536.8363261
1546937536.8363261
1546937536.8363261
1546937536.8372946
1546937536.8372946
1546937536.8372946
1546937536.8372946
1546937536.8372946
1546937536.8372946
1546937536.8372946
1546937536.8372946
1546937536.8372946
1546937536.8372946
1546937536.8372946
1546937536.8372946
1546937536.8372946
1546937536.8372946
1546937536.8372946
1546937536.8372946
9.680140972137451
  • emmm 经过与同事的机器的对比 短时间内

  • Windows 上时间戳是相同

  • mac 不同

  • 看起来像系统 bug,但其实是 Windows 上 time.time() 的时钟分辨率较低(上面的结果里约 1 ms 才变化一次),短时间内连续调用会得到相同的值

  • 各位要用时间戳做 id 的同学注意下

  • 注意

  • 1546937536.8363261

  • 1546937536.8372946

  • 时间不是连续的

  • time 的精度不同

PythonOne min read

要求

  • 一亿数据 10^8
  • 测试用 5w 数据
  • 预测时间为 结果时间* 2000

设计思路

  • 程序执行 20 遍求平均值
  • 结束时间 - 开始时间
  • 不同 python 引擎
  • 不同数据量 然后 commit 提交 响应速度

数据库连接工具

  • MySQL-Python
  • pymysql
  • [X ] MySQL-Connector

代码

    #!/usr/bin/python3    
# encoding: utf-8
# @Time : 2018/7/14 0014 16:12
# @author : zza
# @Email : 740713651@qq.com
import time

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

class Student(db.Model):
    """ORM model for the ``stu`` table written by the insert benchmarks."""

    __tablename__ = "stu"

    # Auto-incrementing primary key.
    id = db.Column(db.Integer, primary_key=True, autoincrement=True, nullable=True)
    # Payload columns filled with constant values by the benchmarks.
    name_ = db.Column(db.String(127))
    age = db.Column(db.Integer)
    class_num = db.Column(db.Integer)

def init(param):
    """Create a Flask app for the given database URI and rebind the global ``db``.

    :param param: SQLAlchemy database URI without query string; the charset
        and autocommit options are appended here
    """
    global db

    class sqlalchemy(SQLAlchemy):
        """SQLAlchemy subclass that closes all sessions when it is deleted."""

        def __del__(self):
            print("数据库关闭")
            db.session.close_all()

    app = Flask(__name__)
    app.config.update(
        SQLALCHEMY_DATABASE_URI=param + "?charset=utf8&autocommit=False",
        SQLALCHEMY_TRACK_MODIFICATIONS=True,
        SQLALCHEMY_POOL_SIZE=128,
        SQLALCHEMY_POOL_TIMEOUT=60,
        SQLALCHEMY_POOL_RECYCLE=30,
        SQLALCHEMY_MAX_OVERFLOW=128,
    )
    db = sqlalchemy(app)

def finish():
    """Delete every row of the ``stu`` table and commit."""
    session = db.session
    session.query(Student).delete()
    session.commit()

def time_me(fn):
    """Decorator that runs ``fn`` 30 times and reports the average duration.

    After every run the table is emptied with ``finish()`` (outside the timed
    section) so each run starts from the same state.

    The wrapper returns ``(average_seconds, args)``. Previously the sum of
    all runs was accumulated but never divided or reported, so only the last
    run's time was printed and returned.
    """
    def _wrapper(*args, **kwargs):
        runs = 30
        total = 0.0
        for _ in range(runs):
            start = time.time()
            fn(*args, **kwargs)
            total += time.time() - start
            # Cleanup between runs; assumed to sit inside the loop (the
            # original indentation was lost) - TODO confirm.
            finish()
        average = total / runs
        print(u"{func}函数写入耗时{sec}秒".format(func=fn.__name__, sec=average))
        return average, args

    return _wrapper

@time_me
def insert_many():
    """Insert 50,000 Student rows through the ORM session in batches of 1000."""
    total_rows = 5 * 10 ** 4
    batch_size = 1000
    batches = int(total_rows / batch_size)
    for _batch in range(batches):
        for _row in range(batch_size):
            row = Student(name_='test mysql insert', age=30, class_num=30)
            db.session.add(row)
        # One commit per batch; assumed from the nested-loop structure (the
        # original indentation was lost) - TODO confirm.
        db.session.commit()

######
@time_me
def insert_many_by_sql():
    """Insert 50,000 rows with raw INSERT statements in batches of 1000."""
    total_rows = 5 * 10 ** 4
    batch_size = 1000
    batches = int(total_rows / batch_size)
    with db.session.connection() as con:
        for _batch in range(batches):
            for _row in range(batch_size):
                # The statement is formatted on every iteration on purpose:
                # the benchmark includes the cost of assembling the data.
                statement = "INSERT INTO stu ( id ,name_, age, class_num) VALUES (null ,{}, {},{})".format(
                    "'test2mysql3insert'",
                    30, 30)
                con.execute(statement)
            # One commit per batch; assumed from the nested-loop structure
            # (the original indentation was lost) - TODO confirm.
            db.session.commit()

def main2():
    """Compare raw SQL statements with the ORM, including data assembly."""
    init("mysql+pymysql://root:root@192.168.14.147:3306/efficiency_test")

    # Recorded result: insert_many took 19.671629905700684 s per 500 rows.
    print("orm 框架插入数据")
    insert_many()

    # Recorded result: insert_many_by_sql took 17.977628707885742 s per 500 rows.
    print("sql 语句插入数据")
    insert_many_by_sql()

def main():
    """Run the ORM insert benchmark once for each MySQL driver."""
    print('测试开始')
    database_uris = (
        # Recorded result: insert_many took 168.07286262512207 s
        "mysql+mysqlconnector://root:root@192.168.14.147:3306/efficiency_test",
        # Recorded result: insert_many took 64.85304117202759 s (default driver, MySQLdb)
        "mysql://root:root@192.168.14.147:3306/efficiency_test",
        # Recorded result: insert_many took 64.692676067352295 s
        "mysql+pymysql://root:root@192.168.14.147:3306/efficiency_test",
        # Recorded result: insert_many took 66.991496086120605 s
        "mysql+mysqldb://root:root@192.168.14.147:3306/efficiency_test",
    )
    for uri in database_uris:
        init(uri)
        insert_many()

if __name__ == '__main__':
main()
main2()

Python2 min read

sql 大概是这个样子

select booking_id from booking where concat(num, ',' , name) in ('1,name1', '2,name2', '3,name3')

用sqlalchemy 实现

_list_data = ['1,name1', '2,name2', '3,name3']
# concat(num, ',', name) IN (...): the SQL above uses IN, so the matching
# operator is in_(); notin_() would select exactly the opposite rows.
session.query(Booking.booking_id).filter(Booking.num.concat(",").concat(Booking.name).in_(_list_data))

PythonOne min read