邮件推送到企业微信

将未读邮件推送到企业微信。脚本每 10 秒检查一次新邮件;我用的是自建邮箱,没有速率限制,如果你使用其他邮箱服务,请自行确认是否需要考虑速率限制。


目录

[TOC]


效果:


说明:

需要配置的有:

USERS_CONFIG:

  • imap_server: imap服务器地址
  • email_user: 邮箱
  • email_pass: 邮箱密码
  • wechat: 企业微信网页--通讯录--姓名--账号

CORP_ID: 企业微信网页--我的企业--企业ID

CORP_SECRET: 企业微信网页--应用管理--自建--<应用名称>--Secret--查看--去手机企业微信app查看

AGENT_ID: 企业微信网页--应用管理--自建--<应用名称>--AgentId

可信ip(2022年6月20日起,新开启的通讯录同步和新创建的自建应用需配置企业可信IP):

Nginx:

# Forward requests for the WeCom access-token endpoint to the official API host.
location /cgi-bin/gettoken {
proxy_pass https://qyapi.weixin.qq.com;
}
# Forward requests for the WeCom message-sending endpoint to the official API host.
location /cgi-bin/message/send {
proxy_pass https://qyapi.weixin.qq.com;
}
# Forward requests for the WeCom menu-creation endpoint to the official API host.
location  /cgi-bin/menu/create {
proxy_pass https://qyapi.weixin.qq.com;
}

上面这三个 location 不配置也可以,但首次设置可信域名时,至少需要在 nginx 上按企业微信的要求配置它指定的校验内容并通过验证;验证通过后,再把运行脚本的服务器 IP 添加到企业可信 IP 即可。


代码:

import imaplib
import email
from email.header import decode_header
import time
import requests
import json
import logging

# Send INFO-and-above records, each prefixed with a timestamp and level,
# to mail_push.log in the current working directory.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    filename="mail_push.log",
)

# WeCom (企业微信) push client
class WeChatPush:
    """Send text messages to WeCom users through a self-built application.

    An access token is fetched when the object is created and cached in
    ``self.token``. Because WeCom access tokens expire, ``send_message``
    fetches a fresh token and retries once when the API reports that the
    cached token is invalid or expired.
    """

    # errcode values the WeCom API uses for an invalid (40014) or
    # expired (42001) access token.
    _TOKEN_ERRCODES = (40014, 42001)
    # Seconds to wait for the WeCom API, so that a stalled connection
    # cannot block the caller forever.
    _REQUEST_TIMEOUT = 10

    def __init__(self, corp_id, corp_secret, agent_id):
        """Store the credentials and fetch the initial access token.

        Args:
            corp_id: WeCom enterprise ID.
            corp_secret: Secret of the self-built application.
            agent_id: AgentId of the self-built application.

        Raises:
            Exception: If the initial access token cannot be obtained.
        """
        self.corp_id = corp_id
        self.corp_secret = corp_secret
        self.agent_id = agent_id
        self.token_url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={self.corp_id}&corpsecret={self.corp_secret}"
        self.token = self.get_access_token()

    def get_access_token(self):
        """Request a new access token from the WeCom API and return it.

        Raises:
            Exception: If the HTTP status is not 200 or the API ``errcode``
                is not 0. Errors raised by ``requests`` itself propagate.
        """
        response = requests.get(self.token_url, timeout=self._REQUEST_TIMEOUT)
        if response.status_code == 200:
            data = response.json()
            if data.get("errcode") == 0:
                return data.get("access_token")
        raise Exception(f"获取 access_token 失败: {response.text}")

    def send_message(self, to_users, content):
        """Send ``content`` as a text message to every account in ``to_users``.

        Failures are logged rather than raised, so one undeliverable message
        does not stop the caller.

        Args:
            to_users: Iterable of WeCom account names; joined with ``|``.
            content: Text of the message.
        """
        headers = {"Content-Type": "application/json"}
        payload = {
            "touser": "|".join(to_users),
            "msgtype": "text",
            "agentid": self.agent_id,
            "text": {"content": content},
            "safe": 0
        }
        body = json.dumps(payload)
        try:
            # At most two attempts: the second one happens only after the
            # token was rejected and has been refreshed.
            for attempt in range(2):
                url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={self.token}"
                response = requests.post(
                    url, headers=headers, data=body, timeout=self._REQUEST_TIMEOUT
                )
                if response.status_code != 200:
                    logging.error(f"HTTP请求失败: {response.text}")
                    return
                data = response.json()
                errcode = data.get("errcode")
                if errcode == 0:
                    logging.info(f"消息发送成功: {to_users}, 内容:{content}")
                    return
                if attempt == 0 and errcode in self._TOKEN_ERRCODES:
                    logging.warning(f"access_token 已失效,刷新后重试: {data}")
                    self.token = self.get_access_token()
                    continue
                logging.error(f"消息发送失败: {data}")
                return
        except Exception as e:
            logging.error(f"发送消息时发生错误: {e}")


