PythonCode(代码片段)
     分类:DevPython     有: 0 条评论

PythonCode(代码片段)

     分类:DevPython     有: 0 条评论

本篇记录一些可以直接复用的代码片段。持续更新。
代码网站(可以找找有没有现成的):http://outofmemory.cn/code-snippet/c/python/


Flask JWT认证

#!/usr/bin/env python3
# -*-coding:utf-8-*-
# @Version: Python 3
# Flask JWT 验证

import time
from adslproxy.db import RedisClient
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from itsdangerous import SignatureExpired, BadSignature, BadData
from flask import Flask, jsonify, request, abort

app = Flask(__name__)

# 用于存储已经保存的账户信息
redis_cli = RedisClient()

#############################################################
secret_key = 'PMF9IAnk16KVbUel'
salt = 'jR9kK3KjYDN79t6s'
access_token_expires_in = 60 * 60 * 5
refresh_token_expires_in = 60 * 60 * 6


def genTokenSeq(user):
    """
    # 生成token
    :param user: 输入用户名
    :return: 两个token
    """
    access_token_gen = Serializer(secret_key=secret_key, salt=salt, expires_in=access_token_expires_in)
    refresh_token_gen = Serializer(secret_key=secret_key, salt=salt, expires_in=refresh_token_expires_in)
    timestamp = time.time()
    access_token = access_token_gen.dumps({
        "userid": user,
        "iat": timestamp
    })
    refresh_token = refresh_token_gen.dumps({
        "userid": user,
        "iat": timestamp
    })

    data = {
        "access_token": str(access_token, 'utf-8'),
        "access_token_expire_in": access_token_expires_in,
        "refresh_token": str(refresh_token, 'utf-8'),
        "refresh_token_expire_in": refresh_token_expires_in,
    }
    return data


def validateToken(token):
    """
    # 解析token
    :param token: 输入toke
    :return: 解析结果
    """
    s = Serializer(secret_key=secret_key, salt=salt)
    try:
        data = s.loads(token)
    except SignatureExpired:
        return jsonify({'code': 401, 'message': 'toekn expired'})  # token过期
    except BadSignature as e:
        encoded_payload = e.payload
        if encoded_payload is not None:
            try:
                s.load_payload(encoded_payload)
            except BadData:
                return jsonify({'code': 401, 'message': 'token tampered'})  # token篡改
        return jsonify({'code': 401, 'message': 'badSignature of token'})  # 签名有误
    except Exception:
        return jsonify({'code': 401, 'message': 'wrong token with unknown reason'})  # 令牌错误

    if 'userid' not in data:
        return jsonify({'code': 401, 'message': 'illegal payload inside'})  # 非法载荷
    return jsonify({'code': 200, 'userid': data['userid'], 'message': f"user({data['userid']}) logged in by token."})


###############################################################
# API
@app.route('/login', methods=['POST'])
def login():
    """
    客户端发送json过来
    {
        "username":"admin",
        "password":"12345678"
    }
    """
    json_data = request.get_json()
    username = json_data.get('username')
    password = json_data.get('password')
    if username is None or password is None:
        abort(400)
    # 这里校验账户是否合法,我这里用redis简单对比;关系型数据库需要自行修改。
    # 这里使用了redis做AB数据集切换(账户密码是定时从配置文件读取并更新的),redis方法是自己封装的。
    list_key = RedisClient(list_key='ab_set').get('a_or_b')
    if RedisClient(list_key=list_key).get(username) == password:
        return genTokenSeq(username)
    else:
        abort(400)


@app.route('/', methods=['POST'])
def index():
    """
    客户端发送json过来
    {
        "token":"token-str",
    }
    """
    json_data = request.get_json()
    print(json_data)
    token = ''
    if json_data:
        token = json_data.get('token')
    else:
        abort(400)
    if token:
        data = validateToken(token)
        return data, 200
    else:
        abort(400)


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

参考资料:
https://mp.weixin.qq.com/s/S32HM0yvuy8ptSKYjsKu6A


伪造IP

简单的伪造headers;伪造:HTTP_X_FORWARDED_FOR;严格反扒情况下是检测REMOTE_ADDR,这个是客户端无法伪造的(可能是用户或者是代理服务器的IP);详情见扩展阅读。

