编程语言
首页 > 编程语言 > 积累--Python常用工具函数

积累--Python常用工具函数

作者:互联网

1. 随机生成6位验证码

import random
def get_verify_code(n=6, alpha=False):
    """Generate a random verification code.

    Args:
        n: Number of characters in the code.
        alpha: When true, each position is a digit, an uppercase ASCII letter
            or a lowercase ASCII letter (the three categories are equally
            likely); otherwise only digits are used.

    Returns:
        The generated code as a string of length ``n`` (empty when ``n <= 0``).
    """
    # Verification codes are security-sensitive, so draw from the operating
    # system's CSPRNG instead of the predictable generator behind ``random``.
    import secrets
    import string

    def pick():
        if not alpha:
            return secrets.choice(string.digits)
        # Choose the category first so digits, uppercase and lowercase each
        # keep a one-in-three chance, then choose a character within it.
        category = secrets.choice(
            (string.digits, string.ascii_uppercase, string.ascii_lowercase)
        )
        return secrets.choice(category)

    return ''.join(pick() for _ in range(n))

2. 获取上个月最后一天的日期

# 1
import datetime
def last_day_of_month(any_day):
    """Return the last day of the month that ``any_day`` falls in.

    Args:
        any_day: A ``datetime.date`` or ``datetime.datetime`` value.

    Returns:
        A value of the same type as ``any_day``, moved to the final day of
        its month.
    """
    # Day 28 exists in every month, and 28 + 4 days always lands in the
    # following month because no month has more than 31 days.
    next_month = any_day.replace(day=28) + datetime.timedelta(days=4)
    # Stepping back by that date's day-of-month gives the last day of the
    # original month.
    return next_month - datetime.timedelta(days=next_month.day)


# print is a function on Python 3; the statement form is a SyntaxError there.
print(last_day_of_month(datetime.date.today()))

# 2
import calendar   # 导入库
import datetime

date = datetime.datetime.now()
print(date.year, date.month)
# Step back one month, wrapping January to December of the previous year:
# calendar.monthrange rejects month 0, which is what `date.month - 1` yields
# in January.
if date.month == 1:
    prev_year, prev_month = date.year - 1, 12
else:
    prev_year, prev_month = date.year, date.month - 1
# monthrange returns (weekday of the 1st, number of days in the month); the
# day count is also the day-of-month of the month's last day.
lastDay = calendar.monthrange(prev_year, prev_month)[1]
print(lastDay)

3. 代理IP获取

import requests
import time

# Request headers passed to the requests.get calls in this snippet; the
# User-Agent value is a desktop Chrome browser string.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36'
}


def ip_text(li, valid_IP):
    """Probe each proxy in ``li`` and collect the ones that respond.

    Every entry is used as the proxy for a GET request to Baidu; proxies that
    answer with HTTP 200 are appended to ``valid_IP`` as
    ``"<protocol>://<ip>:<port>"``. Progress is reported with ``print``.

    Args:
        li: Iterable of dicts with ``'ip'``, ``'port'`` and ``'protocol'`` keys.
        valid_IP: List extended in place with the working proxy URLs.

    Returns:
        None. Failures are reported on stdout and never raised.
    """
    url = "https://www.baidu.com//"
    try:
        for info in li:
            ip = info['ip']
            print(f'开始校验IP地址:{ip}')
            try:
                port = info['port']
                protocol = info['protocol']
                proxy_url = f'{protocol}://{ip}:{port}'
                # requests picks the proxy by the scheme of the *target* URL.
                # Registering the proxy for both schemes guarantees the probe
                # really goes through it; keyed only by the proxy's own
                # protocol, an "http" proxy would be skipped for this https
                # URL and a direct connection would make it look valid.
                rep = requests.get(
                    url,
                    proxies={'http': proxy_url, 'https': proxy_url},
                    headers=headers,
                    timeout=0.5,
                )
            except Exception:
                # Best-effort check: a malformed entry or any request failure
                # just marks this proxy invalid. Unlike a bare ``except``,
                # Ctrl-C and SystemExit still propagate.
                print("该代理IP无效:" + ip)
                continue
            if rep.status_code == 200:  # a 200 response means the proxy works
                valid_IP.append(proxy_url)
                print(f"该代理IP有效:{ip}, 响应时间为:{rep.elapsed.total_seconds()}")
            else:
                print("该代理IP无效:" + ip)
    except Exception:
        # Reached when ``li`` is not iterable or an entry has no usable 'ip'.
        print("IP测试失败")


