Python3使用连接池连接163或outlook邮箱的服务器,利用asyncio实现异步IO进行协程并发,批量发送邮件
作者:互联网
最近我们的服务需要批量发送邮件,先后试过了163邮箱和outlook企业邮箱,还是后者比较稳定。写完以后把代码整理成了一个脚本,如下所示,喜欢的客官可以拿去用了,有问题欢迎流言交流。
import ssl
import uuid
import time
import json
import redis
import django
import smtplib
import logging
import traceback
from random import choice
from threading import Thread
from django.conf import settings
from django.template.base import Template, Context
from email.utils import formatdate, formataddr, make_msgid
from django.core.mail.message import SafeMIMEText, sanitize_address
from asyncio import run_coroutine_threadsafe, ensure_future, gather, get_event_loop
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': ['.'],
}
]
settings.configure(TEMPLATES=TEMPLATES)
django.setup()
class Email:
def __init__(self, title, message, sender_name, receives, charset, sender_host=''):
'''
sender_host是发件人邮箱地址
填空值时收件人看到的是发件用户的邮箱账号
填非空值时收件人看到的是填写的地址,以及由发件用户的邮箱账号代发的提示
'''
self.title = title
self.message = message
self.sender_name = sender_name
self.receives = receives
self.charset = charset
self.sender_host = sender_host
class UnknownError(smtplib.SMTPException):
def __init__(self, recipients):
self.recipients = recipients
self.args = (recipients,)
class ConnectionPool:
def __init__(self, host='', port='', send_email_user='', send_email_password='', max_connections=0, use_ssl=False, connection_lifetime=60, re_helo_time=10):
self.host = host # 邮箱服务器的地址
self.port = port # 邮箱服务器的端口号
self.send_email_user = send_email_user # 发件用户的SMPT服务账号(收件人看到的发件地址)
self.send_email_password = send_email_password # 发件用户的SMPT服务账号的密码,注意是发件邮箱配置的SMPT服务的密码,不是发件邮箱登陆密码
self.max_connections = max_connections # 一个IP地址能够同时建立的连接数(连接池的大小),163邮箱为10个,outlook邮箱为20个
self.use_ssl = use_ssl # smtp服务是否开启了ssl验证
self.connection_lifetime = connection_lifetime # 连接的存活时间,到达这个时间后就替换掉该连接,一般不用配置
self.re_helo_time = re_helo_time # 连接的心跳时间间隔,每隔一定时间和邮箱服务器helo一下保证服务器不断开连接,一般不用配置
self.connections = {}
self.context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
self.running = True
def __create(self):
while self.running:
try:
connection = smtplib.SMTP(timeout=30, host=self.host, port=self.port)
if self.use_ssl:
connection.starttls(context=self.context)
connection.login(self.send_email_user, self.send_email_password)
key = uuid.uuid1().hex
self.connections[key] = [connection, time.time() + len(self.connections.keys())]
if not self.running:
self.replace(key, connection)
break
except Exception as e:
try:
connection.quit()
except:
pass
if e.args[0] == 421: # (421, b'Too many connections')
sleep_time = choice(range(5, 11))
elif e.args[0] == 554: # (554, b'IP<*****> in blacklist')
sleep_time = 1800
else:
sleep_time = choice(range(5, 11))
time.sleep(sleep_time)
def __add(self):
thread = Thread(target=self.__create)
thread.setDaemon(True)
thread.start()
def __keep(self):
last_check_helo = time.time()
while self.running:
if time.time() - last_check_helo >= self.re_helo_time:
re_helo = True
last_check_helo = time.time()
else:
re_helo = False
connections = dict(self.connections.items())
for key, connection_info in connections.items():
connection, connection_time = connection_info
if time.time() - connection_time >= self.connection_lifetime:
self.replace(key, connection)
elif re_helo:
try:
connection.helo()
except:
self.replace(key, connection)
time.sleep(1)
def start(self):
threads = [Thread(target=self.__create) for index in range(self.max_connections)]
for thread in threads:
thread.setDaemon(True)
thread.start()
thread = Thread(target=self.__keep)
thread.setDaemon(True)
thread.start()
def close(self):
self.running = False
connections = dict(self.connections.items())
while connections:
for key, connection_info in connections.items():
connection, connection_time = connection_info
self.replace(key, connection)
connections = dict(self.connections.items())
def replace(self, key, connection):
try:
connection.quit()
except:
pass
try:
self.connections.pop(key, None)
except:
pass
if self.running:
self.__add()
def get(self):
time_now = time.time()
while self.running:
try:
connections = dict(self.connections.items())
key = choice(list(connections.keys()))
connection, connection_time = connections[key]
except (IndexError, KeyError):
if time.time() - time_now > 3:
return
else:
time.sleep(0.1)
continue
if time.time() - connection_time >= self.connection_lifetime:
if time.time() - time_now > 3:
return
else:
self.replace(key, connection)
continue
try:
connection.helo()
return key, connection
except:
self.replace(key, connection)
class EmailServer:
def __init__(self, send_step=1, emails_list_key='', redis=None, connection_pool_kwargs={}):
self.emails_list_key = emails_list_key # 邮件队列的key
self.send_step = send_step # 发送并发量大小,163邮箱每批次只能发送11封邮件,outlook邮箱为20封
self.connection_pool = ConnectionPool(**connection_pool_kwargs)
self.io_loop = get_event_loop()
self.logging = logging.getLogger()
self.redis = redis
self.running = True
def format_email(self, host_user, email, charset='utf-8', use_localtime=True):
# use_localtime 是否使用本地时间,True使用本地时间(东8区),False使用标准世界时间
from_email = sanitize_address(host_user, charset)
recipients = [sanitize_address(receive, charset) for receive in email.receives]
if not from_email or not recipients:
return ('', [], '')
subtype = 'html' if email.message.strip().endswith('</html>') else 'plain'
msg = SafeMIMEText(email.message, subtype, email.charset)
msg['Subject'] = email.title
email_sender_host = email.sender_name or host_user
msg['From'] = formataddr([email.sender_name, email_sender_host])
msg['To'] = ', '.join(map(str, email.receives))
msg['Date'] = formatdate(localtime=use_localtime)
msg['Message-ID'] = make_msgid()
return from_email, recipients, msg.as_bytes(linesep='\r\n')
async def send_one_email(self, email):
# 发送一封邮件
error_str, no_connection_error, retry_num = '', False, self.connection_pool.max_connections * 2
for index in range(retry_num):
try:
try:
key, connection = self.connection_pool.get()
if connection:
from_email, recipients, message = self.format_email(connection.user, Email(*email))
if not recipients:
return '邮件发送失败,请检查收件人信息是否正确'
if not from_email:
return '邮件发送失败,请检查发件人信息是否正确'
senderrs = connection.sendmail(from_email, recipients, message)
if senderrs:
raise UnknownError(senderrs)
return True
elif index == retry_num - 1:
no_connection_error = True
except Exception as e:
error_str = traceback.format_exc()
raise e
# 异常处理根据需要自定义
except smtplib.SMTPRecipientsRefused as e:
self.logging.error('%s\nEmail info: %s' % (error_str, email))
return '邮件发送失败,请检查收件人邮箱是否正确'
except (smtplib.SMTPSenderRefused, smtplib.SMTPDataError, AttributeError, ValueError) as e:
self.logging.error('%s\nEmail info: %s' % (error_str, email))
self.connection_pool.replace(key, connection)
except (ssl.SSLError, smtplib.SMTPServerDisconnected) as e:
self.logging.error('%s\nEmail info: %s' % (error_str, email))
self.connection_pool.replace(key, connection)
except Exception:
self.logging.error('%s\nEmail info: %s' % (error_str, email))
return '邮件发送失败,请稍后再试'
if no_connection_error:
if error_str:
error_str = '邮件连接全部失效,请检查是否被邮箱服务器加入黑名单,最后的异常:\n' + error_str
else:
error_str = '邮件连接全部失效,请检查是否被邮箱服务器加入黑名单'
self.logging.error('%s\nEmail info: %s' % (error_str, email))
return '邮件发送失败,请稍后再试'
async def send_some_emails(self, emails):
''' 发送一个批次的邮件 '''
tasks = [ensure_future(self.send_one_email(email), loop=self.io_loop) for email in emails]
results = await gather(*tasks, loop=self.io_loop, return_exceptions=True)
return results
async def send_all_emails(self, emails):
''' 按照步长分批次发送所有邮件 '''
# 把邮件按照步长分成多个批次
tasks = [ensure_future(self.send_some_emails(emails[index: index + self.send_step]), loop=self.io_loop) for index in range(0, len(emails), self.send_step)]
the_results = await gather(*tasks, loop=self.io_loop, return_exceptions=True)
results = []
for result in the_results:
results.extend(result)
return results
def run_send_email_server(self):
while self.running:
email_info = self.redis.lpop(self.emails_list_key)
if email_info:
email_info = json.loads(email_info)
result = run_coroutine_threadsafe(self.send_all_emails(email_info['emails']), self.io_loop).result()
self.redis.set(email_info['send_task_id'], json.dumps(result), 60)
else:
time.sleep(0.1)
def start(self):
# 初始化与邮箱服务器的连接和连接保活服务
self.connection_pool.start()
print('Connection pool started.')
# 启动一个协程事件循环
thread = Thread(target=self.io_loop.run_forever)
thread.setDaemon(True)
thread.start()
# 启动发送短信的服务
thread = Thread(target=self.run_send_email_server)
thread.setDaemon(True)
thread.start()
def stop(self):
self.running = False
self.connection_pool.close()
print('Connection pool closed.')
class EmailSender:
def __init__(self, emails_list_key='', redis=None):
self.emails_list_key = emails_list_key
self.redis = redis
def send_emails(self, emails):
# 发送邮件并等待结果
ok_redis_key = 'emails_ok:%s' % uuid.uuid1().hex
self.redis.rpush(self.emails_list_key, json.dumps({'send_task_id': ok_redis_key, 'emails': emails}))
while True:
result = self.redis.get(ok_redis_key)
if result:
self.redis.delete(ok_redis_key)
return json.loads(result)
else:
time.sleep(0.1)
def get_html_content(email_title='', email_charset='utf-8'):
# 格式化邮件内容,以发送html格式的邮件为例
content = '''
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset={{ render_data.charset }}" />
<title>{{ render_data.title }}</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<style>
table th, table td{
line-height: 1.4em;
font-size: 14px;
}
</style>
</head>
<body style="margin: 0; padding: 0;">
{{ render_data.data }}
</body>
</html>
'''
template = Template(content)
render_data = {'title': email_title, 'data': {'示例': '内容'}, 'charset': email_charset}
return template.render(Context({'render_data': render_data}))
def get_emails(email_num=2, receivers=[]):
email_charset = 'utf-8'
emails = []
for email_index in range(email_num):
email_title = '这是一封测试邮件[%s]-[%.2f]' % (email_index, time.time())
# 格式化邮件内容,以发送html格式的邮件为例
email_content = get_html_content(email_title, email_charset)
sender_host = '' # 填写则为代发模式
emails.append([email_title, email_content, '旷古的寂寞', receivers, email_charset, sender_host])
# emails.append([email_title, '测试邮件内容', '旷古的寂寞', receivers, email_charset])
return emails
emails_list_key = 'emails_list'
redis_session = redis.Redis()
receivers = ['*******@163.com', '********@qq.com']
email_server = EmailServer(send_step=20, emails_list_key=emails_list_key, redis=redis_session, connection_pool_kwargs={
'host': '******', # 邮箱服务器的地址
'port': 587, # 邮箱服务器的端口号
'send_email_user': '*****', # 发件用户的SMPT服务账号(收件人看到的发件地址)
'send_email_password': '****', # 发件用户的SMPT服务账号的密码,注意是发件邮箱配置的SMPT服务的密码,不是发件邮箱登陆密码
'max_connections': 20, # 一个IP地址能够同时建立的连接数(连接池的大小),163邮箱为10个,outlook邮箱为20个
'use_ssl': True, # smtp服务是否开启了ssl验证
# 'connection_lifetime': 60, # 连接的存活时间,到达这个时间后就替换掉该连接,一般不用配置
# 're_helo_time': 10 # 连接的心跳时间间隔,每隔一定时间和邮箱服务器helo一下保证服务器不断开连接,一般不用配置
})
email_sender = EmailSender(emails_list_key, redis_session)
if __name__ == '__main__':
email_server.start()
result = email_sender.send_emails(get_emails(email_num=2, receivers=receivers))
print(result)
email_server.stop()
标签:outlook,协程,self,emails,connection,key,time,email,Python3 来源: https://blog.csdn.net/kuanggudejimo/article/details/94588184