import socket
import struct
import random
import requests


def createHeader():
    ip = socket.inet_ntoa(struct.pack('>I', random.randint(1, 0xffffffff)))
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'User-Agent': 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0',
        'CLIENT-IP': ip,
        'X-FORWARDED-FOR': ip
    }
    return headers


if __name__ == '__main__':
    headers = createHeader()
    html = requests.post('http://members.3322.org/dyndns/getip', headers=headers)
    print(html.text)

扩展阅读:
https://gargantuax.github.io/blog/2017-02/%E4%BD%A0%E7%9C%9F%E7%9A%84%E4%BA%86%E8%A7%A3ip%E5%90%97php%E5%A6%82%E4%BD%95%E4%B8%A5%E6%A0%BC%E8%8E%B7%E5%8F%96%E7%9C%9F%E5%AE%9E%E7%94%A8%E6%88%B7ip/
https://www.urlteam.org/2016/08/%E4%BB%BF%E4%BA%BA%E5%8C%96%E4%BC%AA%E9%80%A0%E8%AF%B7%E6%B1%82%E5%A4%B4/


PTL

打开在线图片之后关闭

import requests
from io import BytesIO
import psutil

img_url = 'https://www.leolan.top/usr/themes/default/images/thumbs/3.jpg'
image = Image.open(BytesIO(requests.get(img_url).content))
# 这里使用了psutil库,先记录已有进程,然后show()图片,之后再找出进程杀死。
process_list = []
for proc in psutil.process_iter():
    process_list.append(proc)

# 展示图片
image.show()

# 杀死进程
for proc in psutil.process_iter():
    if not proc in process_list:
       proc.kill()

参考资料:
https://blog.csdn.net/ygfrancois/article/details/84781087


通知、消息

微信小号、群消息

微信通知需要准备一个小号,而且容易被封。不方便的一点是需要扫码登陆。
参考:https://wxpy.readthedocs.io/zh/latest/index.html


企业微信

首先管理员登陆企业微信后台创建应用,设置通知范围等,并记录必要信息。

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys
import json
import requests
import datetime

# 接收信息的用户列表(即用户的账户;也可设置成群组,见参考资料)
touser = 'userid1|userid2'
# 企业ID
corpid = 'xxxxxx'
# 密钥
corpsecret = 'xxxxxxx'
# 应用ID(换成自己的)
AgentId = 100002


def get_token():
    """
    获取Token
    """
    result = requests.get(
        'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}'.format(corpid, corpsecret), timeout=5)
    access_token = json.loads(result.text)['access_token']
    return access_token


def push_msg(access_token):
    """
    接收shell参数:[部署应用名] [git更新记录]
    :param access_token:
    :return:
    """
    now_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    if sys.argv[2] == '':
        msg = "{}: {} 更新失败,请再次提交代码!".format(sys.argv[1], now_time)
    else:
        msg = "{}: {} 更新:{}".format(sys.argv[1], now_time, sys.argv[2])
    # 构造请求消息体
    msg_body = {"touser": touser, "msgtype": "text", "agentid": AgentId, "text": {"content": msg}, "safe": 0}
    result = requests.post('https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}'.format(access_token),
                           timeout=5, data=json.dumps(msg_body))
    print(result.text)


if __name__ == '__main__':
    access_token = get_token()
    push_msg(access_token)


# 后续步骤
如果微信绑定了企业微信,别忘了在企业详情页打开“接收应用信息”。

参考资料:
https://work.weixin.qq.com/api/doc#90000/90003/90487
https://open.work.weixin.qq.com/api/doc#90000/90135/90236
https://work.weixin.qq.com/api/doc#90000/90135/90236/%E6%96%87%E6%9C%AC%E6%B6%88%E6%81%AF


公众号

有个人号和服务号之分,个人号到2019年基本没啥用了,接口权限都没有。
以下主要以服务号的方式推送信息。
1、第三方平台(如:Server 酱)
https://hoxis.github.io/python-serverchan.html
http://sc.ftqq.com

2、认证的服务号
认证的服务号可以在后台权限中找到文档。处理方式类似企业微信应用。


