积累--Python常用工具函数
作者:互联网
1. 随机生成6位验证码
import random
def get_verify_code(n=6, alpha=False):
    """Generate a random verification code.

    Args:
        n: Number of characters in the code. Zero or a negative value
            yields an empty string.
        alpha: When true, every character is drawn from one of three
            classes (digit, upper-case letter, lower-case letter), each
            class being equally likely. When false, only digits are used.

    Returns:
        The generated code as a string of length ``n``.
    """
    # Verification codes are security-sensitive, so draw from the OS CSPRNG
    # via ``secrets`` instead of the predictable Mersenne Twister in ``random``.
    import secrets
    import string

    pools = [string.digits]
    if alpha:
        pools += [string.ascii_uppercase, string.ascii_lowercase]
    # Pick a character class first, then a character inside it, so that the
    # digit/upper/lower split stays even regardless of class size.
    return ''.join(secrets.choice(secrets.choice(pools)) for _ in range(n))
2. 获取上个月最后一天的日期
# 1
import datetime
def last_day_of_month(any_day):
    """Return the last day of the month that contains ``any_day``.

    Args:
        any_day: A ``datetime.date`` or ``datetime.datetime``.

    Returns:
        An object of the same type as ``any_day`` with the day replaced by
        the last day of its month; any time-of-day fields are kept.
    """
    import calendar

    # monthrange() yields (weekday_of_first_day, days_in_month). Using it
    # avoids stepping into the following month, which overflows the date
    # range for December 9999.
    days_in_month = calendar.monthrange(any_day.year, any_day.month)[1]
    return any_day.replace(day=days_in_month)
# Demo: show the last day of the current month (print is a function in Python 3).
print(last_day_of_month(datetime.date.today()))
# 2
import calendar # 导入库
import datetime
# Take one snapshot of "now" so the year and month used below agree.
date = datetime.datetime.now()
print(date.year, date.month)
# Step back one day from the first of the current month to land in the
# previous month. Subtracting 1 from the month number would give month 0
# (invalid) in January and would also leave the year unchanged.
previous_month_end = date.replace(day=1) - datetime.timedelta(days=1)
# monthrange() returns (weekday_of_first_day, days_in_month); the second item
# is the number of the last day of that month.
lastDay = calendar.monthrange(previous_month_end.year, previous_month_end.month)[1]
print(lastDay)
3. 代理IP获取
import requests
import time
# Request headers shared by the HTTP calls in this section (the proxy check
# and the proxy-list API call); they present a desktop Chrome User-Agent.
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36'
}
def ip_text(li, valid_IP):
    """Check each candidate proxy and collect the ones that work.

    Every entry of ``li`` is probed by fetching a fixed test URL through the
    proxy. Progress and results are printed.

    Args:
        li: Iterable of dicts providing the keys ``'ip'``, ``'port'`` and
            ``'protocol'``.
        valid_IP: List that is extended in place with
            ``'<protocol>://<ip>:<port>'`` for every proxy that answered
            with HTTP 200.

    Returns:
        None. Malformed input aborts the run with a printed message instead
        of raising.
    """
    url = "https://www.baidu.com//"
    try:
        for info in li:
            ip = info['ip']
            print(f'开始校验IP地址:{ip}')
            try:
                port = info['port']
                protocol = info['protocol']
                proxy = f'{protocol}://{ip}:{port}'
                # requests selects the proxy by the scheme of the *target*
                # URL, so register the proxy for both schemes; keying it only
                # by the proxy's own protocol would let an "http" proxy be
                # bypassed for this https test URL and always look valid.
                rep = requests.get(
                    url,
                    proxies={'http': proxy, 'https': proxy},
                    headers=headers,
                    timeout=0.5,
                )
            except (KeyError, requests.RequestException):
                # Incomplete entry, or the proxy refused / timed out.
                print("该代理IP无效:" + ip)
                continue
            if rep.status_code == 200:  # a 200 response means the proxy is usable
                valid_IP.append(proxy)
                print(f"该代理IP有效:{ip}, 响应时间为:{rep.elapsed.total_seconds()}")
            else:
                print("该代理IP无效:" + ip)
    except (KeyError, TypeError):
        # ``li`` is not iterable or an entry lacks a usable 'ip' value.
        print("IP测试失败")