def get_proxy(max_pages=1):
    """Download proxy candidates, validate them and print the working ones.

    Args:
        max_pages: Maximum number of API result pages to fetch and validate.
            The default of 1 matches the previous behaviour, where the loop
            always stopped after the first page.

    Returns:
        The list of validated proxy URLs (also printed to stdout).
    """
    url = 'https://ip.jiangxianli.com/api/proxy_ips'
    params = {
        'page': 1
    }
    validIp = []
    for page in range(1, max_pages + 1):
        params['page'] = page
        # A timeout keeps a stalled API call from hanging the script forever.
        data = requests.get(url=url, params=params, headers=headers, timeout=10).json()
        info = data['data']['data']
        if not info:
            # An empty page means there is nothing further to fetch.
            break

        ip_text(info, validIp)

        if page < max_pages:
            # Pause between page requests; pointless after the final page.
            time.sleep(2)

    print(validIp)
    return validIp


# Script entry point: fetch and validate proxies when the file is run directly.
if __name__ == '__main__':
    get_proxy()

4. Redis延迟队列的实现

class RedisDelayQueue(object):
    """Delay queue kept in a Redis sorted set.

    Members are JSON-serialised items; each score is the Unix timestamp at
    which the item becomes available.

    Example::

        dq = RedisDelayQueue('delay:commtrans')
        dq.put(5, {'info': 'test 5555', 'time': timestamp_to_datetime_str(t + 5)})

        print(dq.get())
    """

    def __init__(self, name, namespace='queue'):
        """Bind the queue to the key ``"<namespace>:<name>"``.

        The connection comes from ``get_redis_engine(database_name='spdb')``.
        """
        self.__db = get_redis_engine(database_name='spdb')
        self.key = f'{namespace}:{name}'

    def qsize(self):
        """Return the number of members stored under the queue key."""
        size = self.__db.zcard(self.key)
        return size

    def empty(self):
        """Return True when the queue holds no members, False otherwise."""
        size = self.qsize()
        return size == 0

    def rem(self, value):
        """Remove ``value`` from the queue and return the backend's result."""
        removed = self.__db.zrem(self.key, value)
        return removed

    def get(self):
        """Pop one due item, or return None when nothing could be taken.

        Returns:
            The decoded item, or None if no member is due yet or the due
            member could not be removed by this caller.
        """
        # Ask for at most one member whose score lies between 0 and now.
        now = int(time.time())
        due = self.__db.zrangebyscore(self.key, 0, now, 0, 1)
        if not due:
            return None
        candidate = due[0]
        # Concurrency guard: only the caller whose removal succeeds takes it.
        if not self.rem(candidate):
            return None
        return json.loads(candidate)

    def put(self, interval, item):
        """Schedule ``item`` to become available after ``interval`` seconds.

        :param interval: delay in seconds
        :param item: JSON-serialisable payload
        """
        payload = json.dumps(item)
        # The due timestamp is the score, so members sort by availability.
        due_at = int(time.time()) + int(interval)
        return self.__db.zadd(self.key, {payload: due_at})

5. 后端代码sh文件部署

#!/bin/sh
# Deploy the backend API: update the checkout, then restart the service.

# Stop if the project directory is missing so git never runs somewhere else.
cd /root/www/assist_api_new || exit 1
echo "进行 git pull 更新,时间可能会比较长,请稍等一会,不要退出或者停止当前脚本"
# Do not restart the service on top of a failed or partial update.
git pull || exit 1

