# # Copyright (C) 2021 Soleta Networks # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the # Free Software Foundation; either version 3 of the License, or # (at your option) any later version. import os import psutil import subprocess from subprocess import CalledProcessError from src.log import OgError from src.ogRest import ThreadState class OgLinuxOperations: def __init__(self): self.session = False def _restartBrowser(self, url): raise OgError('Function not implemented') def poweroff(self): os.system('systemctl poweroff') def reboot(self): os.system('systemctl reboot') def shellrun(self, request, ogRest): cmd = request.getrun() try: result = subprocess.run(cmd, shell=True, stdin=subprocess.DEVNULL, capture_output=True, text=True, check=True) except CalledProcessError as error: if error.stderr: return error.stderr if error.stdout: return error.stdout return "{Non zero exit code and empty output}" return result.stdout def session(self, request, ogRest): raise OgError('Function not implemented') def software(self, request, ogRest): raise OgError('Function not implemented') def hardware(self, ogRest): raise OgError('Function not implemented') def setup(self, request, ogRest): raise OgError('Function not implemented') def image_restore(self, request, ogRest): raise OgError('Function not implemented') def image_create(self, request, ogRest): raise OgError('Function not implemented') def cache_delete(self, request, ogRest): raise OgError('Function not implemented') def cache_fetch(self, request, ogRest): raise OgError('Function not implemented') def refresh(self, ogRest): if self.session: session_value = 'LINUXS' else: session_value = 'LINUX' return {"status": session_value} def check_interactive_session_change(self): old_status = self.session has_logged_user = False for user in psutil.users(): if user.terminal: try: process = psutil.Process(user.pid) env = process.environ() if "DISPLAY" in env or "WAYLAND_DISPLAY" in env: has_logged_user = True break except (psutil.NoSuchProcess, psutil.AccessDenied): continue self.session = has_logged_user if self.session != old_status: return self.session return None