You asked for my Python script, but I can't seem to load that comment to reply to it directly. Anyway, here's the script; I haven't bothered to upload the repo anywhere. I'm sure it isn't perfect, but it works fine for me. The action that opens Evolution when you click the tray icon is specific to Hyprland, so you will probably need to modify it to suit your setup.
import asyncio
import concurrent.futures
import logging
import signal
import sqlite3
import sys
from pathlib import Path
from subprocess import run
import pkg_resources
from inotify_simple import INotify, flags
from PySimpleGUIQt import SystemTray
# Menu definition handed to the tray icon. "Exit" is the only entry that
# handle_menu_events reacts to.
menu_def = ["BLANK", ["Exit"]]
# Icon files shipped as resources of the evolution_tray package: one shown
# when there is no unread mail, one shown when there is.
# NOTE(review): pkg_resources is deprecated upstream in favour of
# importlib.resources - confirm before migrating.
empty_icon = pkg_resources.resource_filename(
"evolution_tray", "resources/inbox-empty.svg"
)
full_icon = pkg_resources.resource_filename(
"evolution_tray", "resources/inbox-full.svg"
)
# Module-wide inotify instance; watch_inbox registers the mail databases on it.
inotify = INotify()
# The tray icon starts in the "empty" state; check_unread switches it later.
tray = SystemTray(filename=empty_icon, menu=menu_def, tooltip="Inbox empty")
# Raise asyncio's own logger to WARNING so its lower-level records are dropped.
logging.getLogger("asyncio").setLevel(logging.WARNING)
# Root logger: everything from DEBUG upwards is written to stdout.
handler = logging.StreamHandler(sys.stdout)
logger = logging.getLogger()
logger.setLevel("DEBUG")
logger.addHandler(handler)
def handle_menu_events():
    """Process tray icon events forever.

    Blocks on ``tray.read()`` and reacts to two events:

    * ``"Exit"`` raises ``SIGTERM`` in this process, which terminates it
      when the default signal handler is installed.
    * ``"__ACTIVATED__"`` (the icon was clicked) asks Hyprland to launch
      Evolution through ``hyprctl dispatch exec evolution``.

    A missing or failing ``hyprctl`` is logged instead of raised, so the tray
    keeps working on systems where the launcher command is not available.
    This function never returns.
    """
    while True:
        menu_item = tray.read()
        if menu_item == "Exit":
            signal.raise_signal(signal.SIGTERM)
        elif menu_item == "__ACTIVATED__":
            try:
                result = run(["hyprctl", "dispatch", "exec", "evolution"])
            except OSError:
                # hyprctl is not installed or not executable; keep the tray alive.
                logger.exception("Could not run hyprctl to open evolution")
                continue
            if result.returncode == 0:
                logger.info("Opened evolution")
            else:
                logger.warning(
                    "hyprctl exited with status %s; evolution may not have opened",
                    result.returncode,
                )
def get_all_databases(cache_path=None) -> list:
    """Return every ``folders.db`` file found below the mail cache directory.

    Args:
        cache_path: Directory to search recursively (``str`` or ``Path``).
            When omitted, ``~/.cache/evolution/mail`` is used, which is the
            behaviour existing callers rely on.

    Returns:
        A list of ``pathlib.Path`` objects, one per ``folders.db`` found at
        any depth below ``cache_path``. The list is empty when nothing
        matches.
    """
    if cache_path is None:
        cache_path = Path.home() / ".cache" / "evolution" / "mail"
    return list(Path(cache_path).glob("**/folders.db"))
def _count_unread(db) -> int:
    """Return the number of unread INBOX rows in a single ``folders.db``.

    A database that cannot be opened or queried (for example because it has
    no ``INBOX`` table) counts as 0 so that one bad file does not abort the
    whole scan. Only ``sqlite3.Error`` is treated this way; anything else
    propagates.
    """
    try:
        conn = sqlite3.connect(db)
    except sqlite3.Error as err:
        logger.debug("Skipping %s: %s", db, err)
        return 0
    try:
        row = conn.execute(
            "select count(*) read from INBOX where read == 0"
        ).fetchone()
        return row[0]
    except sqlite3.Error as err:
        logger.debug("Skipping %s: %s", db, err)
        return 0
    finally:
        conn.close()


def check_unread() -> int:
    """Count unread mail across all databases and refresh the tray icon.

    Sums the unread INBOX rows of every database returned by
    ``get_all_databases()``, then switches the tray icon and tooltip to the
    "full" or "empty" state accordingly.

    Returns:
        The total number of unread messages found.
    """
    unread = sum(_count_unread(db) for db in get_all_databases())
    if unread > 0:
        tray.update(filename=full_icon, tooltip=f"{unread} unread emails")
    else:
        tray.update(filename=empty_icon, tooltip="Inbox empty")
    return unread
def watch_inbox():
    """Watch the mail databases for modifications and refresh the tray.

    Each cycle (re-)registers a MODIFY watch on every database returned by
    ``get_all_databases()``, waits for inotify events, and then recounts the
    unread mail via ``check_unread()``. This function never returns.

    Refreshing the watches every cycle, rather than only once before an inner
    read loop that never ends, means databases that appear later are picked
    up after the next event on an already-watched file.
    NOTE(review): a database created while nothing else changes is still not
    noticed until some watched file is modified - confirm whether a read
    timeout is wanted.
    """
    while True:
        for database in get_all_databases():
            inotify.add_watch(database, mask=flags.MODIFY)
        # Presumably blocks until at least one event arrives (no timeout is
        # passed) - TODO confirm against inotify_simple.
        if inotify.read():
            logger.info("New mail")
            logger.info(f"{check_unread()} new emails")
async def main():
    """Start the inbox watcher thread and run the tray menu loop.

    The initial unread count is computed first so the icon is correct at
    start-up. ``watch_inbox`` is then submitted to a single-worker thread
    pool, and ``handle_menu_events`` runs in the calling thread.

    ``handle_menu_events`` is a plain blocking function, not a coroutine, so
    it is called directly rather than wrapped in ``loop.create_task``.
    Wrapping it would call it anyway while the arguments are evaluated, and
    would then hand its ``None`` result to ``create_task``.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()
    check_unread()
    watcher = loop.run_in_executor(executor, watch_inbox)
    # Blocks this thread for the lifetime of the program.
    # NOTE(review): presumably the tray must be read from the main thread
    # (Qt) - confirm before moving this into the executor.
    handle_menu_events()
    # Only reached if the menu loop ever returns; surfaces watcher errors.
    await watcher
def entrypoint():
    """Run the tray application, logging any exception that escapes it.

    SIGINT and SIGTERM are reset to their default dispositions before the
    asyncio program starts.
    """
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, signal.SIG_DFL)
    try:
        asyncio.run(main())
    except Exception as error:
        logger.exception(error)
# Run the application only when the file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    entrypoint()