# Mailbox monitor
class MailMonitor:
    """Poll one IMAP mailbox for unread messages and decode them to text.

    The instance methods talk to the IMAP server; the static methods are
    pure helpers that turn a parsed message into displayable strings.
    """

    def __init__(self, imap_server, email_user, email_pass):
        """Remember the credentials and open the IMAP connection.

        Args:
            imap_server: Host name of the IMAP server (SSL).
            email_user: Login name of the mailbox.
            email_pass: Password of the mailbox.
        """
        self.imap_server = imap_server
        self.email_user = email_user
        self.email_pass = email_pass
        self.mail = self.connect_to_mail()

    def connect_to_mail(self):
        """Open an IMAP-over-SSL session, log in and return the connection.

        Raises:
            SystemExit: If connecting or logging in fails. The error is
                logged first and the exit status is 1 so that a supervisor
                can tell the process ended because of a failure.
        """
        try:
            mail = imaplib.IMAP4_SSL(self.imap_server)
            mail.login(self.email_user, self.email_pass)
            return mail
        except Exception as e:
            logging.error(f"连接邮箱失败: {e}")
            # ``exit()`` is a helper injected by the ``site`` module and ends
            # the process with status 0; raise SystemExit directly instead.
            raise SystemExit(1)

    def ensure_connected(self):
        """Probe the connection with NOOP and reconnect if the probe fails."""
        try:
            self.mail.noop()
        except imaplib.IMAP4.abort:
            logging.warning("IMAP 连接中断,重新连接中...")
            self.mail = self.connect_to_mail()
        except Exception as e:
            logging.error(f"IMAP 未知错误,尝试重新连接: {e}")
            self.mail = self.connect_to_mail()

    def check_new_emails(self):
        """Return the IDs of unread messages in the inbox.

        Returns:
            A list of message IDs (bytes) as returned by the UNSEEN search,
            or an empty list when the search fails. After an
            ``IMAP4.abort`` the connection is re-established.
        """
        try:
            self.mail.select("inbox")
            status, messages = self.mail.search(None, 'UNSEEN')
            if status != "OK":
                logging.warning("未能获取邮件列表")
                return []
            return messages[0].split()
        except imaplib.IMAP4.abort as e:
            logging.error(f"IMAP4.abort 错误: {e}")
            self.mail = self.connect_to_mail()
            return []
        except Exception as e:
            logging.error(f"未知错误: {e}")
            return []

    def fetch_email(self, mail_id):
        """Download one message and parse it.

        Args:
            mail_id: Message ID as returned by ``check_new_emails``.

        Returns:
            The parsed ``email.message.Message``, or ``None`` when the fetch
            fails or the response contains no message data.
        """
        # NOTE(review): a non-PEEK RFC822 fetch normally sets the \Seen flag
        # on the server, which appears to be what keeps the UNSEEN search
        # from returning this message again - confirm before changing it.
        try:
            status, msg_data = self.mail.fetch(mail_id, '(RFC822)')
            if status != "OK":
                logging.warning(f"无法获取邮件 ID {mail_id}")
                return None
            for response_part in msg_data:
                if isinstance(response_part, tuple):
                    return email.message_from_bytes(response_part[1])
        except Exception as e:
            logging.error(f"获取邮件时发生错误: {e}")
        return None

    @staticmethod
    def _decode_bytes(data, charset=None):
        """Decode ``data`` with ``charset``, falling back to UTF-8.

        Undecodable bytes are dropped. An unknown charset label (for
        example ``unknown-8bit``) falls back to UTF-8 instead of raising.
        """
        try:
            return data.decode(charset or "utf-8", errors="ignore")
        except LookupError:
            return data.decode("utf-8", errors="ignore")

    @staticmethod
    def _decode_part(part):
        """Return the decoded text of one message part using its declared charset."""
        payload = part.get_payload(decode=True) or b""
        return MailMonitor._decode_bytes(payload, part.get_content_charset())

    @staticmethod
    def decode_header_value(value):
        """Decode an RFC 2047 encoded header into a plain string.

        Args:
            value: Raw header value, or ``None`` when the header is absent.

        Returns:
            The decoded text; an empty string for a missing header.
        """
        if value is None:
            return ""
        pieces = []
        for part, encoding in decode_header(value):
            if isinstance(part, bytes):
                pieces.append(MailMonitor._decode_bytes(part, encoding))
            else:
                pieces.append(part)
        return "".join(pieces)

    @staticmethod
    def parse_email(msg):
        """Return ``(subject, sender, body)`` of a parsed message as strings."""
        subject = MailMonitor.decode_header_value(msg["Subject"])
        from_decoded = MailMonitor.decode_header_value(msg.get("From"))
        content = MailMonitor.extract_email_body(msg)
        return subject, from_decoded, content

    @staticmethod
    def extract_email_body(msg):
        """Return the text body of a message.

        For a multipart message the first non-attachment ``text/plain`` or
        ``text/html`` part is used. A single-part message is used only when
        it is ``text/plain``. The part's declared charset is honoured.

        Returns:
            The decoded body, or ``"(无正文内容)"`` when no such part exists.
        """
        if msg.is_multipart():
            for part in msg.walk():
                disposition = str(part.get("Content-Disposition"))
                if "attachment" in disposition:
                    continue
                if part.get_content_type() in ("text/plain", "text/html"):
                    return MailMonitor._decode_part(part)
        elif msg.get_content_type() == "text/plain":
            return MailMonitor._decode_part(msg)
        return "(无正文内容)"

    @staticmethod
    def split_content(content, max_bytes=20):
        """Split ``content`` into pieces of at most ``max_bytes`` UTF-8 bytes.

        Pieces are cut on character boundaries, so joining them gives back
        ``content`` exactly. A single character wider than ``max_bytes`` is
        emitted as its own piece rather than being dropped.

        Args:
            content: Text to split.
            max_bytes: Upper bound for the encoded size of each piece.

        Returns:
            A list of strings; empty when ``content`` is empty.

        Raises:
            ValueError: If ``max_bytes`` is smaller than 1.
        """
        if max_bytes < 1:
            raise ValueError("max_bytes must be at least 1")
        parts = []
        current = []
        current_size = 0
        for ch in content:
            ch_size = len(ch.encode("utf-8"))
            # Start a new piece before the limit would be exceeded.
            if current and current_size + ch_size > max_bytes:
                parts.append("".join(current))
                current = []
                current_size = 0
            current.append(ch)
            current_size += ch_size
        if current:
            parts.append("".join(current))
        return parts