def get_proxy(max_pages=1):
    """Fetch candidate proxies from the public list API and validate them.

    Pages are requested one after another, starting at page 1, and every
    page of candidates is passed to ``ip_text``. The working proxies are
    printed and returned.

    Args:
        max_pages: Maximum number of pages to fetch. The default of 1
            matches the previous behaviour, where the loop always stopped
            after the first page. Pass ``None`` to keep going until the API
            returns an empty page.

    Returns:
        List of ``'<protocol>://<ip>:<port>'`` strings that passed the check.
    """
    url = 'https://ip.jiangxianli.com/api/proxy_ips'
    params = {
        'page': 1
    }
    validIp = []
    fetched = 0
    while max_pages is None or fetched < max_pages:
        if fetched:
            # Pause between pages only, so a single-page run does not wait.
            time.sleep(2)
        # The timeout keeps an unresponsive API from hanging the script.
        data = requests.get(url=url, params=params, headers=headers, timeout=10).json()
        info = data['data']['data']
        if not info:
            break
        ip_text(info, validIp)
        params['page'] += 1
        fetched += 1
    print(validIp)
    return validIp


if __name__ == '__main__':
    get_proxy()
4. Redis延迟队列的实现
class RedisDelayQueue(object):
    """Simple delay queue stored in a Redis sorted set.

    Each item is JSON-encoded and stored as a member whose score is the Unix
    timestamp (in whole seconds) at which it becomes due.

    Example:
        dq = RedisDelayQueue('delay:commtrans')
        dq.put( 5 ,{'info':'测试 5555','time': timestamp_to_datetime_str(t + 5)})
        print(dq.get())
    """

    def __init__(self, name, namespace='queue'):
        """Bind the queue to the Redis key ``'<namespace>:<name>'``.

        The connection comes from ``get_redis_engine(database_name='spdb')``;
        that helper is defined outside this snippet, so the actual
        host/port/db are whatever it configures.
        """
        self.__db = get_redis_engine(database_name='spdb')
        self.key = '%s:%s' % (namespace, name)

    def qsize(self):
        """Return the number of items in the queue, due or not."""
        return self.__db.zcard(self.key)

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return self.qsize() == 0

    def rem(self, value):
        """Remove the raw (JSON-encoded) member ``value``; return the zrem result."""
        return self.__db.zrem(self.key, value)

    def get(self):
        """Pop one due item.

        Returns:
            The decoded item, or ``None`` when nothing is due yet or when
            the candidate could not be removed (for example because another
            consumer removed it first).
        """
        # Ask for at most one member whose score lies between 0 and "now".
        due = self.__db.zrangebyscore(self.key, 0, int(time.time()), start=0, num=1)
        if not due:
            return None
        candidate = due[0]
        # Only the consumer whose removal succeeds gets the item, so two
        # concurrent readers cannot both process it.
        if self.rem(candidate):
            return json.loads(candidate)
        return None

    def put(self, interval, item):
        """Put ``item`` into the queue, due ``interval`` seconds from now.

        Args:
            interval: Delay in seconds (converted with ``int``).
            item: JSON-serialisable payload.

        Returns:
            The result of the underlying ``zadd`` call.
        """
        # The due timestamp is the score, so the set is ordered by due time.
        payload = json.dumps(item)
        return self.__db.zadd(self.key, {payload: int(time.time()) + int(interval)})
5. 后端代码sh文件部署
#!/bin/sh
# Deploy the backend: update the checkout, then restart the service.

# Abort if the project directory is missing; otherwise git would run in
# whatever directory the script was started from.
cd /root/www/assist_api_new || exit 1
echo "进行 git pull 更新,时间可能会比较长,请稍等一会,不要退出或者停止当前脚本"
# Do not restart the service on top of a failed or half-finished update.
git pull || exit 1
echo "restart api engine "
supervisorctl restart assist_api_new
6. 前端Vue代码sh文件部署
#!/bin/sh
# Deploy the Vue frontend: update the checkout, build it, then publish ./dist.

