
29 posts tagged with "Python"


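Related resources:

Install dependencies

pstats ships with the Python standard library, so only snakeviz needs to be installed:

pip install snakeviz

Collecting runtime data

Collecting profiling data with cProfile

  • cProfile has been the default profiler shipped with the standard Python interpreter since Python 2.5.
  • cProfile is a deterministic profiler: it records how often each function is called and how long the calls take. It does not track memory consumption or any other memory-related information.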
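def run_1():
    """your function"""
    pass


def profile_func(func):
    import cProfile
    import pstats

    file_name = "prof_{}_1.pstat".format(func.__name__)
    # runcall() profiles the function object directly, so it also works when
    # func is not defined in __main__ (cProfile.run() evaluates a string there).
    profiler = cProfile.Profile()
    profiler.runcall(func)
    profiler.dump_stats(file_name)

    p = pstats.Stats(file_name).sort_stats("cumtime")
    # The argument is a regex filter on file and function names;
    # call p.print_stats() without arguments to print every entry.
    # p.print_stats("rqalpha_mod_ricequant_data")
    p.print_stats("base_position")
    return p


if __name__ == '__main__':
    p = profile_func(run_1)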
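
For a quick look without writing a .pstat file, the same idea can be sketched entirely in memory. This is only an illustration: `slow_sum` and `profile_to_text` are hypothetical names that do not appear in the original post, and the report is captured in a string instead of being printed by pstats directly.

```python
import cProfile
import io
import pstats


def slow_sum(n):
    """Hypothetical workload used only for this illustration."""
    return sum(i * i for i in range(n))


def profile_to_text(func, *args, limit=5):
    """Profile func(*args) and return (result, report text sorted by cumtime)."""
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args)

    # Send the pstats report to an in-memory buffer instead of stdout.
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats("cumtime").print_stats(limit)
    return result, buffer.getvalue()


result, report = profile_to_text(slow_sum, 10000)
print(report)
```

The report lists the most expensive entries by cumulative time, which is usually enough to decide whether a full snakeviz session is worth it.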

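Collecting runtime data directly in PyCharm

PyCharm has a profiling button next to the Run button in the top-right corner.
Clicking it runs the program and opens PyCharm's built-in profiler view.
You can also copy the path of the .pstat file and analyze it with snakeviz (recommended).


Profile charts with snakeviz

From the directory that contains the file, run: snakeviz prof_run_1_1.pstat

Open the generated link to view the profile as a bar-style chart.


gprof2dot call-time graph

gprof2dot -f pstats mkm_run.prof | dot -Tpng -o mkm_run.png

Python · One min read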

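Reposted from "PyPI 打包上传实践" (PyPI Packaging and Upload in Practice)

1. Packaging the code

To package code you first need a package of your own. Say you have written a .py file with some functions in it. To make it easy to reuse, you bundle it so that next time you can simply import it. So the first step is to turn your code into a package: create a folder, add an __init__.py file to it, and put your .py files in the same folder.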

packagename/    
|
+-- __init__.py
|
+-- myfunction.py
|
+-- mymorefunction.py
|
+-- ...
|

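packagename is the name of the package you created, the my*.py files are your Python code, and an __init__.py file must be added as well (it may be empty). You can now import the package, provided its parent directory is on the import path:

import packagename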
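
As a rough illustration of the layout above, the following sketch builds a throwaway package in a temporary directory and imports it. The names `demo_pkg`, `myfunction.py` and `greet` are made up for this example and are not part of the original article.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Build a throwaway package on disk: demo_pkg/__init__.py and demo_pkg/myfunction.py.
root = Path(tempfile.mkdtemp())
pkg_dir = root / "demo_pkg"
pkg_dir.mkdir()
(pkg_dir / "__init__.py").write_text("", encoding="utf-8")  # may stay empty
(pkg_dir / "myfunction.py").write_text(
    "def greet(name):\n    return 'hello, ' + name\n", encoding="utf-8"
)

# Put the parent directory on the import path, then import like any other package.
sys.path.insert(0, str(root))
importlib.invalidate_caches()
from demo_pkg.myfunction import greet

message = greet("PyPI")
print(message)
```

The only requirement is that the directory containing the package folder is on `sys.path`; installing the package with pip, as described later, takes care of that automatically.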

2. Matching the layout PyPI expects

Change the directory structure above to the following:

packagename    
|
+-- COPYING.txt
|
+-- README.rst
|
+-- setup.py
|
+-- packagename
. |
. +-- __init__.py
. |
. +-- myscripts1.py
. |
. +-- myscripts2.py
. |
. +-- mymorescripts.py
. |
.
|
+-- docs/
|

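This simply moves the original directory one level deeper; the outer and inner folders share the same name. A few special files are then created in the top-level directory. Tips

  • COPYING.txt: optional. This is the license file that states the terms for your package. For the MIT license, for example, put the full MIT License text in it. If you are unsure about licensing, you can leave the file out.
  • README.rst: the project description. It can be omitted, but that is not recommended: if you want other people to use the package, write it properly. Every developer knows what this file is for. If you have one, put real content in it, because it may be used later on.
  • setup.py: the core file. Its contents are covered below.
  • docs/: a folder for documentation. Put your documents here. Writing good documentation is hard, though, so this folder often does not exist, which is a little shameful.

Sample setup.py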

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import codecs
import sys

# setup() is required for packaging; find_packages() collects the package directories.
from setuptools import find_packages, setup

if sys.version_info < (2, 5):
    sys.exit('Python 2.5 or greater is required.')

# Read the long description as text rather than bytes.
with codecs.open('README.rst', encoding='utf-8') as fp:
    readme = fp.read()

# Version number; pick your own.
VERSION = "1.0.7"

LICENSE = "MIT"

# Assumption: the original listing referenced this name without defining it.
lxml_requirement = "lxml"

setup(
    name='<project name>',
    version=VERSION,
    description=(
        '<short description of the project>'
    ),
    long_description=readme,
    author='<your name>',
    author_email='<your email address>',
    maintainer='<maintainer name>',
    maintainer_email='<maintainer email address>',
    license=LICENSE,
    packages=find_packages(),
    platforms=["all"],
    url='<project URL; I usually use the GitHub URL>',
    install_requires=[
        "beautifulsoup4",
        lxml_requirement
    ],
    classifiers=[
        'Development Status :: 4 - Beta',
        'Operating System :: OS Independent',
        'Intended Audience :: Developers',
        'License :: OSI Approved :: MIT License',
        'Programming Language :: Python',
        'Programming Language :: Python :: Implementation',
        'Programming Language :: Python :: 2',
        'Programming Language :: Python :: 2.7',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.4',
        'Programming Language :: Python :: 3.5',
        'Programming Language :: Python :: 3.6',
        'Topic :: Software Development :: Libraries'
    ],
)

# url: the address of your project, if it has one; otherwise the package's PyPI page works too
# install_requires: the Python packages this module depends on
# Not all of the fields above are required

The classifiers values are not arbitrary; choose them from the PyPI Classifiers list in this article's references.

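3. Packaging with Distutils

To be safe, validate setup.py before packaging by running:

python setup.py check

The output is normally just running check. Any errors or warnings are printed after it; if nothing else appears, Distutils accepts your setup.py.

If there are no problems, build the package:

python setup.py sdist

When it finishes, a dist directory and an .egg-info directory appear in the top-level directory.

Once the package is built you can get ready to upload it to PyPI. First, register an account on PyPI. Next, create a PyPI configuration file locally; without it, the upload may fail over http. Create a .pypirc file in your home directory with the content shown below. On Windows, type the name as .pypirc. (with a trailing dot) when creating the file. Example location: C:\Users\admin\.pypirc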

[distutils]    
index-servers=pypi

[pypi]
repository = https://upload.pypi.org/legacy/
username = <username>
password = <password>

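Then run:

python setup.py register sdist upload

When Server response (200): OK appears at the end, the upload succeeded and you can find your package on PyPI. Note that the current PyPI no longer accepts the register command, and the Python packaging documentation recommends uploading with twine (twine upload dist/*) instead of setup.py upload.

That completes the upload to PyPI. To use the package, just install it:

pip install packagename

The whole process went smoothly. I will keep experimenting and add notes when problems come up.

Author: snowy_sunny. Link: https://www.jianshu.com/p/be91c70adb27

Python · 5 min read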

Reference: fixing garbled Chinese text when writing CSV from Python

  • Cause: the CSV file is missing the BOM_UTF8 marker at its start
  • Fix: prepend the BOM_UTF8 marker to the Response body
  • Core code (a single BOM is enough)
titles[0] = codecs.BOM_UTF8.decode("utf8") + titles[0]
  • Imports
import codecs
import csv

from flask_admin._compat import csv_encode
from flask import request, redirect, flash, current_app, Response, stream_with_context
from werkzeug.utils import secure_filename
  • Logic (copy it into the corresponding ModelView class)
    def _export_csv(self, return_url):
        """
        Export a CSV of records as a stream.
        """
        count, data = self._export_data()

        # https://docs.djangoproject.com/en/1.8/howto/outputting-csv/
        class Echo(object):
            """
            An object that implements just the write method of the file-like
            interface.
            """
            def write(self, value):
                """
                Write the value by returning it, instead of storing
                in a buffer.
                """
                return value

        writer = csv.writer(Echo())

        def generate():
            # Append the column titles at the beginning
            titles = [csv_encode(c[1]) for c in self._export_columns]
            # Prefix the first title with a single UTF-8 BOM
            titles[0] = codecs.BOM_UTF8.decode("utf8") + titles[0]
            yield writer.writerow(titles)

            for row in data:
                vals = [csv_encode(self.get_export_value(row, c[0]))
                        for c in self._export_columns]
                yield writer.writerow(vals)

        filename = self.get_export_name(export_type='csv')

        disposition = 'attachment;filename=%s' % (secure_filename(filename),)

        return Response(
            stream_with_context(generate()),
            headers={'Content-Disposition': disposition},
            mimetype='text/csv'
        )
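
The two ideas in this method, an Echo pseudo-file that turns csv.writer into a row generator and a single BOM in front of the first header cell, can be tried outside Flask. The sketch below uses only the standard library; `stream_csv` and the sample rows are hypothetical and stand in for the ModelView export data.

```python
import codecs
import csv


class Echo:
    """File-like object whose write() returns the value instead of buffering it."""

    def write(self, value):
        return value


def stream_csv(header, rows):
    """Yield CSV text chunk by chunk, with one UTF-8 BOM before the first cell."""
    writer = csv.writer(Echo())
    # writerow() returns whatever Echo.write() returns, i.e. the formatted line.
    first_row = [codecs.BOM_UTF8.decode("utf-8") + header[0]] + list(header[1:])
    yield writer.writerow(first_row)
    for row in rows:
        yield writer.writerow(row)


chunks = list(stream_csv(["姓名", "城市"], [["张三", "北京"]]))
payload = "".join(chunks).encode("utf-8")

# Decoding with utf-8-sig strips the BOM again, which makes it easy to inspect.
decoded = payload.decode("utf-8-sig")
print(decoded)
```

Because each row is produced lazily, a web framework can send the chunks to the client as they are generated instead of building the whole file in memory first.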
Python · One min read

  • I'm a bit lazy, so leave a comment if anything is unclear

vim view_markdown_index.py

#!/usr/bin/python3
# encoding: utf-8
# @Time : 2020/3/26 15:37
# @author : zza
# @Email : 740713651@qq.com
# @File : readme_helper.py
import datetime
import os
from shutil import copyfile

import markdown
import markdown.extensions.fenced_code
from flask import flash, redirect, request, send_from_directory
from flask_admin import AdminIndexView, expose
from pygments.formatters.html import HtmlFormatter
from werkzeug.utils import secure_filename

formatter = HtmlFormatter(style="emacs", full=True, cssclass="codehilite")
css_string = formatter.get_style_defs()


class MarkdownIndexView(AdminIndexView):

    @expose()
    def index(self):
        """Render README.md as the help text of the index page.
        copy from https://github.com/solitudenote/gitkeeper/blob/d42f5990b05cf28cee12f20780e7794cd3579ead/app.py
        """
        # Read the file and close it again once it has been rendered.
        with open("README.md", "r", encoding="utf8") as readme_file:
            md_template_string = markdown.markdown(
                readme_file.read(), extensions=["fenced_code", "codehilite"])
        md_css_string = "<style>" + css_string + "</style>"
        md_template = md_css_string + md_template_string
        return self.render(self._template, readme_md=md_template, upload_readme_url="/admin/upload_form")

    @expose('/upload_form', methods=['POST'])
    def upload_file(self):
        # check if the post request has the file part
        if 'file' not in request.files:
            flash('No file part')
            return redirect('/admin/')
        file = request.files['file']
        if file.filename == '':
            flash('No file selected for uploading')
            return redirect('/admin/')
        if file and file.filename == "README.md":
            # Keep a timestamped backup of the current README before overwriting it.
            bak_file = "README.md" + ".{}.bak".format(datetime.datetime.now().isoformat()).replace(":", "-")
            copyfile("README.md", bak_file)
            filename = secure_filename(file.filename)
            file.save(filename)
            flash('README.md uploaded successfully')
            return redirect('/admin/')
        else:
            flash('The file name must be README.md')
            return redirect('/admin/')

    @expose('/export')
    def export(self):
        return send_from_directory(os.path.abspath("."), "README.md", as_attachment=True)


admin_index_view = MarkdownIndexView(name="Home", template="index.html")

vim index.html

{% extends 'admin/index.html' %}    

{% block page_body %}
{{ super() }}

<table class="markdown-table table-hover searchable">
<tr>
<td>
<a href="{{ get_url('.export') }}"
title="{{ _gettext('Export') }}">{{ _gettext('Export') + ' ' + export_type|upper }}</a>
</td>

<form id="form" method="post" action={{ upload_readme_url }} enctype="multipart/form-data">
<td>
<label class="input-file">
<a title="{{ _gettext('Import') }}"> {{ _gettext('Import') }} </a>
<input type="file" name="file" id="upload-file" autocomplete="off" hidden required>
</label>
</td>
</form>

</tr>
</table>
<br>
<hr/>
<script>

let __main = function () {
document.getElementById("upload-file").onchange = function () {
document.getElementById("form").submit();
};
}

__main()
</script>

{{ readme_md|safe }}

<style>
.markdown-table {
width: 200px;
float: right;

}

.markdown-table label {
font-weight: normal;
}

[hidden] {
display: none !important;
}

</style>
{% endblock %}

vim app.py

from view_markdown_index import admin_index_view
from flask import Flask
from flask_admin import Admin

app = Flask(__name__)
admin_view = Admin(
    app,
    template_mode='bootstrap3',
    index_view=admin_index_view,
    base_template=r'layout.html',
    category_icon_classes={'Profiles': 'glyphicon glyphicon-wrench'},
)

if __name__ == '__main__':
    app.run()

Python · 2 min read

{% extends 'admin/base.html' %}    

{% block head_tail %}
{{ super() }}
<link href="{{ url_for('static', filename='layout.css') }}" rel="stylesheet">
<style>
.container {
width: 100%;
}
</style>
{% endblock %}
Python · One min read

# https://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-xiii-i18n-and-l10n
# Run these commands from the rqlicense-server directory
# Generate the translation template
pybabel extract -F babel.cfg -k _l -o messages.pot .
# Create the translation file; init is normally needed only once
pybabel init -i messages.pot -d rqlicense/translations -l en
# Update the translation file
pybabel update -i messages.pot -d rqlicense/translations -l en
# Compile
pybabel compile -d rqlicense/translations
  • Bonus: a script that fills in messages.po with Google Translate
#!/usr/bin/python3
# encoding: utf-8
# @Time : 2019/12/13 15:25
# @author : zza
# @Email : 740713651@qq.com
# @File : 翻译messages.po文件.py
import re
from tqdm import tqdm
from googletrans import Translator

proxies = {"http": 'http://localhost:9999',
           "https": 'https://localhost:9999'}
translate = Translator(proxies=proxies)


def service(messages_po_path):
    with open(messages_po_path, "r", encoding="utf8") as f:
        messages_body = f.read()
    messages_lines = messages_body.split("\n")
    result_lines = []
    msgid = ""
    msgstr = ""
    for line in tqdm(messages_lines):
        if line.startswith("msgid"):
            # Hold the msgid until its msgstr line arrives.
            msgid = line
        elif line.startswith("msgstr"):
            msgstr = line
            if msgid == 'msgid ""':
                # The header entry has an empty msgid; leave it untouched.
                pass
            elif msgstr == 'msgstr ""':
                # Untranslated entry: translate the msgid text and fill it in.
                translate_str = re.findall(r"msgid \"(.*)\"", msgid)[0]
                en_str = translate.translate(translate_str).text
                msgstr = msgstr.replace('""', '"{}"'.format(en_str))
            result_lines.append(msgid)
            result_lines.append(msgstr)
        else:
            result_lines.append(line)
    result_body = "\n".join(result_lines)
    # Write the result next to the original file instead of overwriting it.
    messages_po_to_path = messages_po_path.replace(".po", "bak.po")
    with open(messages_po_to_path, "w", encoding="utf8") as f:
        f.write(result_body)


if __name__ == '__main__':
    messages_po_path = r"D:\PycharmProjects\rqlicense\rqlicense-server\rqlicense\translations\en\LC_MESSAGES\messages.po"
    service(messages_po_path)
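
The string handling at the heart of the script can be checked without a network connection or googletrans. The sketch below isolates it; `extract_msgid` and `fill_msgstr` are hypothetical helper names, and the translated text is passed in by hand instead of coming from a translation service.

```python
import re

# Same pattern the script uses to pull the text out of a msgid line.
MSGID_PATTERN = re.compile(r'msgid "(.*)"')


def extract_msgid(line):
    """Return the quoted text of a single-line msgid entry, or None."""
    match = MSGID_PATTERN.match(line)
    return match.group(1) if match else None


def fill_msgstr(msgstr_line, translated):
    """Fill an empty msgstr line; leave already translated lines alone."""
    if msgstr_line == 'msgstr ""':
        # Escape double quotes so the .po entry stays well-formed.
        return 'msgstr "{}"'.format(translated.replace('"', '\\"'))
    return msgstr_line


source_text = extract_msgid('msgid "登录"')
filled = fill_msgstr('msgstr ""', "Log in")
print(source_text, filled)
```

Note that this only covers single-line entries, as the original script does; multi-line msgid blocks in a .po file would need extra handling.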

Python · One min read

A multi-process fix for the logging module: concurrent-log-handler 0.9.12

Error log

PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'E:\\logs\\contest\\contest.log' -> 'E:\\logs\\contest\\contest.log.1'

Fix

  • bash
pip install concurrent-log-handler
pip install pypiwin32
  • python
from logging import getLogger, DEBUG    
from concurrent_log_handler import ConcurrentRotatingFileHandler
import os

logger = getLogger()
# Use an absolute path to prevent file rotation trouble.
logfile = os.path.abspath("mylogfile.log")
# Rotate log after reaching 512K, keep 5 old copies.
rotateHandler = ConcurrentRotatingFileHandler(logfile, "a", 512*1024, 5)
logger.addHandler(rotateHandler)
logger.setLevel(DEBUG)

logger.info("Here is a very exciting log message, just for you")

  • If you produce a lot of log output, consider setting up several loggers
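
One way to read the "several loggers" advice is to give each concern its own named logger and its own file. The sketch below is only an illustration and deliberately uses the standard library's logging.FileHandler so that it runs without extra packages; in the multi-process setup described above you would substitute ConcurrentRotatingFileHandler. The names `build_logger`, `access` and `error` are made up for this example.

```python
import logging
import os
import tempfile

log_dir = tempfile.mkdtemp()


def build_logger(name):
    """Create a named logger that writes to its own file under log_dir."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # keep records out of the root logger's handlers
    path = os.path.join(log_dir, name + ".log")
    # Stand-in handler; swap in ConcurrentRotatingFileHandler for multi-process use.
    handler = logging.FileHandler(path, encoding="utf-8")
    handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
    return logger, path


access_logger, access_path = build_logger("access")
error_logger, error_path = build_logger("error")

access_logger.info("GET /index")
error_logger.error("something failed")

# Flush and close every handler so the files are complete on disk.
logging.shutdown()
```

Splitting output this way keeps each file smaller, so each one rotates less often.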

Other references

Python · One min read

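pip commands

Just run the following commands:

pip config set global.index-url http://pypi.douban.com/simple
pip config set global.trusted-host pypi.douban.com
pip config set global.disable-pip-version-check true
  • The third command turns off pip's version check

On some operating systems the settings may be written to the wrong location, hence the script below. If the configuration above already works for you, you can skip it.

Most tutorials online say that pip's configuration file lives under the user directory at %HOMEPATH%/.pip/pip. In practice it is unclear which folder pip config list reads from, while pip config set writes to %HOMEPATH%\AppData\Roaming\pip\pip.ini. Options such as --global and --user also change where the settings are written. This still needs investigating.

Script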

#!/usr/bin/python3    
# encoding: utf-8
# @Time : 2020/4/13 14:12
# @author : zza
# @Email : 740713651@qq.com
# @File : pip_douban_source.py
# https://python3-cookbook.readthedocs.io/zh_CN/latest/c13/p10_read_configuration_files.html
# https://www.jianshu.com/p/0cdd647bcc3e
"""
python -c "import requests;res = requests.get('http://cdn.ricequant.com/rqpro/pip_douban_source_v2.py');exec(res.text)"
"""

import os
import sys

from configparser import ConfigParser

conf_dir = os.path.join(os.path.expanduser("~"), ".pip")
os.makedirs(conf_dir, exist_ok=True)
WINDOWS = (sys.platform.startswith("win") or (sys.platform == 'cli' and os.name == 'nt'))
CONFIG_BASENAME = 'pip.ini' if WINDOWS else 'pip.conf'
conf_path = os.path.join(conf_dir, CONFIG_BASENAME)

cfg = ConfigParser()
cfg.read(conf_path, encoding="utf8")

if not cfg.has_section('global'):
    cfg.add_section('global')

cfg.set('global', 'index-url', 'http://pypi.douban.com/simple')
cfg.set('global', 'trusted-host', 'pypi.douban.com')
cfg.set('global', 'timeout', "60")
cfg.set('global', 'disable-pip-version-check', "true")  # turn off the version check

if not WINDOWS:
    if not cfg.has_section('install'):
        cfg.add_section('install')
    cfg.set("install", "use-mirrors", "true")
    cfg.set("install", "mirrors", "https://pypi.douban.com/simple/")
    cfg.set("install", "trusted-host", "pypi.douban.com")

with open(conf_path, "w", encoding="utf8") as f:
    cfg.write(f)

print("save to {}".format(conf_path))

Running it with a single command

  • I put this file on my company's CDN
  • It relies on Python's exec function, as follows:
python -c "import requests;res = requests.get('http://cdn.ricequant.com/rqpro/pip_douban_source_v2.py');exec(res.text)"

Python · 2 min read

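  • Ctrl+V does not paste, although Ctrl+C still copies
  • With text selected, Backspace does not delete; instead the selection shifts by one
  • Cause: PyCharm has Vim emulation mode enabled
  • Fix: turn it off under Tools - Vim Emulator
  • For a dictionary, the difference between variable.get(key1) and variable[key1]:
  • variable.get(key1) returns None when the key is missing (or the default passed as the second argument)
  • variable[key1] raises a KeyError when the key is missing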
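
A short sketch of the dictionary difference, using a made-up `inventory` dictionary:

```python
inventory = {"apple": 3}

# .get() never raises: it returns None, or the supplied default, for a missing key.
missing_default = inventory.get("pear")
missing_custom = inventory.get("pear", 0)
present = inventory.get("apple", 0)

# Indexing with [] raises KeyError for a missing key.
try:
    inventory["pear"]
    raised = False
except KeyError:
    raised = True

print(missing_default, missing_custom, present, raised)
```

Use `.get()` when a missing key is an expected case, and `[]` when a missing key indicates a bug that should surface immediately.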
Python · One min read

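When an app monitored by Prometheus is started with gunicorn, the worker processes do not share memory, so the collected metrics come out wrong.

References

Setup for gunicorn

  • Install prometheus_client: pip install prometheus_client

  • Copy this file into your project: vim monitoring.py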

#!/usr/bin/python3
# encoding: utf-8
# @Time : 2019/8/2 16:29
# @author : zza
# @Email : 740713651@qq.com
# @File : monitoring.py
"""
FROM https://github.com/ITISFoundation/osparc-simcore/blob/3e80ce451352c906f2876113dbb6ae33e8574be1/packages/service-library/src/servicelib/monitoring.py
"""
import time

from flask import request, current_app, Response
from prometheus_client import Counter, Histogram
from prometheus_client import multiprocess
from prometheus_client import generate_latest, CollectorRegistry, CONTENT_TYPE_LATEST, Gauge

# Example gauge.
IN_PROGRESS = Gauge("inprogress_requests", "help", multiprocess_mode='livesum')


# Expose metrics.
@IN_PROGRESS.track_inprogress()
def app(environ, start_response):
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)
    data = generate_latest(registry)
    status = '200 OK'
    response_headers = [
        ('Content-type', CONTENT_TYPE_LATEST),
        ('Content-Length', str(len(data)))
    ]
    start_response(status, response_headers)
    return iter([data])


def setup_monitoring(app, app_name=None):
    if app_name is None:
        app_name = app.name

    def start_timer():
        # Runs before every request: remember the start time and count it as in progress.
        request.start_time = time.time()
        current_app.extensions["prometheus"]['REQUEST_IN_PROGRESS'].labels(
            app_name, request.endpoint, request.method).inc()

    def record_request_data(response):
        # Runs after every request: record latency and update the counters.
        resp_time = time.time() - request.start_time
        endpoint = request.endpoint
        ext_prometheus = current_app.extensions["prometheus"]
        ext_prometheus['REQUEST_LATENCY'].labels(app_name, endpoint).observe(resp_time)
        ext_prometheus['REQUEST_IN_PROGRESS'].labels(app_name, endpoint, request.method).dec()
        ext_prometheus['REQUEST_COUNT'].labels(app_name, request.method, endpoint, response.status).inc()
        return response

    app.before_request(start_timer)
    app.after_request(record_request_data)

    extensions_prometheus = dict()
    extensions_prometheus['app_name'] = app_name
    extensions_prometheus['REQUEST_COUNT'] = Counter(
        'http_requests_total', 'Total Request Count',
        ['app_name', 'method', 'endpoint', 'http_status']
    )

    # Latency of a request in seconds
    extensions_prometheus['REQUEST_LATENCY'] = Histogram(
        'http_request_latency_seconds', 'Request latency',
        ['app_name', 'endpoint']
    )

    extensions_prometheus['REQUEST_IN_PROGRESS'] = Gauge(
        'http_requests_in_progress_total', 'Requests in progress',
        ['app_name', 'endpoint', 'method']
    )

    app.extensions["prometheus"] = extensions_prometheus

    @app.route("/metrics")
    def metrics():
        # Aggregate the per-process metric files into a single response.
        registry = CollectorRegistry()
        multiprocess.MultiProcessCollector(registry)
        data = generate_latest(registry)
        return Response(data, mimetype=CONTENT_TYPE_LATEST)

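  • Import the file in your code and initialize it
# from flask import Flask
# app = Flask(__name__)
from persistd.monitoring import setup_monitoring
setup_monitoring(app, "app_name")
  • Create the Gunicorn config file: vim gunicorn.conf.py
from prometheus_client import multiprocess


def child_exit(server, worker):
    multiprocess.mark_process_dead(worker.pid)
  • When starting Gunicorn, add -c gunicorn.conf.py so that it picks up the config file

  • Set the environment variable: a temporary folder is needed, and the environment variable prometheus_multiproc_dir must point to it (make sure the user that starts the service can read and write it). prometheus_client stores its data in this folder. Newer prometheus_client releases use the upper-case name PROMETHEUS_MULTIPROC_DIR.

rm -rf multiproc-tmp
mkdir multiproc-tmp
export prometheus_multiproc_dir=multiproc-tmp
gunicorn -c gunicorn.conf.py -w 4 yourapp:app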
Bonus: a monitoring.py for asyncio

#!/usr/bin/python3
# encoding: utf-8
# @Time : 2019/9/5 16:36
# @author : zza
# @File : monitoring.py
"""
FROM:
https://github.com/cloud-cds/cds-stack/blob/4243cd9b2e878f16a251d05afb2d202d71e41dce/api/monitoring.py
https://github.com/DD-DeCaF/gene-to-reactions/blob/3af42110433edf8495810e6a95a516368464e179/src/gene_to_reactions/app.py

setup_monitoring(app, "app_name")
"""
import time
from aiohttp import web
from prometheus_client import multiprocess, generate_latest
from prometheus_client import CONTENT_TYPE_LATEST, CollectorRegistry, Histogram, Counter, Gauge


def prom_middleware(app_name):
    # Old-style aiohttp middleware factory, written with async/await because
    # @asyncio.coroutine was removed in Python 3.11.
    async def factory(app, handler):
        async def middleware_handler(request):
            request['start_time'] = time.time()
            request.app['REQUEST_IN_PROGRESS'].labels(
                app_name, request.path, request.method).inc()
            response = await handler(request)
            resp_time = time.time() - request['start_time']
            request.app['REQUEST_LATENCY'].labels(app_name, request.path).observe(resp_time)
            request.app['REQUEST_IN_PROGRESS'].labels(app_name, request.path, request.method).dec()
            request.app['REQUEST_COUNT'].labels(
                app_name, request.method, request.path, response.status).inc()
            return response

        return middleware_handler

    return factory


async def metrics(request):
    # Aggregate the per-process metric files through a fresh registry.
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)
    resp = web.Response(body=generate_latest(registry))
    resp.content_type = CONTENT_TYPE_LATEST
    return resp


def setup_monitoring(app, app_name):
    app['REQUEST_COUNT'] = Counter(
        'requests_total', 'Total Request Count',
        ['app_name', 'method', 'endpoint', 'http_status']
    )
    app['REQUEST_LATENCY'] = Histogram(
        'request_latency_seconds', 'Request latency',
        ['app_name', 'endpoint']
    )

    app['REQUEST_IN_PROGRESS'] = Gauge(
        'requests_in_progress_total', 'Requests in progress',
        ['app_name', 'endpoint', 'method']
    )

    app.middlewares.insert(0, prom_middleware(app_name))
    app.router.add_get("/metrics", metrics)

Python · 2 min read