钉钉

官方提供的文档就很强大了。

参考资料:
https://github.com/typ431127/zabbix_dingding
https://github.com/magician000/DingTalkRobot-python
https://github.com/Twotiger/dingding
https://github.com/zhuifengshen/DingtalkChatbot


QQ

网页版QQ于2019年1月停止服务,目前只能用一些其他的方式,模拟登陆等等。
酷Q:https://cqhttp.cc/docs/4.10/


邮箱

#!/usr/bin/python
# -*- coding: utf-8 -*-

import smtplib
from email.message import EmailMessage

# 账户信息
EMAIL_HOST = 'smtp.qq.com'
EMAIL_PORT = 465  #ssl端口,建议使用ssl
EMAIL_HOST_USER = '842632422@qq.com'
EMAIL_HOST_PASSWORD = '[QQ 邮箱此处填写授权码]'

# 邮件主题
EMAIL_SUBJECT = '消息提醒'
# 发件人
EMAIL_FROM = '842632422@qq.com'
# 收件人
EMAIL_TO = 'xxxxx@qq.com'
# 消息内容
EMAIL_CONTENT = '这里是内容,这是一封测试邮件'


def send_msg(content):
    msg = EmailMessage()
    msg['Subject'] = EMAIL_SUBJECT
    msg['From'] = EMAIL_FROM
    msg['To'] = EMAIL_TO
    msg.set_content(content)

    # 不使用ssl把SMTP_SSL改为SMTP即可
    with smtplib.SMTP_SSL(EMAIL_HOST, EMAIL_PORT) as con:
        con.login(EMAIL_HOST_USER, EMAIL_HOST_PASSWORD)
        con.send_message(msg)
        print("发送成功")


if __name__ == '__main__':
    send_msg(EMAIL_CONTENT)

参考资料:https://www.runoob.com/python/python-email.html


手机短信

https://www.twilio.com/
这是收费的短信服务,支持多个国家,注册送15美金,大概能免费发500条。

# Download the helper library from https://www.twilio.com/docs/python/install
from twilio.rest import Client

# Your Account Sid and Auth Token from twilio.com/console
account_sid = 'ACXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
auth_token = 'your_auth_token'
client = Client(account_sid, auth_token)

message = client.messages \
                .create(
                     body="Join Earth's mightiest heroes. Like Kevin Bacon.",
                     from_='+15017122661',
                     to='+15558675310'
                 )

print(message.sid)

参考:
https://mp.weixin.qq.com/s/auRrvR6EVtVAdptEymMRUw
https://www.twilio.com/docs/sms/quickstart/python
https://zhuanlan.zhihu.com/p/46992160


控制函数执行时间

import signal
import time


def set_runing_time(num):
    """通过信号控制执行时间"""

    def wrap(func):
        def handle(signum, frame):  # 收到信号 SIGALRM 后的回调函数,第一个参数是信号的数字,第二个参数是the interrupted stack frame.
            raise RuntimeError

        def to_do(*args):
            try:
                signal.signal(signal.SIGALRM, handle)  # 设置信号和回调函数
                signal.alarm(num)  # 设置 num 秒的闹钟
                print('start alarm signal.')
                r = func(*args)
                print('close alarm signal.')
                signal.alarm(0)  # 关闭闹钟
                return r
            except RuntimeError as e:
                return "超时啦"

        return to_do

    return wrap


@set_runing_time(5)  # 限时 5 秒超时
def my_func():  # 要执行的函数
    while True:
        print('@@@@@@@@@@@@@@')
        time.sleep(0.5)
        print('##############')


if __name__ == '__main__':
    a = time.time() * 1000
    s = my_func()
    b = time.time() * 1000
    print('执行时间(毫秒):', int(b - a))

参考资料:
https://www.cnblogs.com/chenxiyuxiao/p/10864180.html
https://www.cnblogs.com/lyxdw/p/10033118.html


爬虫URL管理

