# # Copyright (C) 2020-2024 Soleta Networks # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the # Free Software Foundation; either version 3 of the License, or # (at your option) any later version. import os import locale import ctypes import psutil import subprocess import multiprocessing as mp from multiprocessing import Process, freeze_support from src.log import OgError from PIL import Image, ImageDraw from pystray import Icon, Menu, MenuItem from src.ogRest import ThreadState def _create_default_image(): """ Creates a default image for the tray icon. Use in case no favicon.ico is found. The image will be a blue circle. """ width = height = 250 circle_color = (45, 158, 251) # Create a new image with a transparent background image = Image.new('RGBA', (width, height), (0, 0, 0, 0)) dc = ImageDraw.Draw(image) # Draw circle circle_radius = min(width, height) // 2 - 10 circle_center = (width // 2, height // 2) dc.ellipse( (circle_center[0] - circle_radius, circle_center[1] - circle_radius, circle_center[0] + circle_radius, circle_center[1] + circle_radius), fill=circle_color ) return image def create_image(): try: image = Image.open(r'./favicon.ico') image = Image.composite(image, Image.new('RGB', image.size, 'white'), image) except: image = _create_default_image() return image def create_systray(): menu = Menu(MenuItem('Powered by Soleta Networks!', lambda icon, item: 1)) icon = Icon('ogClient', create_image(), menu=menu) assert icon.icon icon.run() class OgWindowsOperations: def __init__(self): self.session = False freeze_support() mp.set_start_method('spawn') self.systray_p = Process(target=create_systray, daemon=True) self.systray_p.start() def _restartBrowser(self, url): raise OgError('Function not implemented') def poweroff(self): self.systray_p.terminate() os.system('shutdown -s -t 0') def reboot(self): self.systray_p.terminate() os.system('shutdown -r -t 0') def shellrun(self, request, ogRest): cmd = request.getrun() is_inline = request.get_inline() if not is_inline: raise OgError("Only inline mode is supported on Windows") env = os.environ.copy() env['OutputEncoding'] = 'utf8' try: ogRest.proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, shell=True, env=env, ) output, error = ogRest.proc.communicate() except (OSError, subprocess.SubprocessError) as e: raise OgError(f'Error when running "shell run" subprocess: {e}') from e output = output.decode('utf-8-sig', errors='replace') return (ogRest.proc.returncode, cmd, output) def session(self, request, ogRest): raise OgError('Function not implemented') def software(self, request, ogRest): raise OgError('Function not implemented') def hardware(self, ogRest): raise OgError('Function not implemented') def setup(self, request, ogRest): raise OgError('Function not implemented') def image_restore(self, request, ogRest): raise OgError('Function not implemented') def image_create(self, request, ogRest): raise OgError('Function not implemented') def cache_delete(self, request, ogRest): raise OgError('Function not implemented') def cache_fetch(self, request, ogRest): raise OgError('Function not implemented') def refresh(self, ogRest): if self.session: session_value = 'WINS' else: session_value = 'WIN' return {"status": session_value} def check_interactive_session_change(self): old_status = self.session has_logged_user = False for user in psutil.users(): has_logged_user = True break self.session = has_logged_user if self.session != old_status: return self.session return None