多进程编程
参考: 廖雪峰的Python多进程教程
示例
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from multiprocessing import Pool,cpu_count
def funcName(n):
print(str(n) + ':' + str(n*n)+'\n')
p = Pool(cpu_count()) # Pool的默认大小是CPU的核数
for i in range(10):
# 对Pool对象调用join()方法会等待所有子进程执行完毕,
# 调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
p.apply_async(funcName,args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
'''输出类似:
2:4
1:1
3:9
4:16
5:25
0:0
6:36
8:64
7:49
9:81
Waiting for all subprocesses done...
All subprocesses done.
'''
在 spyder中不能显示子进程的输出,参考:关于python 3.x:没有多处理打印输出(Spyder)
spyder设置: 运行,单文件配置,在系统终端中运行
emoji
emoji表情和对应短代码网址: emoji-cheat-sheet
在MySQL中保存emoji的问题:插入或更新前执行 SQL sql = 'SET NAMES utf8mb4;' # 为了保存emoji
打印异常信息
import traceback
try:
something_here()
except Exception as e:
#这个是输出错误类别的,如果捕捉的是通用错误,其实这个看不出来什么
print ('str(Exception):\t', str(Exception)) #输出 str(Exception): <type 'exceptions.Exception'>
#这个是输出错误的具体原因,这步可以不用加str,输出
print ('str(e):\t\t', str(e)) #输出 str(e): integer division or modulo by zero
print ('repr(e):\t', repr(e)) #输出 repr(e): ZeroDivisionError('integer division or modulo by zero',)
print ('traceback.print_exc():')
#以下两步都是输出错误的具体位置的
traceback.print_exc()
print ('traceback.format_exc():\n%s' % traceback.format_exc())
Python文件读写: f.write 写错误
gbk编码报错:
UnicodeEncodeError: 'gbk' codec can't encode character '\u2308' in position 33706: illegal multibyte sequence
改变目标文件的编码:
with open(filename, 'w',encoding='utf-8') as f:
f.write(content)
Python脚本在目录间相互调用
假设目录如下:
- dir
__init__.py
- dir1
- __init__.py
- file1.py
- func1()
- file2.py
- func2()
- dir2
- __init__.py
- file3.py
- func3()
- file4.py
- func4()
1.设置顶级目录下的 __init__.py
dir 目录下的 __init__.py
变量 __all__
指定该包下可以被导入的模块:
__all__ = ["dir1","dir2"]
2.各模块要包含顶级目录路径:
import sys
import os
parent_path = os.path.abspath('..')
if parent_path not in sys.path:
sys.path.append(parent_path )
3.各模块之间可以相互调用了
同目录间的调用, 比如在file1 中 要调用file2 的函数
import sys
import os
parent_path = os.path.abspath('..')
if parent_path not in sys.path:
sys.path.append(parent_path )
from dir1 import file2
file2.func2()
跨目录调用,比如file1要调用 func4
import sys
import os
parent_path = os.path.abspath('..')
if parent_path not in sys.path:
sys.path.append(parent_path )
from dir2 import file4
file4.func4()
python脚本批量检查更新pip安装包
pip的依赖检测方式有变, 如下更新方式对miniconda环境有破坏, 不建议使用. 思路共参考:
# -*- coding: utf-8 -*-
# pip_update_all.py
import os
def exeCmd(cmd):
r = os.popen(cmd)
text = r.read()
r.close()
return text
def get_pkg_list(text):
print(text)
text_list = text.split('\n')
if len(text_list) <4:
return []
pkg_list = []
for i in range(len(text_list)):
if i>1 and len(text_list[i])>5:
tempList = text_list[i].split(' ')
pkg_list.append(tempList[0])
return pkg_list
def update():
text = exeCmd('pip list --outdate')
pkg_list = get_pkg_list(text)
for pkg in pkg_list:
cmd = 'pip install --upgrade ' + pkg
print(cmd)
exeCmd(cmd)
if __name__=='__main__':
update()
pandas 读取excel数据
excel导出csv数据读取可能有问题, 比如损失小数位数等.
可以将xlsx格式excel的数据格式修改为文本, 导出为 xls, 用pandas读取为列表:
import csv
import pandas as pd
# 为了保证小数位数,需要设置单元格格式为文本, 可能处理不了小数位数损失的问题
# csv_file=csv.reader(open('data.csv','r',encoding='UTF-8'))
# 用pandas解决小数位数损失的问题
data = pd.read_excel('temp.xls')
content = data.values.tolist()
# 将列表中数据存入 csv文件
with open('results.csv', 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
for row in content:
writer.writerow(row)
从cnbc抓取10年期美债利率(需要代理)
# -*- coding: utf-8 -*-
# us10y.py
import requests
import json5
import datetime
import time
def get_us10y():
# 获取美债10y利率
# https://www.cnbc.com/quotes/US10Y
# 必须使用代理
headers = {
'authority': 'quote.cnbc.com',
'cache-control': 'max-age=0',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36',
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
}
params = (
('symbols', 'US10Y'),
('requestMethod', 'itv'),
('noform', '1'),
('partnerId', '2'),
('fund', '1'),
('exthrs', '1'),
('output', 'json'),
('events', '1'),
)
proxies = {
"http":"http://127.0.0.1:10809",
"https":"http://127.0.0.1:10809"
}
try:
# response = requests.get('https://quote.cnbc.com/quote-html-webservice/restQuote/symbolType/symbol', headers=headers, params=params)
response = requests.get('https://quote.cnbc.com/quote-html-webservice/restQuote/symbolType/symbol', headers=headers, params=params,proxies=proxies)
except Exception as e:
print(e)
return None
d = json5.loads(response.text)['FormattedQuoteResult']['FormattedQuote'][0]
# open, high,low,last,last_time
# ST是Standard Time(标准时间);DT是Daylight Time(夏令时间)。
# 返回的是美国东部标准时间 EST, 不需要处理夏令时
last_dt = datetime.datetime.strptime(d['last_time'][:19], "%Y-%m-%dT%H:%M:%S") + datetime.timedelta(hours=12)
us10y = [float(d['open'][:-1]),float(d['high'][:-1]),float(d['low'][:-1]),\
float(d['last'][:-1]),last_dt]
return us10y
if __name__=="__main__":
while True:
us10y = get_us10y()
if us10y is not None:
print('open,high,low,close,datetime:',us10y)
time.sleep(10)
Python 批量删除微博脚本
脚本的实现思路很简单: 循环抓取自己的微博页面,获取mid列表,然后根据mid列表删除对应的微博。
不要用多进程,会导致网络繁忙不能删除。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import requests
import time
import json
def get_r_from_weibo():
#### ====> 用以下在线工具获取的代码 https://curl.trillworks.com/
###
#### <==== 用以下在线工具获取的代码 https://curl.trillworks.com/
return response
def del_by_mid(mid):
#### ====> 用以下在线工具获取的代码 https://curl.trillworks.com/
###
#### <==== 用以下在线工具获取的代码 https://curl.trillworks.com/
try:
print('===> del:',mid)
response = requests.post('https://weibo.com/aj/mblog/del', headers=headers, params=params, data=data,timeout=2)
data = json.loads(response.text)
print(response.status_code)
# print(data)
if '繁忙' in data['msg']:
print(data['msg'])
return 0
else:
print('Done:',mid)
return 1
except Exception as e:
print(e)
def get_mid_list():
r = get_r_from_weibo()
text = r.text
begin =0
stop=False
mid_list = []
while stop==False:
begin = text.find('''mid=\\\"''',begin)
if begin==-1:
stop=True
end = text.find('''\\"''',begin+6)
if end==-1:
stop=True
if stop==False:
mid_list.append(text[begin+6:end])
begin += 6
return mid_list
def del_weibo():
# 删除所有微博
sum_delete = 0
while True:
try:
mid_list = get_mid_list()
# print(mid_list)
if len(mid_list)>0:
for mid in mid_list:
try:
sum_delete += del_by_mid(mid)
except Exception as e:
print(e)
print('delete sum:',sum_delete)
except Exception as e:
print(e)
if __name__=='__main__':
del_weibo()
执行效果:
简单的python文字识别ocr脚本
假设当前目录下需要识别的图片文件为 ocr.png, 直接运行该脚本. 识别效果还可以:
"""
参考: https://www.cjavapy.com/article/807/
从
https://github.com/UB-Mannheim/tesseract/wiki
下载 tesseract-ocr-w64-setup-v5.0.0-alpha.20201127.exe (64 bit) resp, 安装到
C:\Tesseract-OCR
注意:安装过程中下载语言文件时可能需要同时打开代理
"""
import pytesseract # pip install pytesseract-ocr
from PIL import Image
pytesseract.pytesseract.tesseract_cmd = 'C:\\Tesseract-OCR\\tesseract.exe'
text = pytesseract.image_to_string(Image.open("ocr.png"), lang='chi_sim')
print(text)