# Mailboxes to watch and, for each one, the WeCom accounts to notify.
# Every value below is a placeholder; replace it with your own settings.
USERS_CONFIG = [
    {
        "email": {
            "imap_server": "mail.xxx.com",
            "email_user": "user1@xxx.com",
            "email_pass": "password1"
        },
        "wechat": ["XingMing1", "XingMing2"]
    },
    {
        "email": {
            "imap_server": "mail.xxx.com",
            "email_user": "user2@xxx.com",
            "email_pass": "password2"
        },
        "wechat": ["XingMing3"]
    },
]

# WeCom application settings.
# NOTE(review): credentials are hard-coded here; do not commit real values.
# Consider loading them from environment variables or a private file.
CORP_ID = "ww6d039b3bced14855"  # enterprise ID
CORP_SECRET = "TfldX0q6VFa_6hFus-pMbxKxi4mTq3O8e0EBlq0JQGk"  # application Secret
AGENT_ID = "1000003"  # application AgentId

# Create the single WeCom push client shared by all mailboxes.
wechat_push = WeChatPush(
    corp_id=CORP_ID,
    corp_secret=CORP_SECRET,
    agent_id=AGENT_ID,
)

# Build one mailbox monitor per configured entry and pair it with the
# WeCom accounts that should receive its notifications.
monitors = [
    {
        "monitor": MailMonitor(
            imap_server=entry["email"]["imap_server"],
            email_user=entry["email"]["email_user"],
            email_pass=entry["email"]["email_pass"],
        ),
        "receivers": entry["wechat"],
    }
    for entry in USERS_CONFIG
]

logging.info("开始监控多个用户的新邮件...")
# Poll every mailbox, push each unread message to its receivers, then wait
# 10 seconds before the next round.
while True:
    for item in monitors:
        # Handle errors per mailbox so that one failing mailbox does not
        # prevent the remaining ones from being checked in this round.
        try:
            monitor = item["monitor"]
            receivers = item["receivers"]
            monitor.ensure_connected()
            for mail_id in monitor.check_new_emails():
                msg = monitor.fetch_email(mail_id)
                # Compare with None explicitly: the truth value of a Message
                # depends on its header count, not on whether the fetch worked.
                if msg is None:
                    continue
                subject, sender, content = monitor.parse_email(msg)
                # An empty body yields no chunks; still send one notification
                # so the message is not silently skipped.
                # NOTE(review): 20 bytes per chunk means one WeCom message per
                # 20 bytes of body - confirm this small limit is intended.
                content_parts = MailMonitor.split_content(content, max_bytes=20) or [""]
                for part in content_parts:
                    notification = f"新邮件通知\n发件人: {sender}\n主题: {subject}\n内容: {part}"
                    wechat_push.send_message(receivers, notification)
        except Exception as e:
            logging.error(f"全局异常: {e}")
    time.sleep(10)
Last modification:December 27, 2024
V50%看看实力