# # Copyright (C) 2020-2021 Soleta Networks # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the # Free Software Foundation; either version 3 of the License, or # (at your option) any later version. import threading import platform import time from enum import Enum import json import queue import sys import os import signal import logging from logging.handlers import SysLogHandler from src.restRequest import * from src.log import OgError class ThreadState(Enum): IDLE = 0 BUSY = 1 class jsonBody(): def __init__(self, dictionary=None): if dictionary: self.jsontree = dictionary else: self.jsontree = {} def add_element(self, key, value): self.jsontree[key] = value def dump(self): return json.dumps(self.jsontree) class restResponse(): def __init__(self, response, json_body=None, seq=None): self.msg = '' if response == ogResponses.BAD_REQUEST: self.msg = 'HTTP/1.0 400 Bad Request' elif response == ogResponses.IN_PROGRESS: self.msg = 'HTTP/1.0 202 Accepted' elif response == ogResponses.OK: self.msg = 'HTTP/1.0 200 OK' elif response == ogResponses.INTERNAL_ERR: self.msg = 'HTTP/1.0 500 Internal Server Error' elif response == ogResponses.UNAUTHORIZED: self.msg = 'HTTP/1.0 401 Unauthorized' elif response == ogResponses.SERVICE_UNAVAILABLE: self.msg = 'HTTP/1.0 503 Service Unavailable' elif response == ogResponses.EARLY_HINTS: self.msg = 'HTTP/1.0 103 Early Hints' else: return self.msg if response in {ogResponses.OK, ogResponses.IN_PROGRESS}: logging.debug(self.msg[:ogRest.LOG_LENGTH]) else: logging.warning(self.msg[:ogRest.LOG_LENGTH]) self.msg += '\r\n' if seq: self.seq = seq self.msg += 'X-Sequence: ' + str(seq) self.msg += '\r\n' if json_body: self.msg += 'Content-Length: ' + str(len(json_body.dump())) self.msg += '\r\nContent-Type: application/json' self.msg += '\r\n\r\n' + json_body.dump() else: self.msg += 'Content-Length: 0\r\n' \ 'Content-Type: application/json\r\n\r\n' def get(self): return self.msg class ogThread(): def shellrun(client, request, ogRest): if not request.getrun(): response = restResponse(ogResponses.BAD_REQUEST, seq=client.seq) client.send(response.get()) ogRest.state = ThreadState.IDLE return try: (retcode, cmd, shellout) = ogRest.operations.shellrun(request, ogRest) except Exception as e: ogRest.send_internal_server_error(client, exc=e) return if request.getEcho(): json_body = jsonBody() json_body.add_element('cmd', cmd) json_body.add_element('out', shellout) json_body.add_element('retcode', retcode) response = restResponse(ogResponses.OK, json_body, seq=client.seq) client.send(response.get()) else: response = restResponse(ogResponses.OK, seq=client.seq) client.send(response.get()) ogRest.state = ThreadState.IDLE def poweroff(ogRest): time.sleep(2) ogRest.operations.poweroff() def reboot(ogRest): ogRest.operations.reboot() def session(client, request, ogRest): try: ogRest.operations.session(request, ogRest) except Exception as e: ogRest.send_internal_server_error(client, exc=e) return response = restResponse(ogResponses.OK, seq=client.seq) client.send(response.get()) client.disconnect() def software(client, request, ogRest): try: software = ogRest.operations.software(request, ogRest) except Exception as e: ogRest.send_internal_server_error(client, exc=e) return json_body = jsonBody() json_body.add_element('partition', request.getPartition()) json_body.add_element('software', software) response = restResponse(ogResponses.OK, json_body, seq=client.seq) client.send(response.get()) ogRest.state = ThreadState.IDLE def hardware(client, ogRest): try: result = ogRest.operations.hardware(ogRest) except Exception as e: ogRest.send_internal_server_error(client, exc=e) return json_body = jsonBody() json_body.add_element('hardware', result) response = restResponse(ogResponses.OK, json_body, seq=client.seq) client.send(response.get()) ogRest.state = ThreadState.IDLE def setup(client, request, ogRest): try: out = ogRest.operations.setup(request, ogRest) except Exception as e: ogRest.send_internal_server_error(client, exc=e) return json_body = jsonBody(out) response = restResponse(ogResponses.OK, json_body, seq=client.seq) client.send(response.get()) ogRest.state = ThreadState.IDLE def image_restore(client, request, ogRest): try: payload = ogRest.operations.image_restore(request, ogRest) except Exception as e: ogRest.send_internal_server_error(client, exc=e) return json_body = jsonBody(payload) response = restResponse(ogResponses.OK, json_body, seq=client.seq) client.send(response.get()) ogRest.state = ThreadState.IDLE def image_create(client, request, ogRest): try: image_info = ogRest.operations.image_create(request, ogRest) software = ogRest.operations.software(request, ogRest) except Exception as e: ogRest.send_internal_server_error(client, exc=e) return kibi = 1024 datasize = int(image_info.datasize) * kibi json_body = jsonBody() json_body.add_element('disk', request.getDisk()) json_body.add_element('partition', request.getPartition()) json_body.add_element('code', request.getCode()) json_body.add_element('id', request.getId()) json_body.add_element('name', request.getName()) json_body.add_element('repository', request.getRepo()) json_body.add_element('software', software) json_body.add_element('clonator', image_info.clonator) json_body.add_element('compressor', image_info.compressor) json_body.add_element('filesystem', image_info.filesystem) json_body.add_element('datasize', datasize) json_body.add_element('size', image_info.size) json_body.add_element('perms', image_info.perms) json_body.add_element('lastupdate', image_info.mtime) json_body.add_element('checksum', image_info.checksum) response = restResponse(ogResponses.OK, json_body, seq=client.seq) client.send(response.get()) ogRest.state = ThreadState.IDLE def cache_delete(client, request, ogRest): try: out = ogRest.operations.cache_delete(request, ogRest) except Exception as e: ogRest.send_internal_server_error(client, exc=e) return json_body = jsonBody(out) response = restResponse(ogResponses.OK, json_body, seq=client.seq) client.send(response.get()) ogRest.state = ThreadState.IDLE def cache_fetch(client, request, ogRest): try: out = ogRest.operations.cache_fetch(request, ogRest) except Exception as e: ogRest.send_internal_server_error(client, exc=e) return json_body = jsonBody(out) response = restResponse(ogResponses.OK, json_body, seq=client.seq) client.send(response.get()) ogRest.state = ThreadState.IDLE def refresh(client, ogRest): try: out = ogRest.operations.refresh(ogRest) except Exception as e: ogRest.send_internal_server_error(client, exc=e) return json_body = jsonBody(out) response = restResponse(ogResponses.OK, json_body, seq=client.seq) client.send(response.get()) ogRest.state = ThreadState.IDLE class ogResponses(Enum): BAD_REQUEST=0 IN_PROGRESS=1 OK=2 INTERNAL_ERR=3 UNAUTHORIZED=4 SERVICE_UNAVAILABLE=5 EARLY_HINTS=6 class ogRest(): LOG_LENGTH = 32 def __init__(self, config): self.proc = None self.terminated = False self.state = ThreadState.IDLE self.CONFIG = config self.mode = self.CONFIG['opengnsys']['mode'] self.samba_config = self.CONFIG['samba'] if self.mode == 'live': from src.live.ogOperations import OgLiveOperations self.operations = OgLiveOperations(self.CONFIG) elif self.mode == 'virtual': from src.virtual.ogOperations import \ OgVirtualOperations self.operations = OgVirtualOperations() threading.Thread(target=self.operations.check_vm_state_loop, args=(self,)).start() elif self.mode == 'linux': from src.linux.ogOperations import OgLinuxOperations self.operations = OgLinuxOperations() elif self.mode == 'windows': from src.windows.ogOperations import OgWindowsOperations self.operations = OgWindowsOperations() else: raise OgError(f'Ogrest mode \'{self.mode}\'not supported') def send_internal_server_error(self, client, exc=None): if isinstance(exc, OgError): logging.error(exc) else: logging.exception(exc) response = restResponse(ogResponses.INTERNAL_ERR, seq=client.seq) client.send(response.get()) self.state = ThreadState.IDLE def process_request(self, request, client): method = request.get_method() URI = request.get_uri() logging.debug('Incoming request: %s%s', method, URI[:ogRest.LOG_LENGTH]) if (not "stop" in URI and not "reboot" in URI and not "poweroff" in URI): if self.state == ThreadState.BUSY: logging.error('Request has been received while ogClient is busy') response = restResponse(ogResponses.SERVICE_UNAVAILABLE, seq=client.seq) client.send(response.get()) return else: self.state = ThreadState.BUSY if ("GET" in method): if "hardware" in URI: self.process_hardware(client) elif ("software" in URI): self.process_software(client, request) elif ("run/schedule" in URI): self.process_schedule(client) elif "refresh" in URI: self.process_refresh(client) else: logging.error('Unsupported request: %s', {URI[:ogRest.LOG_LENGTH]}) response = restResponse(ogResponses.BAD_REQUEST, seq=client.seq) client.send(response.get()) self.state = ThreadState.IDLE elif ("POST" in method): if ("poweroff" in URI): self.process_poweroff(client) elif ("reboot" in URI): self.process_reboot(client) elif ("shell/run" in URI): self.process_shellrun(client, request) elif ("session" in URI): self.process_session(client, request) elif ("setup" in URI): self.process_setup(client, request) elif ("image/restore" in URI): self.process_imagerestore(client, request) elif ("stop" in URI): self.process_stop(client) elif ("image/create" in URI): self.process_imagecreate(client, request) elif ("cache/delete" in URI): self.process_cache_delete(client, request) elif ("cache/fetch" in URI): self.process_cache_fetch(client, request) else: logging.error('Unsupported request: %s', URI[:ogRest.LOG_LENGTH]) response = restResponse(ogResponses.BAD_REQUEST, seq=client.seq) client.send(response.get()) self.state = ThreadState.IDLE else: response = restResponse(ogResponses.BAD_REQUEST, seq=client.seq) client.send(response.get()) self.state = ThreadState.IDLE return 0 def kill_process(self): try: os.kill(self.proc.pid, signal.SIGTERM) except: pass time.sleep(2) try: os.kill(self.proc.pid, signal.SIGKILL) except: pass self.state = ThreadState.IDLE def process_reboot(self, client): response = restResponse(ogResponses.IN_PROGRESS, seq=client.seq) client.send(response.get()) if self.mode != 'virtual': client.disconnect() if self.state == ThreadState.BUSY: self.kill_process() threading.Thread(target=ogThread.reboot, args=(self,)).start() def process_poweroff(self, client): response = restResponse(ogResponses.IN_PROGRESS, seq=client.seq) client.send(response.get()) if self.mode != 'virtual': client.disconnect() if self.state == ThreadState.BUSY: self.kill_process() threading.Thread(target=ogThread.poweroff, args=(self,)).start() def process_shellrun(self, client, request): threading.Thread(target=ogThread.shellrun, args=(client, request, self,)).start() def process_session(self, client, request): threading.Thread(target=ogThread.session, args=(client, request, self,)).start() def process_software(self, client, request): threading.Thread(target=ogThread.software, args=(client, request, self,)).start() def process_hardware(self, client): threading.Thread(target=ogThread.hardware, args=(client, self,)).start() def process_schedule(self, client): response = restResponse(ogResponses.OK, seq=client.seq) client.send(response.get()) self.state = ThreadState.IDLE def process_setup(self, client, request): threading.Thread(target=ogThread.setup, args=(client, request, self,)).start() def process_imagerestore(self, client, request): threading.Thread(target=ogThread.image_restore, args=(client, request, self,)).start() def process_stop(self, client): client.disconnect() if self.state == ThreadState.BUSY: self.kill_process() self.terminated = True sys.exit(0) def process_imagecreate(self, client, request): threading.Thread(target=ogThread.image_create, args=(client, request, self,)).start() def process_cache_delete(self, client, request): threading.Thread(target=ogThread.cache_delete, args=(client, request, self,)).start() def process_cache_fetch(self, client, request): threading.Thread(target=ogThread.cache_fetch, args=(client, request, self,)).start() def process_refresh(self, client): threading.Thread(target=ogThread.refresh, args=(client, self,)).start() def check_interactive_session_change(self): return self.operations.check_interactive_session_change()