#!/usr/bin/python3
#
# Copyright (C) 2020-2024 Soleta Networks
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

"""Show a system tray icon while the 'ogclient' Windows service is running."""

import multiprocessing as mp
import time

import psutil
from PIL import Image, ImageDraw
from pystray import Icon, Menu, MenuItem

# Seconds to sleep between two consecutive checks of the service status.
SERVICE_POLL_INTERVAL = 5


def _create_default_image():
    """
    Creates a default image for the tray icon. Use in case
    no favicon.ico is found. The image will be a blue circle.
    """
    width = height = 250
    circle_color = (45, 158, 251)

    # Create a new image with a transparent background
    image = Image.new('RGBA', (width, height), (0, 0, 0, 0))
    dc = ImageDraw.Draw(image)

    # Draw a circle centred on the canvas, leaving a 10 pixel margin
    circle_radius = min(width, height) // 2 - 10
    center_x, center_y = width // 2, height // 2
    dc.ellipse(
        (center_x - circle_radius, center_y - circle_radius,
         center_x + circle_radius, center_y + circle_radius),
        fill=circle_color,
    )
    return image


def create_icon_image():
    """
    Return the image used for the tray icon.

    Loads ./favicon.ico (resolved against the current working directory)
    and flattens it onto a white background. Falls back to the default
    blue circle when the file is missing or cannot be read as an image.
    """
    try:
        # The context manager releases the file handle. convert() forces the
        # pixel data to be read while the file is still open and guarantees
        # an alpha band, so the image is always usable as its own paste mask.
        with Image.open(r'./favicon.ico') as source:
            icon = source.convert('RGBA')
    except OSError:
        # OSError covers FileNotFoundError as well as permission problems
        # and unrecognised/corrupt files (PIL.UnidentifiedImageError derives
        # from OSError). Without this, a bad icon file would crash the tray
        # process, which the polling loop would then respawn endlessly.
        return _create_default_image()

    # Transparent pixels of the icon end up white.
    background = Image.new('RGB', icon.size, 'white')
    background.paste(icon, (0, 0), icon)
    return background


def create_systray():
    """
    Build the tray icon and run it.

    The menu holds a single informational entry whose action does nothing.
    This function is the target of the child process started by
    _session_check_loop(); icon.run() does not return while the icon is
    active, so it must not be called from the polling process itself.
    """
    menu = Menu(MenuItem('ogclient service running', lambda icon, item: None))
    icon = Icon('ogClient', create_icon_image(), menu=menu)
    icon.run()


def is_ogclient_service_active():
    """
    Return True when the 'ogclient' Windows service reports 'running'.

    Any failure while querying the service yields False.
    """
    service_name = 'ogclient'
    try:
        service = psutil.win_service_get(service_name)
        service = service.as_dict()
        return service.get('status') == 'running'
    except Exception:
        # Deliberately broad best effort: a service that cannot be queried
        # (not installed, access denied, psutil.win_service_get unavailable
        # on this platform, ...) is treated as "not running" rather than
        # aborting the polling loop.
        return False


def _stop_systray(process):
    """Terminate the tray process if it is still alive and wait for it."""
    if process is not None and process.is_alive():
        process.terminate()
        process.join()


def _session_check_loop():
    """
    Poll the service status forever and keep the tray icon in sync.

    Every SERVICE_POLL_INTERVAL seconds: start the tray process when the
    service is running and no tray process is alive, and stop it when the
    service is no longer running. Ctrl+C ends the loop quietly; the tray
    process is stopped on every exit path.
    """
    systray_process = None
    try:
        while True:
            service_running = is_ogclient_service_active()
            systray_alive = (systray_process is not None
                             and systray_process.is_alive())

            if service_running and not systray_alive:
                # Also reached when a previous tray process exited by itself.
                systray_process = mp.Process(target=create_systray,
                                             daemon=True)
                systray_process.start()
            elif not service_running and systray_alive:
                _stop_systray(systray_process)
                systray_process = None

            time.sleep(SERVICE_POLL_INTERVAL)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the loop; exit without traceback.
        pass
    finally:
        # Runs for KeyboardInterrupt and for any unexpected error alike, so
        # the tray icon never outlives the loop that supervises it.
        _stop_systray(systray_process)


def main():
    """Configure multiprocessing and run the polling loop."""
    # Must run first so that a frozen Windows executable can start child
    # processes.
    mp.freeze_support()
    mp.set_start_method('spawn')
    _session_check_loop()


if __name__ == "__main__":
    main()