echo "restart api engine "
supervisorctl restart assist_api_new

6. 前端Vue代码sh文件部署

#!/bin/sh
# Deploy the Vue front end: update the checkout, build it, then publish dist/.

# Stop if the source directory is missing so nothing runs in the wrong place.
cd /root/sources/assist/front_new/ || exit 1
echo "进行 git pull 更新,时间可能会比较长,请稍等一会,不要退出或者停止当前脚本"
# Do not build and publish from a checkout that failed to update.
git pull || exit 1


echo 'front_new ====================> npm build'
# The && chain publishes only after a successful build; the published
# directory is emptied first. The leading backslash on cp suppresses alias
# expansion.
/usr/local/node/bin/npm run build >> service_deploy.log &&
rm -rf /root/www/assist_front_new/* && \cp -rf ./dist/* /root/www/assist_front_new/

7. 自动安装对应Chrome版本的Webdriver

from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager

# Create a Chrome WebDriver, passing in whatever ChromeDriverManager().install()
# returns (presumably the path of a chromedriver it downloaded for the local
# Chrome — see the webdriver_manager docs).
# NOTE(review): recent Selenium 4 releases expect the driver path wrapped in a
# Service object rather than passed positionally — confirm against the
# installed Selenium version.
driver = webdriver.Chrome(ChromeDriverManager().install())

8. SQLAlchemy 实现分页器

from math import ceil

class Pagination:
    """Paginate a query object or an in-memory list/tuple.

    A query object must provide ``count()`` and
    ``limit(n).offset(m).all()``. Plain lists and tuples are paged with
    ``len`` and slicing.
    """

    def __init__(self, query, per_page: int = 5, page: int = 1):
        """
        Store the pagination parameters.

        :param query: query object, or a list/tuple of rows
        :param per_page: number of rows per page (must be at least 1)
        :param page: current page number, starting at 1
        :raises ValueError: if ``per_page`` is smaller than 1
        """
        if per_page < 1:
            # Fail early instead of dividing by zero in ``pages`` later.
            raise ValueError('per_page must be a positive integer')
        self.query = query
        self.per_page = per_page
        self.page = page

    def _is_sequence(self) -> bool:
        """Return True when ``query`` is an in-memory list or tuple."""
        return isinstance(self.query, (list, tuple))

    @property
    def items(self):
        """
        Rows of the current page.

        :return: list of rows; empty when ``page`` is outside ``1..pages``
        """
        if self.page < 1 or self.page > self.pages:
            return []
        offset_num = (self.page - 1) * self.per_page
        if self._is_sequence():
            return list(self.query[offset_num:offset_num + self.per_page])
        return self.query.limit(self.per_page).offset(offset_num).all()

    @property
    def counts(self):
        """
        Total number of rows.

        :return: int
        """
        if self._is_sequence():
            # list.count() needs an argument, so sequences are measured with len.
            return len(self.query)
        return self.query.count()

    @property
    def pages(self):
        """
        Total number of pages.

        :return: int
        """
        return ceil(self.counts / self.per_page)

    @property
    def next_num(self):
        """Number of the next page, or None when on (or past) the last page."""
        next_num = self.page + 1
        return next_num if next_num <= self.pages else None

    @property
    def prev_num(self):
        """Number of the previous page, or None when on the first page."""
        prev_num = self.page - 1
        return prev_num if prev_num >= 1 else None

    def iter_pages(self, left=2, right=2):
        """Return the page numbers to show in a pager widget.

        The window holds ``left + right + 1`` consecutive pages around the
        current page. It is shifted to stay inside ``1..pages``, so it always
        contains the current page when that page exists.

        :param left: pages to show before the current page
        :param right: pages to show after the current page
        :return: ``range`` of page numbers (empty when there are no pages)
        """
        total = self.pages  # read once: each access re-counts the rows
        length = left + right + 1
        if total <= length:
            # Too few pages to need a sliding window: show them all.
            return range(1, total + 1)
        # Clamp the window start so the window never begins before page 1 or
        # runs past the last page. This also covers a ``page`` beyond the end.
        start = max(1, min(self.page - left, total - length + 1))
        return range(start, start + length)

# Demo: build a paginator and print the page numbers a pager would render.
# NOTE(review): `query` is a plain list here rather than a query object —
# confirm that Pagination supports sequences before relying on this demo.
if __name__ == '__main__':
    per_page = 1
    pages = 4  # passed as `page` below, i.e. the current page number
    paginate = Pagination(query=[1, 2, 3, 4, 5, 6, 7, 8], per_page=per_page, page=pages)
    ret = paginate.iter_pages() # page numbers to display in the front end
    print(ret)
    # paginate.items # rows of the current page, e.g. []
    # paginate.pages # total number of pages
    # paginate.page # current page number, starting at 1
    # paginate.per_page # rows per page
    # paginate.prev_num # previous page number
    # paginate.next_num # next page number

9. 企业微信机器人

from copy import copy
import os
import requests
import json
import time
import datetime

from urllib3 import encode_multipart_formdata


class Robot:
    """Client for a WeCom (企业微信) group-robot webhook.

    Sends text and file messages to the webhook URL and uploads files to the
    matching ``upload_media`` endpoint.
    """

    def __init__(self, wx_url, key):
        """Store the webhook URL and key and derive the upload URL.

        Args:
            wx_url: Full webhook "send" URL that messages are posted to.
            key: Webhook key, used to build the media upload URL.
        """
        self.wx_url = wx_url
        self.key = key
        self.wx_upload_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={key}&type=file'

    @staticmethod
    def _to_text(content):
        """Return ``content`` as ``str``, decoding bytes as UTF-8."""
        if isinstance(content, (bytes, bytearray)):
            return str(content, encoding='utf-8')
        # ``str(obj, encoding=...)`` raises TypeError for non-bytes input, so
        # text and other objects take the plain conversion.
        return str(content)

    def upload_file(self, file_path):
        """Upload a file and return the ``media_id`` from the response.

        Args:
            file_path: Path of the file to upload.

        Returns:
            The ``media_id`` value of the JSON response.

        Raises:
            OSError: If the file cannot be read.
            KeyError: If the response JSON carries no ``media_id``.
        """
        file_name = os.path.basename(file_path)
        with open(file_path, 'rb') as f:
            data = f.read()
        file_data = {
            "filename": file_name,
            "filelength": len(data),
            "file": (file_name, data),
        }
        # encode_multipart_formdata returns (body bytes, Content-Type header
        # value including the multipart boundary).
        body, content_type = encode_multipart_formdata(file_data)
        r = requests.post(self.wx_upload_url, data=body, headers={"Content-Type": content_type})
        print(r.text)
        media_id = r.json()['media_id']
        return media_id

    def qi_ye_wei_xin_file(self, media_id):
        """Send a file message that references an uploaded ``media_id``."""
        data = {
            "msgtype": "file",
            "file": {
                "media_id": media_id
            }
        }
        # ``json=`` serialises the payload and labels it application/json;
        # an explicit text/plain header would mislabel the JSON body.
        r = requests.post(url=self.wx_url, json=data)
        print(r.text)

    def qi_ye_wei_xin_text(self, content, mentioned_mobile_list=None):
        """Send a text message.

        Args:
            content: Message text as ``str`` or UTF-8 encoded ``bytes``.
            mentioned_mobile_list: Optional iterable of mobile numbers to
                mention. The field is omitted when nothing is given.
        """
        text = {"content": self._to_text(content)}
        if mentioned_mobile_list:
            text["mentioned_mobile_list"] = list(mentioned_mobile_list)
        data = {
            "msgtype": 'text',
            "text": text,
        }
        # Encode explicitly: a ``str`` body is encoded as Latin-1 by the HTTP
        # layer, which fails for non-Latin-1 text such as Chinese.
        body = json.dumps(data, ensure_ascii=False).encode('utf-8')
        headers = {"Content-Type": "application/json"}
        r = requests.post(self.wx_url, headers=headers, data=body)
        print(r.text)


# Demo entry point: construct a Robot for one webhook.
# NOTE(review): the webhook key is hard-coded here (and repeated inside the
# URL); consider loading it from configuration or the environment instead.
if __name__ == '__main__':
    url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=8f064ec6-b9b9-4a37-a2de-b251ea81207b'
    key = '8f064ec6-b9b9-4a37-a2de-b251ea81207b'
    robot = Robot(url, key)

10. sha256withRSA验签

from base64 import decodebytes, encodebytes

from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import MD5, SHA1, SHA256
import logging
import json

# Logger used by verify() to record the outcome of each check.
sha_rsa_log = logging.getLogger('sha_rsa_log')
# Sample payload; passed as the `data` argument in the demo call below.
str2 = "{\"测试字段\":\"test\"}"

# Base64 text passed as the `signature` argument in the demo call below.
str1 = "NpA0IaNbGXe4BlTX4VPfGqBPlJYRkXHGR6D7MWYdlOfC5UI35KGfBW6eSVVh619nW0UFAsAz9XDYGRjPFFctTlbb+yuNkQOLFqcjlUI9X/AOSyNWhNXRUeByPkNbzixtXPPlAg6mr4DBGVb42H6QzhCb2lfS7U3xchk1TL1r15A="

# Base64 key text (no PEM header/footer lines); passed as `public_key` in the
# demo call below.
publicKey = """MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDr+PnAY5TwEDl4cCay3J/IPH74
rlUpn0zSB7yFfx1lvo21wEr5F8psnv6eDfthbGVbGhmDcW7HvCja9hEQ+0t4n6vf
mzOhFMOBCkXFeWH5GjCDgLqIUQn3YXUHGsG0A11sO0L1cGVIyIJixZThHrB8XKvR
Be6mAJHgwuv1P8ZwVwIDAQAB"""

# A second Base64 key; presumably for a test environment (it is not referenced
# in the visible code) — TODO confirm.
PUBLICKEY_TEST = """MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDYVSjr6IU8u7W6PuRUb05ONZJj
sJ1lNI9ofcujYehjAO4mOOrWBIMZkadhej0HdtErj3b3c11HHbshPiXcbKoHbfeF
qvsLCo7Cg6ZjhMsY1/nkR1+x+LZU2MBygW9h/hnKK8hGluIAGiJVbiQGuFxa9V48
z74KTXFbZhGkfAxl+QIDAQAB"""


def verify(data, signature, public_key):
    """Verify a SHA-256 with RSA (PKCS#1 v1.5) signature.

    Args:
        data: The signed text; it is encoded to bytes before hashing.
        signature: Base64-encoded signature.
        public_key: Base64-encoded public key (PEM body without the
            BEGIN/END lines).

    Returns:
        True when the signature matches ``data``. False otherwise, including
        when the key or the signature cannot be decoded.
    """
    try:
        key = RSA.importKey(decodebytes(public_key.encode()))
        raw_signature = decodebytes(signature.encode())
    except (ValueError, IndexError, TypeError):
        # Malformed Base64 (binascii.Error is a ValueError) or an unparsable
        # key means the signature cannot be authentic; report a failed check
        # instead of raising.
        sha_rsa_log.warning('The public key or signature could not be decoded.')
        return False

    hash_value = SHA256.new(data.encode())
    verifier = PKCS1_v1_5.new(key)
    if verifier.verify(hash_value, raw_signature):
        sha_rsa_log.info('The signature is authentic.')
        return True
    sha_rsa_log.info('The signature is not authentic.')
    return False


if __name__ == '__main__':
    # Print the outcome here rather than inside verify(), so importing callers
    # get no stray output and a failed check is visible too.
    print(verify(str2, str1, publicKey))

标签:__,return,--,data,self,Python,import,常用工具,def
来源: https://www.cnblogs.com/zhengjiahui/p/16131136.html