# 通过set管理已爬和未爬的URL
class UrlManager(object):

    def __init__(self):
        self.new_urls = set()
        self.old_urls = set()

    # 判断待爬取url是否在容器中
    def add_new_url(self, url):
        if url is None:
            return
        if url not in self.new_urls and url not in self.old_urls:
            self.new_urls.add(url)

    # 添加新url到待爬取集合中
    def add_new_urls(self, urls):
        if urls is None or len(urls) == 0:
            return
        for url in urls:
            self.add_new_url(url)

    # 判断是否还有待爬取的url
    def has_new_url(self):

        return len(self.new_urls) != 0

    # 获取待爬取url并将url从待爬取移动到已爬取
    def get_new_url(self):
        new_url = self.new_urls.pop()
        self.old_urls.add(new_url)
        return new_url

公众号:Crossin的编程教室(编程实例)

https://crossincode.com/oj/practice_list/

django通过celery实现发送邮件的异步执行

http://www.imooc.com/article/16164

python的多路复用实现聊天群

http://www.imooc.com/article/39675

超常用的Python代码片段

https://mp.weixin.qq.com/s/QDX1J21RSaa609jUnmdgQw


Flask生成静态html

from flask import render_template
import flask

app = flask.Flask('my app')

document = {
            'title': 'test',
            'keyword': 'hahahahha'
        }

if __name__ == "__main__":
    with app.app_context():
        rendered = render_template('index.html', datas=document)
        print(rendered)

MongoDB操作

import pymongo
# 连接数据库(有、无密码)
connection = pymongo.MongoClient('127.0.0.1', 27017)

connection = pymongo.MongoClient(host=MONGO_HOST,
                                 port=MONGO_PORT,
                                 username=MONGO_USERNAME,
                                 password=MONGO_PASSWORD, 
                                 authMechanism='SCRAM-SHA-1',
                                 authSource=MONGO_DB)
col = connection[MONGO_DB][MONGO_COLLECTION]

# 插入数据
def save_to_mongodb(link_data):
    # 连接数据库col
    if col.insert(link_data):
        print("存储成功", link_data)
        return True
    return False

参考:
https://blog.csdn.net/qq_34162294/article/details/73441559
http://www.runoob.com/python3/python-mongodb.html


异步数据库

https://www.jianshu.com/p/6d6fa94a01ef


Python 3 类型转换

https://mp.weixin.qq.com/s/H0uZCU9-j-RMhmaPay-XUw


地址拼接

# 地址形式:http://www.leolan.top/?paged=[页码]
# n是总页数;url是首页网址
def Address_stitching(url, n):
    url_list = []
    pagenumber = 1
    while pagenumber <= n:
        urlPart = url + '/?paged=%s' % (pagenumber)
        pagenumber += 1
        url_list.append(urlPart)
    return url_list

获取页面最大页数

动态加载,总数写在js里的(如CSDN)

# 获取最大页数,url是用户首页网址
def csdn_last_page(url):
    '''
    # 每页加载20篇,855一共是42.75页,所以有43页。
    # 以下是html文件里截取出来的。
    <script>
    var currentPage = 1;
    var baseUrl = 'https://blog.csdn.net/chszs/article/list' ;
    var pageSize = 20 ;
    var listTotal = 855 ;
    var pageQueryStr = '';
    function getAllUrl(page) {
        return baseUrl + "/" + page + pageQueryStr;
    }
    </script>
    '''

    response = requests.get(url)
    response_text = response.text
    response.encoding = 'UTF-8'
    pattern1 = re.compile('<script>.*?pageSize.*?=.*?(\d+).*?;', re.S)
    pageSize = re.findall(pattern1, response_text)[0]                      #每页篇数
    pattern2 = re.compile('<script>.*?listTotal.*?=.*?(\d+).*?;', re.S)
    listTotal = re.findall(pattern2, response_text)[0]                     #总篇数
    if int(listTotal) % int(pageSize) == 0:   #整除判断
        last_page_num = int(listTotal) // int(pageSize)
    else:
        last_page_num = int(listTotal) // int(pageSize) + 1
    return last_page_num

直接从源文件中提取页数

def hexo_next_last_page(url):
    response = requests.get(url)
    response_text = response.text
    soup = BeautifulSoup(response_text, "lxml")
    #选择器选择的结果是一个list,直接取出最后一个元素就包含了最大页数
    m = soup.select('[class="page-number"]')[-1]
    last_page_num = m.get_text()
    return int(last_page_num)

