import imaplib import email import html2text import re from email.header import make_header, decode_header from pathlib import Path import os from dotenv import load_dotenv path_to_env = Path(__file__).parent / '.env' load_dotenv(path_to_env) EMAIL = os.getenv("EMAIL") PASSWORD = os.getenv("PASSWORD") MAX_MSG_COUNT = int(os.getenv("MAX_MSG_COUNT", 5)) MAX_BODY_LENGTH = int(os.getenv("MAX_BODY_LENGTH", 100)) GREEN = "\033[1;32m" END_COLOR = "\033[0m" UNSEEN_FLAG = "(UNSEEN)" BODY_PEEK_FLAG = "(BODY.PEEK[])" def colorful_text(text, color): """ Function to format text with Pango markup for color. """ return f"{text}" def format_body(body, max_length=MAX_BODY_LENGTH): body = body.decode("utf-8", errors="ignore") if "DOCTYPE" in body: body = html2text.html2text(body) body = body.replace("\n", "").replace("\r\n", "").replace(" ", "") body = re.sub(r"\[.*\]\(.*\)", "", body) return body if len(body) < max_length else body[:max_length] + "..." def get_short_email_str(M, num_emails=MAX_MSG_COUNT): rv, data = M.search(None, UNSEEN_FLAG) email_list = list(reversed(data[0].split()))[:num_emails] for num in email_list: try: rv, data = M.fetch(num, BODY_PEEK_FLAG) if rv != "OK": print("ERROR getting message", num) continue msg = email.message_from_bytes(data[0][1]) sender = make_header(decode_header(msg["From"])) subject = make_header(decode_header(msg["Subject"])) date = make_header(decode_header(msg["Date"])) email_info = ( f"From: {colorful_text(str(sender).replace('<', '').replace('>', ''), 'green')}\n" f"Subject: {colorful_text(str(subject), 'red')}\n" f"Date: {date}\n" ) if msg.is_multipart(): for part in msg.walk(): content_type = part.get_content_type() content_disposition = str(part.get("Content-Disposition")) if ( content_type == "text/plain" and "attachment" not in content_disposition ): body = part.get_payload(decode=True) email_info += format_body(body) break elif ( content_type == "text/html" and "attachment" not in content_disposition ): body = part.get_payload(decode=True) body = html2text.html2text( body.decode("utf-8", errors="ignore") ) email_info += format_body(body) break else: body = msg.get_payload(decode=True) email_info += format_body(body) email_info += "\n" + "=" * 50 + "\n" yield email_info except Exception: print("ERROR processing message: ", num) if __name__ == "__main__": # Example usage: # read_unread_emails.py import time start = time.time() M = imaplib.IMAP4_SSL("imap.gmail.com", 993) try: M.login(EMAIL, PASSWORD) rv, data = M.select("INBOX") if rv == "OK": for email_str in get_short_email_str(M, MAX_MSG_COUNT): print(email_str) else: print("Error selecting INBOX") except Exception: print("Error logging in: ", EMAIL) finally: M.logout() print(f"Time taken: {time.time() - start:.2f} seconds")