# Abort if the source directory is missing; otherwise the build would run
# in whatever directory the script was started from.
cd /root/sources/assist/front_new/ || exit 1
echo "进行 git pull 更新,时间可能会比较长,请稍等一会,不要退出或者停止当前脚本"
git pull || exit 1
echo 'front_new ====================> npm build'
# Build output is appended to service_deploy.log; stop on a failed build.
/usr/local/node/bin/npm run build >> service_deploy.log || exit 1
# Never wipe the live directory unless there is a build result to publish.
[ -d ./dist ] || exit 1
# The leading backslash bypasses any "cp" alias (such as "cp -i").
rm -rf /root/www/assist_front_new/* && \cp -rf ./dist/* /root/www/assist_front_new/
7. 自动安装对应Chrome版本的Webdriver
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# ChromeDriverManager().install() returns the path of the driver binary it
# installed. Selenium 4 expects that path wrapped in a Service object;
# passing it positionally to webdriver.Chrome is deprecated there and no
# longer accepted by newer 4.x releases.
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
8. SQLAlchemy 实现分页器
from math import ceil
class Pagination:
    """Paginate a query object.

    ``query`` must provide ``count()``, ``limit(n)``, ``offset(n)`` and
    ``all()`` (the calls made below), as a SQLAlchemy query does.
    """

    def __init__(self, query, per_page: int = 5, page: int = 1):
        """
        Store the pagination parameters.

        :param query: query object to paginate
        :param per_page: number of rows per page
        :param page: current page number, starting at 1
        """
        self.query = query
        self.per_page = per_page
        self.page = page

    @property
    def items(self):
        """
        Rows of the current page.

        :return: list of rows; empty when ``page`` is outside ``1..pages``
        """
        # A page below 1 would produce a negative OFFSET, so treat it like a
        # page past the end.
        if self.page < 1 or self.page > self.pages:
            return []
        offset_num = (self.page - 1) * self.per_page
        return self.query.limit(self.per_page).offset(offset_num).all()

    @property
    def counts(self):
        """
        Total number of rows (calls ``query.count()`` on every access).

        :return: int
        """
        return self.query.count()

    @property
    def pages(self):
        """
        Total number of pages.

        :return: int
        """
        return ceil(self.counts / self.per_page)

    @property
    def next_num(self):
        """Number of the next page, or None when on or past the last page."""
        next_num = self.page + 1
        if self.pages < next_num:
            return None
        return next_num

    @property
    def prev_num(self):
        """Number of the previous page, or None when on the first page."""
        prev_num = self.page - 1
        if prev_num < 1:
            return None
        return prev_num

    def iter_pages(self, left=2, right=2):
        """
        Page numbers to display around the current page.

        The window holds ``left + right + 1`` consecutive page numbers. It
        is centred on the current page where possible and shifted to stay
        inside ``1..pages``; with fewer pages than that, all are returned.

        :param left: pages to show before the current page
        :param right: pages to show after the current page
        :return: range of page numbers (empty when there are no pages)
        """
        # Read the page count once: each access issues a count query.
        total = self.pages
        length = left + right + 1
        if total <= length:
            return range(1, total + 1)
        # Clamp the window start so the window never begins before page 1
        # and never runs past the last page; this also covers a current page
        # that lies beyond the last page.
        start = max(1, min(self.page - left, total - length + 1))
        return range(start, start + length)
if __name__ == '__main__':
    class _ListQuery:
        """In-memory stand-in offering the query methods Pagination calls.

        A plain list cannot be used as the query: Pagination calls
        ``query.count()`` with no argument, which ``list.count`` rejects.
        """

        def __init__(self, rows, limit=None, offset=0):
            self._rows = rows
            self._limit = limit
            self._offset = offset

        def count(self):
            return len(self._rows)

        def limit(self, n):
            return _ListQuery(self._rows, n, self._offset)

        def offset(self, n):
            return _ListQuery(self._rows, self._limit, n)

        def all(self):
            end = None if self._limit is None else self._offset + self._limit
            return self._rows[self._offset:end]

    per_page = 1
    pages = 4
    paginate = Pagination(query=_ListQuery([1, 2, 3, 4, 5, 6, 7, 8]), per_page=per_page, page=pages)
    ret = paginate.iter_pages()  # page numbers to render in the front end
    print(ret)
    # paginate.items     # rows of the current page
    # paginate.pages     # total number of pages
    # paginate.page      # current page number, starting at 1
    # paginate.per_page  # rows per page
    # paginate.prev_num  # previous page number
    # paginate.next_num  # next page number
9. 企业微信机器人
from copy import copy
import os
import requests
import json
import time
import datetime
from urllib3 import encode_multipart_formdata
class Robot:
    """Small client for a WeCom (企业微信) group-robot webhook."""

    def __init__(self, wx_url, key):
        """
        :param wx_url: webhook URL that messages are posted to
        :param key: webhook key, used to build the media upload URL
        """
        self.wx_url = wx_url
        self.key = key
        self.wx_upload_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={key}&type=file'

    @staticmethod
    def _as_text(content):
        """Return ``content`` as ``str``, decoding bytes-like input as UTF-8."""
        if isinstance(content, (bytes, bytearray)):
            return str(content, encoding='utf-8')
        return str(content)

    def upload_file(self, file_path):
        """Upload a file through the upload URL and return its ``media_id``.

        :param file_path: path of the file to upload
        :return: the ``media_id`` field of the JSON response
        :raises KeyError: when the response carries no ``media_id``
        """
        # basename() copes with the platform's path separator, unlike a
        # manual split on "/".
        file_name = os.path.basename(file_path)
        with open(file_path, 'rb') as f:
            data = f.read()
        fields = {
            "filename": file_name,
            "filelength": len(data),
            "file": (file_name, data),
        }
        # encode_multipart_formdata() returns (body, content_type); the
        # content type carries the multipart boundary and must be sent as-is.
        body, content_type = encode_multipart_formdata(fields)
        r = requests.post(self.wx_upload_url, data=body, headers={"Content-Type": content_type})
        print(r.text)
        media_id = r.json()['media_id']
        return media_id

    def qi_ye_wei_xin_file(self, media_id):
        """Send a file message referring to a previously uploaded ``media_id``."""
        # The body is JSON, so label it as such instead of text/plain.
        headers = {"Content-Type": "application/json"}
        data = {
            "msgtype": "file",
            "file": {
                "media_id": media_id
            }
        }
        r = requests.post(
            url=self.wx_url,
            headers=headers, json=data)
        print(r.text)

    def qi_ye_wei_xin_text(self, content):
        """Send a text message.

        :param content: message text, as ``str`` or UTF-8 encoded bytes
        """
        content = self._as_text(content)
        headers = {"Content-Type": "application/json"}
        data = {
            "msgtype": 'text',
            "text": {
                "content": content,
                "mentioned_mobile_list": ''
            },
        }
        # ensure_ascii=False keeps non-ASCII characters literal, so the body
        # has to be sent as UTF-8 bytes; a str body would be encoded as
        # Latin-1 by the HTTP layer and fail on Chinese text.
        payload = json.dumps(data, ensure_ascii=False).encode('utf-8')
        r = requests.post(self.wx_url, headers=headers, data=payload)
        print(r.text)
if __name__ == '__main__':
    # The webhook key is a credential: anyone holding it can post to the
    # group, so read it from the environment instead of hard-coding it.
    key = os.environ['WECOM_ROBOT_KEY']
    url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={key}'
    robot = Robot(url, key)
10. sha256withRSA验签
from base64 import decodebytes, encodebytes
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import MD5, SHA1, SHA256
import logging
import json
# Logger used by verify() to record the outcome of each signature check.
sha_rsa_log = logging.getLogger('sha_rsa_log')
# Sample signed payload (a JSON document as text) used by the demo call below.
str2 = "{\"测试字段\":\"test\"}"
# Sample signature for the demo call, as base64 text.
str1 = "NpA0IaNbGXe4BlTX4VPfGqBPlJYRkXHGR6D7MWYdlOfC5UI35KGfBW6eSVVh619nW0UFAsAz9XDYGRjPFFctTlbb+yuNkQOLFqcjlUI9X/AOSyNWhNXRUeByPkNbzixtXPPlAg6mr4DBGVb42H6QzhCb2lfS7U3xchk1TL1r15A="
# Base64 text of the public key used by the demo call (no PEM header/footer).
publicKey = """MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDr+PnAY5TwEDl4cCay3J/IPH74
rlUpn0zSB7yFfx1lvo21wEr5F8psnv6eDfthbGVbGhmDcW7HvCja9hEQ+0t4n6vf
mzOhFMOBCkXFeWH5GjCDgLqIUQn3YXUHGsG0A11sO0L1cGVIyIJixZThHrB8XKvR
Be6mAJHgwuv1P8ZwVwIDAQAB"""
# A second public key in the same format; not referenced in this snippet
# (presumably for a test environment — confirm before use).
PUBLICKEY_TEST = """MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDYVSjr6IU8u7W6PuRUb05ONZJj
sJ1lNI9ofcujYehjAO4mOOrWBIMZkadhej0HdtErj3b3c11HHbshPiXcbKoHbfeF
qvsLCo7Cg6ZjhMsY1/nkR1+x+LZU2MBygW9h/hnKK8hGluIAGiJVbiQGuFxa9V48
z74KTXFbZhGkfAxl+QIDAQAB"""
def verify(data, signature, public_key):
    """Verify a SHA-256 with RSA (PKCS#1 v1.5) signature.

    Args:
        data: The signed text; it is encoded with ``str.encode()`` before
            hashing.
        signature: The signature as base64 text.
        public_key: The public key as base64 text, without PEM
            header/footer lines.

    Returns:
        True when the signature matches ``data``; False otherwise,
        including when ``signature`` is not valid base64.
    """
    key = RSA.importKey(decodebytes(public_key.encode()))
    digest = SHA256.new(data.encode())
    try:
        raw_signature = decodebytes(signature.encode())
    except ValueError:
        # binascii.Error (a ValueError subclass) signals malformed base64;
        # a signature that cannot be decoded cannot be authentic.
        sha_rsa_log.info('The signature is not authentic.')
        return False
    if PKCS1_v1_5.new(key).verify(digest, raw_signature):
        sha_rsa_log.info('The signature is authentic.')
        return True
    sha_rsa_log.info('The signature is not authentic.')
    return False


if __name__ == '__main__':
    # Print the result here so that verify() itself writes nothing to stdout.
    print(verify(str2, str1, publicKey))
标签:__,return,--,data,self,Python,import,常用工具,def 来源: https://www.cnblogs.com/zhengjiahui/p/16131136.html