邮件推送到企业微信
将未读邮件推送到企业微信(每10秒获取一次新邮件,我是自建邮箱没有速率限制,如果你是其他邮箱不知道要不要考虑速率限制)
目录
[TOC]
效果:
说明:
需要配置的有:
USERS_CONFIG
:
- imap_server: imap服务器地址
- email_user: 邮箱
- email_pass: 邮箱密码
- wechat: 企业微信网页--通讯录--姓名--账号
CORP_ID
: 企业微信网页--我的企业--企业ID
CORP_SECRET
: 企业微信网页--应用管理--自建--<应用名称>--Secret--查看--去手机企业微信app查看
AGENT_ID
:企业微信网页--应用管理--自建--<应用名称>--Secret--AgentId
可信ip(2022年6月20日起,新开启的通讯录同步和新创建的自建应用需配置企业可信IP):
Nginx:
location /cgi-bin/gettoken {
proxy_pass https://qyapi.weixin.qq.com;
}
location /cgi-bin/message/send {
proxy_pass https://qyapi.weixin.qq.com;
}
location /cgi-bin/menu/create {
proxy_pass https://qyapi.weixin.qq.com;
}
不配置这三个也行,但至少可信域名第一次nginx要配置一个他指定的东西验证一下,通过验证就行,然后把要跑脚本的服务器的ip添加到可信ip
代码:
import imaplib
import email
from email.header import decode_header
import time
import requests
import json
import logging
logging.basicConfig(
filename="mail_push.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# 企业微信推送类
class WeChatPush:
def __init__(self, corp_id, corp_secret, agent_id):
self.corp_id = corp_id
self.corp_secret = corp_secret
self.agent_id = agent_id
self.token_url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={self.corp_id}&corpsecret={self.corp_secret}"
self.token = self.get_access_token()
def get_access_token(self):
response = requests.get(self.token_url)
if response.status_code == 200:
data = response.json()
if data.get("errcode") == 0:
return data.get("access_token")
raise Exception(f"获取 access_token 失败: {response.text}")
def send_message(self, to_users, content):
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={self.token}"
headers = {"Content-Type": "application/json"}
payload = {
"touser": "|".join(to_users),
"msgtype": "text",
"agentid": self.agent_id,
"text": {"content": content},
"safe": 0
}
try:
response = requests.post(url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
data = response.json()
if data.get("errcode") == 0:
logging.info(f"消息发送成功: {to_users}, 内容:{content}")
else:
logging.error(f"消息发送失败: {data}")
else:
logging.error(f"HTTP请求失败: {response.text}")
except Exception as e:
logging.error(f"发送消息时发生错误: {e}")
# 邮件监控类
class MailMonitor:
def __init__(self, imap_server, email_user, email_pass):
self.imap_server = imap_server
self.email_user = email_user
self.email_pass = email_pass
self.mail = self.connect_to_mail()
def connect_to_mail(self):
try:
mail = imaplib.IMAP4_SSL(self.imap_server)
mail.login(self.email_user, self.email_pass)
return mail
except Exception as e:
logging.error(f"连接邮箱失败: {e}")
exit()
def ensure_connected(self):
try:
self.mail.noop()
except imaplib.IMAP4.abort:
logging.warning("IMAP 连接中断,重新连接中...")
self.mail = self.connect_to_mail()
except Exception as e:
logging.error(f"IMAP 未知错误,尝试重新连接: {e}")
self.mail = self.connect_to_mail()
def check_new_emails(self):
try:
self.mail.select("inbox")
status, messages = self.mail.search(None, 'UNSEEN')
if status != "OK":
logging.warning("未能获取邮件列表")
return []
return messages[0].split()
except imaplib.IMAP4.abort as e:
logging.error(f"IMAP4.abort 错误: {e}")
self.mail = self.connect_to_mail()
return []
except Exception as e:
logging.error(f"未知错误: {e}")
return []
def fetch_email(self, mail_id):
try:
status, msg_data = self.mail.fetch(mail_id, '(RFC822)')
if status != "OK":
logging.warning(f"无法获取邮件 ID {mail_id}")
return None
for response_part in msg_data:
if isinstance(response_part, tuple):
msg = email.message_from_bytes(response_part[1])
return msg
except Exception as e:
logging.error(f"获取邮件时发生错误: {e}")
return None
@staticmethod
def decode_header_value(value):
decoded_parts = decode_header(value)
decoded_string = ""
for part, encoding in decoded_parts:
if isinstance(part, bytes):
decoded_string += part.decode(encoding if encoding else "utf-8", errors="ignore")
else:
decoded_string += part
return decoded_string
@staticmethod
def parse_email(msg):
subject = MailMonitor.decode_header_value(msg["Subject"])
from_raw = msg.get("From")
from_decoded = MailMonitor.decode_header_value(from_raw)
content = MailMonitor.extract_email_body(msg)
return subject, from_decoded, content
@staticmethod
def extract_email_body(msg):
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if content_type == "text/plain" and "attachment" not in content_disposition:
return part.get_payload(decode=True).decode(errors="ignore")
elif content_type == "text/html" and "attachment" not in content_disposition:
return part.get_payload(decode=True).decode(errors="ignore")
else:
if msg.get_content_type() == "text/plain":
return msg.get_payload(decode=True).decode(errors="ignore")
return "(无正文内容)"
@staticmethod
def split_content(content, max_bytes=20):
# 分割内容,确保每部分不超过 max_bytes 字节
encoded_content = content.encode('utf-8')
parts = []
for i in range(0, len(encoded_content), max_bytes):
part = encoded_content[i:i+max_bytes].decode('utf-8', errors='ignore')
parts.append(part)
return parts
# 配置多个邮箱和接收者
USERS_CONFIG = [
{
"email": {
"imap_server": "mail.xxx.com",
"email_user": "[email protected]",
"email_pass": "password1"
},
"wechat": ["XingMing1", "XingMing2"]
},
{
"email": {
"imap_server": "mail.xxx.com",
"email_user": "[email protected]",
"email_pass": "password2"
},
"wechat": ["XingMing3"]
},
]
# 企业微信配置
CORP_ID = "ww6d039b3bced14855" # 企业ID
CORP_SECRET = "TfldX0q6VFa_6hFus-pMbxKxi4mTq3O8e0EBlq0JQGk" # 应用Secret
AGENT_ID = "1000003" # 应用ID
# 初始化微信推送类
wechat_push = WeChatPush(CORP_ID, CORP_SECRET, AGENT_ID)
# 初始化邮件监控服务
monitors = [
{
"monitor": MailMonitor(config["email"]["imap_server"],
config["email"]["email_user"],
config["email"]["email_pass"]),
"receivers": config["wechat"]
}
for config in USERS_CONFIG
]
logging.info("开始监控多个用户的新邮件...")
while True:
try:
for item in monitors:
monitor = item["monitor"]
receivers = item["receivers"]
monitor.ensure_connected()
new_emails = monitor.check_new_emails()
for mail_id in new_emails:
msg = monitor.fetch_email(mail_id)
if msg:
subject, sender, content = monitor.parse_email(msg)
content_parts = MailMonitor.split_content(content, max_bytes=20)
for part in content_parts:
notification = f"新邮件通知\n发件人: {sender}\n主题: {subject}\n内容: {part}"
wechat_push.send_message(receivers, notification)
time.sleep(10)
except Exception as e:
logging.error(f"全局异常: {e}")
time.sleep(10)