对两个列表同时取出对应值(转字典)

# zip函数可以多个对象中的元素打包为元组:http://www.runoob.com/python3/python3-func-zip.html
for key, value in zip(title_list, url_list):
    print(key, value)

判断网站状态

def web_status(url):
    response = requests.get(url)
    status_code = response.status_code
    if status_code == 200:
        return url
    else:
        with open('Error_Web_Url.txt', "a+") as f:
            f.write(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + ' ')
            f.write(url)
            f.write('\r\n')
            print("网站访问异常,失败网址已记录在:Error_Web_Url.txt")
            return False

代理

从开源免费代理池获取代理

# 项目地址:https://github.com/fengzhizi715/ProxyPool
def get_proxy():
    response = requests.get('http://47.97.7.119:8080/proxypool/proxys/1')
    data = json.loads(response.text).get('data')[0]
    get_ip = data.get('proxyAddress')
    get_port = data.get('proxyPort')
    get_type = data.get('proxyType')
    proxy = {str(get_type) : str(get_type) + '://' + str(get_ip) + ':' + str(get_port)}
    return proxy

随机从mongoDB随机取出一个代理IP

def get_random_proxy_ip():
    # 随机获取一个IP,这里不能用DB.py中自己写的class
    connection = pymongo.MongoClient(MongoDB_IP, MongoDB_Port)
    cdb = connection.BlogWorm_DB.ProxyIP
    items = cdb.find()
    length = items.count()
    ind = random.randint(0, length-1)
    ip = items[ind]['http'].replace('\n','')
    print(ip)
    return ip

从ProxyPool取出ip拼接为可用的格式

def get_proxy():
    try:
        response = requests.get(PROXY_POOL_URL)
        if response.status_code == 200:
            #print(response.text)
            proxy = {'http': 'http://' + response.text}
            try:
                # 获取的IP再次测试是否可用
                response = requests.get("http://ip.chinaz.com/getip.aspx", proxies=proxy, timeout=5)
                return proxy         # 代理可用,返回代理
            except Exception:
                return get_proxy()   # 如果代理无效重新获取一个
    except ConnectionError:
        print("代理池好像挂掉了!!!")
        return None                  # 连接失败说明代理池挂掉了

多线程

from multiprocessing import Pool

if __name__ == '__main__':
    groups = [index * 1 for index in range(ProxyPages_Start, ProxyPages_End)]
    pool = Pool()
    pool.map(main, groups)

模拟登录

selenium

# 小米商城登录
from selenium import webdriver

# 登录地址
login_url = 'https://account.xiaomi.com/pass/serviceLogin'
#登录成功后的地址
login_sec = 'https://account.xiaomi.com/pass/auth/security/home'
# 账户、密码
username = 'xxxxxx@qq.com'
passwd = '123456'


# 登录
def login(name ,pwd):
    browser.get(login_url)  #打开登录网址
    time.sleep(1)
    # 如果找不到标签ID,可以使用其他方法来确定元素位置,find_element_by_class_name
    browser.find_element_by_id("username").send_keys(name)   #利用账号标签的ID,确定位置并send信息
    browser.find_element_by_id("pwd").send_keys(pwd)         #利用密码标签的ID,确定位置并send信息
    try:
        browser.find_element_by_id("login-button").click()   #利用登录按钮的ID,确定位置并点击
        # 循环等待登录,登录成功,跳出循环
        while True:
            if browser.current_url[:50] != login_sec:
                time.sleep(1)
            else:
                # logbticket.info("登陆成功...")
                print('登录成功!')
                break
        browser.refresh()  # 刷新页面
    except Exception as e:
        print(e)

gzip压缩

# 获取网页压缩过的源码
import urllib2, httplib
request = urllib2.Request('http://xxxx.com')
request.add_header('Accept-encoding', 'gzip')        1
opener = urllib2.build_opener()
f = opener.open(request)

# 解压缩
import StringIO
import gzip

compresseddata = f.read() 
compressedstream = StringIO.StringIO(compresseddata)
gzipper = gzip.GzipFile(fileobj=compressedstream) 
print(gzipper.read())
(●゚ω゚●)