#
# Copyright (C) 2020-2024 Soleta Networks
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

import datetime
import hashlib
import logging
import os
import subprocess
import shlex
import shutil

from subprocess import Popen, PIPE

import fdisk

from src.ogRest import ThreadState
from src.live.partcodes import GUID_MAP
from src.live.parttypes import get_parttype

from src.utils.image import *
from src.utils.postinstall import configure_os
from src.utils.net import *
from src.utils.menu import generate_menu
from src.utils.fs import *
from src.utils.probe import os_probe, get_cache_dev_path
from src.utils.disk import *
from src.utils.cache import *
from src.utils.tiptorrent import *
from src.utils.uefi import *
from src.utils.boot import *
from src.utils.sw_inventory import get_package_set
from src.utils.hw_inventory import get_hardware_inventory, legacy_list_hardware_inventory
from src.log import OgError


class OgLiveOperations:
    """Client operations available while running in the live environment.

    Covers power management, shell execution, hardware/software inventory,
    partitioning, image creation/restoration, cache management and the
    refresh report.
    """

    def __init__(self, config):
        """Store the URLs and samba credentials from *config* and enable WoL.

        *config* must provide ``config['opengnsys']['url']``,
        ``config['opengnsys']['url_log']``, ``config['samba']['user']`` and
        ``config['samba']['pass']``.
        """
        self._url = config['opengnsys']['url']
        self._url_log = config['opengnsys']['url_log']
        self._smb_user = config['samba']['user']
        self._smb_pass = config['samba']['pass']
        interface = get_ethernet_interface()
        self._enable_wol(interface)

    def _restartBrowser(self, url):
        """Kill any running ``browser`` process and start a new one at *url*.

        Does nothing when no ``browser`` binary is found in PATH.

        Raises:
            OgError: if either subprocess cannot be spawned.
        """
        if not shutil.which('browser'):
            return

        try:
            subprocess.call(["pkill", "-9", "browser"])
            subprocess.Popen(["browser", "-qws", url])
        except OSError as e:
            raise OgError('Cannot restart browser') from e

    def _refresh_payload_disk(self, disk_data, part_setup, num_disk):
        """Fill *part_setup* (in place) with the entry describing a whole disk.

        The size is reported in KiB. ``code`` is '0' when the disk has no
        label, '2' for gpt and '1' for any other label.
        """
        part_setup['disk'] = str(num_disk)
        part_setup['disk_type'] = 'DISK'
        part_setup['partition'] = '0'
        part_setup['filesystem'] = ''
        part_setup['os'] = ''
        part_setup['size'] = str(disk_data.nsectors * disk_data.sector_size // 1024)
        part_setup['used_size'] = 0
        part_setup['free_size'] = 0
        if not disk_data.label_name:
            part_setup['code'] = '0'
        else:
            part_setup['code'] = '2' if disk_data.label_name == 'gpt' else '1'

    def _refresh_payload_partition(self, disk_data, pa, part_setup, disk):
        """Fill *part_setup* (in place) with the entry describing partition *pa*.

        The partition is temporarily mounted to probe the installed OS and
        the used/free space; if it cannot be mounted those fields are left
        empty/zero. *disk* is accepted for interface compatibility and is
        not used.
        """
        partnum = pa.partno + 1
        target = pa.padev.replace('dev', 'mnt')

        if disk_data.label_name == 'gpt':
            code = GUID_MAP.get(pa.parttype, 0x0)
        else:
            code = int(pa.parttype, base=16)

        if mount_mkdir(pa.padev, target):
            # Always unmount, even if probing the mounted filesystem fails.
            try:
                part_setup['os'] = os_probe(target)
                _, used, free = shutil.disk_usage(target)
                part_setup['used_size'] = used
                part_setup['free_size'] = free
            finally:
                umount(target)
        else:
            part_setup['os'] = ''
            part_setup['used_size'] = 0
            part_setup['free_size'] = 0

        part_setup['disk_type'] = ''
        part_setup['partition'] = str(partnum)
        part_setup['filesystem'] = pa.fstype.upper() if pa.fstype else 'EMPTY'
        # part_setup['code'] = hex(code).removeprefix('0x')
        part_setup['code'] = hex(code)[2:]
        part_setup['size'] = str(int(pa.size) // 1024)

        # Translate filesystem names into the ones reported to the server.
        if part_setup['filesystem'] == 'VFAT':
            part_setup['filesystem'] = 'FAT32'

        if part_setup['filesystem'] == 'SWAP':
            part_setup['filesystem'] = 'LINUX-SWAP'

    def _refresh_part_setup_cache(self, pa, part_setup, cache):
        """Mark *part_setup* as the cache partition when *pa* is device *cache*."""
        if pa.padev == cache:
            part_setup['filesystem'] = 'CACHE'
            part_setup['code'] = 'ca'

    def _get_cache_contents(self):
        """Return a description of the images stored in the cache partition.

        Images without a readable, non-empty ``.full.sum`` checksum file are
        removed from the cache (best effort) and not reported.

        Returns:
            dict: empty when the cache cannot be mounted or the image
            directory does not exist; otherwise it has the keys
            ``used_size``, ``free_size`` and ``images`` (a list of dicts
            with ``name``, ``size`` and ``checksum``).
        """
        cache_contents = {}

        cache_mnt = mount_cache()
        if not cache_mnt:
            return cache_contents

        img_dir = OG_CACHE_IMAGE_PATH
        if not os.path.isdir(img_dir):
            return cache_contents

        image_list = []
        for file_name in os.listdir(img_dir):
            file_path = os.path.join(img_dir, file_name)

            if not os.path.isfile(file_path):
                continue
            if not file_name.endswith('.img'):
                continue

            checksum_file_path = file_path + '.full.sum'
            image_checksum = ''
            try:
                with open(checksum_file_path, 'r') as f:
                    image_checksum = f.read()
            except OSError as e:
                logging.error(f'Cannot read checksum file {checksum_file_path}: {e}')

            if not image_checksum:
                logging.info(f'Removing {file_name} with no checksum file from cache, maybe partial download?')
                # Best-effort cleanup: a failure here must not abort the listing.
                try:
                    if os.path.exists(file_path):
                        os.unlink(file_path)
                    if os.path.exists(checksum_file_path):
                        os.unlink(checksum_file_path)
                except OSError as e:
                    logging.warning(f'Cannot remove {file_name} from cache: {e}')
                continue

            image_size = os.stat(file_path).st_size
            image_list.append({
                'name': file_name,
                'size': image_size,
                'checksum': image_checksum})

        _, used, free = shutil.disk_usage(cache_mnt)
        cache_contents['used_size'] = used
        cache_contents['free_size'] = free
        cache_contents['images'] = image_list

        return cache_contents

    def _get_boot_entry_data(self):
        """Return the UEFI boot entries as reported by efibootmgr.

        Entries listed in ``BootOrder`` come first and receive an ``order``
        field with their position; the remaining entries follow without it.

        Returns:
            dict: empty when UEFI is not supported or efibootmgr fails,
            otherwise ``{'entries': [...]}``.
        """
        data = {}
        if not is_uefi_supported():
            return data

        try:
            efibootmgr_out = run_efibootmgr_json(validate=True)
        except Exception as e:
            logging.warning(e)
            return data

        # Use the same tolerant lookup for both passes over the entries.
        boot_vars = efibootmgr_out.get('vars', [])

        data['entries'] = []
        for idx, entry_name in enumerate(efibootmgr_out['BootOrder']):
            entry_name = 'Boot' + entry_name
            for entry in boot_vars:
                if entry.get('name', '') == entry_name:
                    entry['order'] = idx
                    data['entries'].append(entry)
                    break

        for entry in boot_vars:
            if 'order' not in entry:
                data['entries'].append(entry)

        return data

    def _write_md5_file(self, path, checksum):
        """Write *checksum* into ``{path}.full.sum``.

        Returns:
            int: 0 on success, -1 if the checksum file cannot be written.

        Raises:
            OgError: if the image file *path* does not exist.
        """
        if not os.path.exists(path):
            raise OgError(f'Failed to calculate checksum, image file {path} does not exist')

        filename = path + ".full.sum"
        try:
            with open(filename, 'w') as f:
                f.write(checksum)
        except Exception:
            logging.error(f'Cannot write checksum {filename}')
            return -1

        return 0

    def _fetch_image_unicast(self, repo, image_name):
        """
        Copies /opt/opengnsys/images/{image_name}.img into the cache image
        directory (OG_CACHE_IMAGE_PATH).

        Implies a unicast transfer. Does not use tiptorrent.

        Nothing is copied when the cached image already exists and its
        checksum validates against the repository.

        Raises:
            OgError: if no cache partition is available, a stale file cannot
            be deleted, the copy fails, or the checksum does not validate.
        """
        if not get_cache_dev_path():
            raise OgError('No cache partition is mounted')

        dst = f'{OG_CACHE_IMAGE_PATH}{image_name}.img'

        if os.path.exists(dst) and tip_check_csum(repo, image_name):
            logging.info(f'Found up-to-date image {image_name}.img in cache')
            return

        try:
            if os.path.exists(dst):
                os.unlink(dst)
            if os.path.exists(f"{dst}.full.sum"):
                os.unlink(f"{dst}.full.sum")
        except OSError as e:
            raise OgError(f"Error deleting file {e.filename}: {e.strerror}") from e

        src = f'/opt/opengnsys/images/{image_name}.img'

        try:
            logging.info(f'Fetching image {image_name}.img from {src}')
            logging.info('*DO NOT REBOOT OR POWEROFF* the client during this time')
            shutil.copy(src, dst)
            tip_write_csum(image_name)
        except (OSError, OgError) as e:
            # Remove the partial copy. A distinct name is used for the inner
            # exception so the outer one is still bound when re-raised below.
            try:
                if os.path.exists(dst):
                    os.unlink(dst)
                if os.path.exists(f"{dst}.full.sum"):
                    os.unlink(f"{dst}.full.sum")
            except OSError as cleanup_err:
                logging.warning(f'Cannot remove partial copy {dst}: {cleanup_err}')
            raise OgError(f'Error copying image {image_name} to cache. Reported: {e}') from e

        if not os.path.exists(dst):
            raise OgError(f'could not find {dst} in cache')
        if not tip_check_csum(repo, image_name):
            raise OgError(f'Failed to validate checksum for {image_name}.img')

    def _restore_image_unicast(self, repo, name, devpath, cache=False):
        """Restore image *name* from *repo* into *devpath* via unicast.

        With *cache* set the image is first copied into the cache and
        restored from there; otherwise it is read directly from
        /opt/opengnsys/images without checksum validation.

        Raises:
            OgError: if the repository cannot be changed or the images
            directory is not readable.
        """
        if ogChangeRepo(repo, smb_user=self._smb_user, smb_pass=self._smb_pass) != 0:
            self._restartBrowser(self._url)
            raise OgError(f'Cannot change repository to {repo}')

        if cache:
            self._fetch_image_unicast(repo, name)
            image_path = f'{OG_CACHE_IMAGE_PATH}{name}.img'
        else:
            if os.access(f'/opt/opengnsys/images', os.R_OK) == False:
                raise OgError('Cannot access /opt/opengnsys/images in read mode, check permissions')
            logging.warning(f'Checksum validation is *NOT* available with UNICAST-DIRECT!')
            image_path = f'/opt/opengnsys/images/{name}.img'

        self._restore_image(image_path, devpath)

    def _fetch_image_tiptorrent(self, repo, name):
        """Make sure image *name* is in the cache, downloading it if needed.

        The download is skipped when the cached file exists and its checksum
        validates.

        Returns:
            str: path of the image inside the cache.

        Raises:
            OgError: if no cache partition is available, or the downloaded
            image is missing or fails checksum validation.
        """
        if not get_cache_dev_path():
            raise OgError('No cache partition is mounted')

        fetch = False
        image_path = f'{OG_CACHE_IMAGE_PATH}{name}.img'

        try:
            if not os.path.exists(image_path) or not tip_check_csum(repo, name):
                tip_client_get(repo, name)
                fetch = True
        except BaseException:
            # Restore the browser on any failure, then propagate unchanged.
            self._restartBrowser(self._url)
            raise

        if fetch:
            if not os.path.exists(image_path):
                raise OgError(f'could not find {image_path} in cache')
            if not tip_check_csum(repo, name):
                raise OgError(f'Failed to validate checksum for {name}.img')

        return image_path

    def _restore_image_tiptorrent(self, repo, name, devpath):
        """Fetch image *name* into the cache and restore it into *devpath*."""
        image_path = self._fetch_image_tiptorrent(repo, name)
        self._restore_image(image_path, devpath)

    def _restore_image(self, image_path, devpath):
        """Decompress *image_path* with lzop and pipe it to partclone.restore.

        partclone's stderr is written to /tmp/command.log.

        Raises:
            OgError: if *image_path* does not exist.
        """
        logging.info(f'Restoring image at {image_path} into {devpath}')
        logging.info('*DO NOT REBOOT OR POWEROFF* the client during this time')

        if not os.path.exists(image_path):
            raise OgError(f'Image not found at {image_path} during image restore')

        # Argument lists keep paths containing whitespace in one piece.
        cmd_lzop = ['lzop', '-dc', image_path]
        cmd_pc = ['partclone.restore', '-d0', '-C', '-I', '-o', devpath]

        with open('/tmp/command.log', 'wb', 0) as logfile:
            proc_lzop = subprocess.Popen(cmd_lzop,
                                         stdout=subprocess.PIPE)
            proc_pc = subprocess.Popen(cmd_pc,
                                       stdin=proc_lzop.stdout,
                                       stderr=logfile)
            # Drop our copy of the pipe so lzop notices if partclone exits.
            proc_lzop.stdout.close()
            proc_pc.communicate()
            # Reap lzop so it does not linger as a zombie process.
            proc_lzop.wait()

    def _ogbrowser_clear_logs(self):
        """Truncate /tmp/command.log and /tmp/session.log."""
        logfiles = ['/tmp/command.log', '/tmp/session.log']
        for logfile in logfiles:
            with open(logfile, 'wb', 0) as f:
                f.truncate(0)

    def _enable_wol(self, interface):
        """Run ``ethtool -s <interface> wol g``; log an error if it fails."""
        cmd_ethtool = shlex.split(f'ethtool -s {interface} wol g')
        if subprocess.run(cmd_ethtool).returncode != 0:
            logging.error('Error running ethtool subprocess')

    def _shutdown(self, interface, cmd):
        """Kill the browser, unmount everything and run *cmd*.

        *cmd* is the argument list of the poweroff/reboot command. Failures
        are logged, not raised. *interface* is not used here.
        """
        cmd_browser = shlex.split('pkill -9 browser')
        if shutil.which('browser') and subprocess.run(cmd_browser).returncode != 0:
            logging.error('Error terminating ogBrowser process')

        umount_all()
        umount_cache()

        ret = subprocess.run(cmd)
        if ret.returncode != 0:
            logging.error(f'Error running {cmd} subprocess')

    def _poweroff_oglive(self, interface):
        """Enable WoL on *interface* and power off the machine.

        Uses ``poweroff`` when available, ``busybox poweroff`` otherwise.
        """
        self._enable_wol(interface)

        if shutil.which('poweroff'):
            busybox = ''
        elif shutil.which('busybox'):
            busybox = 'busybox '
        else:
            logging.warning('No poweroff binary found')
            busybox = ''

        cmd_poweroff = shlex.split(f'{busybox}poweroff')
        self._shutdown(interface, cmd_poweroff)

    def poweroff(self):
        """Power off the client."""
        logging.info('Powering off client')
        interface = get_ethernet_interface()
        self._poweroff_oglive(interface)

    def _reboot_oglive(self, interface):
        """Enable WoL on *interface* and reboot the machine.

        Uses ``reboot --force --force`` when available, ``busybox reboot``
        otherwise.
        """
        self._enable_wol(interface)

        if shutil.which('reboot'):
            busybox = ''
            opts = ' --force --force'
        elif shutil.which('busybox'):
            busybox = 'busybox '
            opts = ''
        else:
            logging.warning('No reboot binary found')
            busybox = ''
            opts = ''

        cmd_reboot = shlex.split(f'{busybox}reboot{opts}')
        self._shutdown(interface, cmd_reboot)

    def reboot(self):
        """Reboot the client."""
        logging.info('Rebooting client')
        interface = get_ethernet_interface()
        self._reboot_oglive(interface)

    def shellrun(self, request, ogRest):
        """Run the command carried by *request* and return its output.

        Inline commands are passed verbatim to the shell. Otherwise the
        first word must name an executable file located in
        /opt/opengnsys/shell/, which is run with the remaining words as
        arguments.

        Returns:
            tuple: ``(returncode, cmd, output)`` where *cmd* is the command
            string that was run and *output* its decoded stdout.

        Raises:
            OgError: if the command is empty, the script is not found or not
            executable, or the subprocess cannot be run.
        """
        cmd = request.getrun()
        is_inline = request.get_inline()

        if not cmd:
            raise OgError("No command provided for shell run")

        self._restartBrowser(self._url_log)

        if is_inline:
            cmds = cmd
        else:
            cmds = shlex.split(cmd)
            shell_path = '/opt/opengnsys/shell/'

            if not cmds:
                raise OgError("Parsed shell command list is empty")

            try:
                shell_path_files = os.listdir(shell_path)
            except OSError as e:
                raise OgError(f'Error accessing {shell_path}: {e}') from e

            for file_name in shell_path_files:
                file_path = os.path.join(shell_path, file_name)

                if cmds[0] == file_name:
                    cmds[0] = file_path
                    cmd = " ".join(cmds)
                    break
            else:
                raise OgError(f'Script {cmds[0]} not found in {shell_path}')

            if not os.path.isfile(cmds[0]) or not os.access(cmds[0], os.X_OK):
                raise OgError(f"Command '{cmds[0]}' is not a valid executable")

        try:
            ogRest.proc = subprocess.Popen(
                cmds,
                stdout=subprocess.PIPE,
                shell=is_inline)
            (output, error) = ogRest.proc.communicate()
        except (OSError, subprocess.SubprocessError) as e:
            raise OgError(f'Error when running "shell run" subprocess: {e}') from e

        if ogRest.proc.returncode != 0:
            # cmd is a plain string in both modes; joining cmds would split
            # an inline command into single characters.
            logging.warning('Non zero exit code when running: %s', cmd)
        else:
            logging.info('Shell run command OK')

        self.refresh(ogRest)

        output = output.decode('utf-8-sig', errors='replace')
        return (ogRest.proc.returncode, cmd, output)

    def session(self, request, ogRest):
        """Set up booting the OS at the requested disk/partition and reboot."""
        disk = request.getDisk()
        partition = request.getPartition()

        boot_os_at(int(disk), int(partition))
        self.reboot()

    def software(self, request, ogRest):
        """Return the software inventory of the requested partition.

        Returns:
            str: one package per line (legacy server format).

        Raises:
            OgError: if the partition cannot be mounted.
        """
        disk = request.getDisk()
        partition = request.getPartition()
        partdev = get_partition_device(int(disk), int(partition))
        mountpoint = partdev.replace('dev', 'mnt')

        if not mount_mkdir(partdev, mountpoint):
            raise OgError(f'Error mounting {partdev} at {mountpoint}')

        if not os.path.ismount(mountpoint):
            raise OgError(f'Invalid mountpoint {mountpoint} for software inventory')

        self._restartBrowser(self._url_log)
        try:
            pkgset = get_package_set(mountpoint)
        finally:
            # Restore the browser and unmount even if the inventory fails.
            self._restartBrowser(self._url)
            umount(mountpoint)

        logging.info('Software inventory command OK')
        # Software inventory result is still processed by legacy server code
        # (ogAdmServer.c). Legacy response format is string where each
        # software package is separated by a newline '\n'.
        # Each package/software line follows this format:
        #      "{package_name} {package_version}"
        return '\n'.join(map(str, pkgset))

    def hardware(self, ogRest):
        """Return the hardware inventory in the legacy list format.

        Raises:
            OgError: if collecting the inventory fails.
        """
        self._restartBrowser(self._url_log)

        logging.info('Running hardware inventory command')
        try:
            inventory = get_hardware_inventory()
        except Exception as e:
            raise OgError(f'Error while running hardware inventory. {e}') from e
        finally:
            self._restartBrowser(self._url)

        result = legacy_list_hardware_inventory(inventory)

        logging.info('Successful hardware inventory command execution')
        return result

    def _partition(self, diskname, table_type, partlist):
        """Create a new disklabel on /dev/*diskname* and add the partitions.

        *table_type* is 'MSDOS' or 'GPT'. Each item of *partlist* provides
        ``partition`` (1-based number), ``code`` and ``size`` (in KiB);
        items whose code is 'EMPTY' are skipped.

        Raises:
            OgError: if *table_type* is not supported.
        """
        cxt = fdisk.Context(f'/dev/{diskname}', details=True)

        if table_type == 'MSDOS':
            cxt.create_disklabel('dos')
        elif table_type == 'GPT':
            cxt.create_disklabel('gpt')
        else:
            raise OgError(f'Unsupported partition scheme {table_type}, only MSDOS and GPT are supported')

        logging.info(f'Setting up partition layout to {table_type}')

        for part in partlist:
            logging.info(f'Creating partition {part["partition"]} with {part["code"]} of {int(part["size"])//1024} MiB')

            if part["code"] == 'EMPTY':
                continue

            pa = fdisk.Partition(start_follow_default=True,
                                 end_follow_default=False,
                                 partno_follow_default=False)
            parttype = get_parttype(cxt, part["code"])
            size = int(part["size"])
            # Convert the size from KiB into sectors.
            pa.size = (size * (1 << 10)) // cxt.sector_size
            pa.partno = int(part["partition"]) - 1
            pa.type = parttype

            cxt.add_partition(pa)

        cxt.write_disklabel()
        os.sync()

    def setup(self, request, ogRest):
        """Partition and format the requested disk, then return a refresh.

        Raises:
            OgError: if the disk number is out of range or a partition
            cannot be formatted.
        """
        table_type = request.getType()
        disk = int(request.getDisk())
        partlist = request.getPartitionSetup()

        self._ogbrowser_clear_logs()
        self._restartBrowser(self._url_log)

        umount_all()
        umount_cache()

        disks = get_disks()
        # Disk numbers are 1-based; 0 would silently select the last disk.
        if disk < 1 or disk > len(disks):
            raise OgError(f'Invalid disk number {disk}, {len(disks)} disks available.')

        diskname = disks[disk-1]
        self._partition(diskname, table_type, partlist)

        ret = subprocess.run(['partprobe', f'/dev/{diskname}'])
        logging.info(f'first partprobe /dev/{diskname} reports {ret.returncode}')

        for part in partlist:
            if part["filesystem"] == 'EMPTY':
                continue

            fs = part["filesystem"].lower()
            logging.info(f'Formatting partition {part["partition"]} with filesystem {part["filesystem"]}')
            partition = int(part["partition"])
            if fs == 'cache':
                err = mkfs('ext4', disk, partition, label='CACHE')
            else:
                err = mkfs(fs, disk, partition)

            if err != 0:
                raise OgError(f'Failed to format {part["partition"]} with filesystem {part["filesystem"]}')

        ret = subprocess.run(['partprobe', f'/dev/{diskname}'])
        logging.info(f'second partprobe /dev/{diskname} reports {ret.returncode}')

        for part in partlist:
            if part["filesystem"] == 'EMPTY':
                continue

            fs = part["filesystem"].lower()
            if fs == 'cache':
                mount_cache()
                update_live_cache()

        logging.info('Partition setup command OK')
        result = self.refresh(ogRest)

        self._restartBrowser(self._url)
        return result

    def image_restore(self, request, ogRest):
        """Restore the requested image into the requested partition.

        Returns:
            dict: ``disk``, ``partition``, ``image_id`` and ``cache`` (the
            current cache contents).

        Raises:
            OgError: if the transfer type is not supported or the restore
            fails.
        """
        disk = int(request.getDisk())
        partition = int(request.getPartition())
        name = request.getName()
        repo = request.getRepo()
        ctype = request.getType()
        partdev = get_partition_device(disk, partition)

        mount_cache()

        self._ogbrowser_clear_logs()
        self._restartBrowser(self._url_log)

        logging.info(f'Request to restore image {name}.img via {ctype} from {repo}')

        if shutil.which('restoreImageCustom'):
            logging.warning(f'Ignoring restoreImageCustom, use postconfiguration scripts instead.')

        if 'UNICAST' in ctype:
            cache = 'DIRECT' not in ctype
            self._restore_image_unicast(repo, name, partdev, cache)
        elif ctype == 'TIPTORRENT':
            self._restore_image_tiptorrent(repo, name, partdev)
        else:
            # Without this an unknown type would be reported as a success
            # although nothing has been restored.
            raise OgError(f'Invalid image restore type {ctype}')

        extend_filesystem(disk, partition)

        configure_os(disk, partition)

        self.refresh(ogRest)

        logging.info('Image restore command OK')

        json_dict = {
            'disk': request.getDisk(),
            'partition': request.getPartition(),
            'image_id': request.getId(),
            'cache': self._get_cache_contents(),
        }

        return json_dict

    def image_create(self, request, ogRest):
        """Create an image of the requested partition in the repository.

        The filesystem is shrunk, cloned with partclone, compressed with
        lzop into /opt/opengnsys/images/{name}.img and extended again. If
        *backup* is requested and the image already exists, the previous
        file is kept as ``.ant`` and put back if the creation fails.

        Returns:
            The image information with its ``checksum`` attribute set, or
            None if the operation was already terminated.

        Raises:
            OgError: on any failure while validating the target or creating
            the image.
        """
        disk = int(request.getDisk())
        partition = int(request.getPartition())
        name = request.getName()
        repo = request.getRepo()
        backup = request.getBackup()
        image_path = f'/opt/opengnsys/images/{name}.img'

        mount_cache()

        self._ogbrowser_clear_logs()
        self._restartBrowser(self._url_log)

        logging.info(f'Request to create image {name}.img at repository {repo}')

        if ogChangeRepo(repo, smb_user=self._smb_user, smb_pass=self._smb_pass) != 0:
            self._restartBrowser(self._url)
            raise OgError(f'Cannot change image repository to {repo}')

        if ogRest.terminated:
            return

        disks = get_disks()
        # Disk numbers are 1-based; 0 would silently select the last disk.
        if disk < 1 or disk > len(disks):
            raise OgError(f'Invalid disk number {disk}, {len(disks)} disks available.')

        diskname = disks[disk-1]
        partitions = get_partition_data(device=f'/dev/{diskname}')
        pa = None

        for p in partitions:
            if (p.partno + 1) == partition:
                pa = p

        if pa is None:
            self._restartBrowser(self._url)
            raise OgError(f'Target partition {partition} not found in /dev/{diskname}')

        padev = pa.padev
        fstype = pa.fstype
        if not fstype:
            raise OgError(f'No filesystem detected in {padev}. Aborting image creation')

        ro_mountpoint = padev.replace('dev', 'mnt')
        if not mount_mkdir(padev, ro_mountpoint, readonly=True):
            raise OgError(f'Cannot mount {ro_mountpoint} as readonly')

        try:
            if is_hibernation_enabled(ro_mountpoint):
                raise OgError(f'Target system in {padev} has hibernation enabled')
        finally:
            umount(ro_mountpoint)

        if change_access(user=self._smb_user, pwd=self._smb_pass) == -1:
            raise OgError('remount of /opt/opengnsys/images has failed')

        if os.access(f'/opt/opengnsys/images', os.R_OK | os.W_OK) == False:
            raise OgError('Cannot access /opt/opengnsys/images in read and write mode, check permissions')

        if os.access(f'{image_path}', os.R_OK) == True:
            logging.info(f'image file {image_path} already exists, updating.')

        copy_windows_efi_bootloader(disk, partition)

        if ogReduceFs(disk, partition) == -1:
            raise OgError(f'Failed to shrink {fstype} filesystem in {padev}')

        # Argument lists keep paths containing whitespace in one piece.
        cmd1 = [f'partclone.{fstype}', '-I', '-C', '--clone', '-s', padev, '-O', '-']
        cmd2 = ['lzop', '-1', '-fo', image_path]

        try:
            if os.path.exists(image_path) and backup:
                shutil.move(image_path, f'{image_path}.ant')
        except OSError as e:
            raise OgError(f'Cannot create backup for {image_path}: {e}') from e

        try:
            # The context manager closes the log on every exit path.
            with open('/tmp/command.log', 'wb', 0) as logfile:
                p1 = Popen(cmd1, stdout=PIPE, stderr=logfile)
                p2 = Popen(cmd2, stdin=p1.stdout)
                # Drop our copy of the pipe so partclone notices if lzop exits.
                p1.stdout.close()

                logging.info(f'Creating image at {image_path} from {padev} using {fstype}')
                logging.info('*DO NOT REBOOT OR POWEROFF* the client during this time')

                try:
                    p2.communicate()
                except OSError as e:
                    raise OgError(f'Unexpected error when running partclone and lzop commands: {e}') from e
                finally:
                    p2.terminate()
                    p1.poll()

            logging.info(f'partclone process exited with code {p1.returncode}')
            logging.info(f'lzop process exited with code {p2.returncode}')

            extend_filesystem(disk, partition)

            if os.access(f'{image_path}', os.R_OK) == False:
                raise OgError(f'Cannot access partclone image file {image_path}')

            image_info = get_image_info(image_path)
        except Exception as e:
            # Drop the failed image and put the previous one back, if any.
            if os.path.exists(image_path):
                os.unlink(image_path)
            if os.path.exists(f'{image_path}.ant'):
                shutil.move(f'{image_path}.ant', image_path)

            self._restartBrowser(self._url)

            if isinstance(e, OgError):
                raise OgError(f'Failed to create image for {fstype} filesystem in device {padev}: {e}') from e
            else:
                raise

        logging.info(f'Writing checksum file {name}.img.full.sum...')
        logging.info('*DO NOT REBOOT OR POWEROFF* the client during this time')
        checksum = compute_md5(f'/opt/opengnsys/images/{name}.img')
        if self._write_md5_file(f'/opt/opengnsys/images/{name}.img', checksum) == -1:
            raise OgError(f'Cannot write {name}.full.sum file')

        image_info.checksum = checksum

        self._restartBrowser(self._url)

        logging.info('Image creation command OK')
        return image_info

    def cache_delete(self, request, ogRest):
        """Remove the requested images (and their checksum files) from cache.

        Returns:
            dict | None: ``{'cache': <cache contents>}``, or None when the
            cache cannot be mounted or the image directory does not exist.
        """
        images = request.getImages()

        logging.info(f'Request to remove files from cache')

        if not mount_cache():
            logging.error(f'Cache is not mounted')
            return

        img_dir = OG_CACHE_IMAGE_PATH
        if not os.path.isdir(img_dir):
            logging.error(f'{img_dir} is not a directory')
            # Same result as the "not mounted" case above.
            return

        for file_name in os.listdir(img_dir):
            file_path = os.path.join(img_dir, file_name)

            if not os.path.isfile(file_path):
                continue
            if not file_name.endswith('.img'):
                continue
            if os.sep in file_name:
                logging.info(f'Detected cache deletion request of filename with a path, ignoring {file_name}')
                continue

            if file_name in images:
                logging.info(f'Removing file {file_path} from cache')
                os.remove(file_path)

                csum_path = file_path + '.full.sum'
                if not os.path.isfile(csum_path):
                    logging.info(f'Missing checksum file {csum_path}')
                    continue
                logging.info(f'Removing checksum file {csum_path} from cache')
                os.remove(csum_path)

        result = {'cache': self._get_cache_contents()}

        self._restartBrowser(self._url)

        logging.info('Delete file from cache request OK')
        return result

    def cache_fetch(self, request, ogRest):
        """Download the requested image into the cache.

        Returns:
            dict: ``{'cache': <cache contents>}``.

        Raises:
            OgError: if the transfer type is neither UNICAST nor TIPTORRENT,
            or the fetch fails.
        """
        image = request.getImages()
        repo = request.getRepo()
        ctype = request.getType()

        mount_cache()

        logging.info(f'Request to cache image {image}.img via {ctype} from {repo}')

        if ctype == 'UNICAST':
            self._fetch_image_unicast(repo, image)
        elif ctype == 'TIPTORRENT':
            self._fetch_image_tiptorrent(repo, image)
        else:
            raise OgError(f'Invalid image fetch type {ctype}')

        logging.info('Cache fetch command OK')

        result = {'cache': self._get_cache_contents()}
        return result

    def refresh(self, ogRest):
        """Collect the current disk, partition, cache and boot entry status.

        Also regenerates the boot menu and the cache listing file.

        Returns:
            dict: ``serial_number``, ``disk_setup``, ``partition_setup``,
            ``link``, ``cache`` and, when boot entries are available,
            ``efi``.
        """
        self._restartBrowser(self._url_log)

        cache = get_cache_dev_path()
        disks = get_disks()
        interface = get_ethernet_interface()
        link = ethtool(interface)

        json_body = {'serial_number': '',
                     'disk_setup': [],
                     'partition_setup': [],
                     'link': link
                     }

        for num_disk, disk in enumerate(disks, start=1):
            logging.debug('refresh: processing %s', disk)
            part_setup = {}
            try:
                partitions = get_partition_data(device=f'/dev/{disk}')
                disk_data = get_disk_data(device=f'/dev/{disk}')
            except Exception as e:
                # Disks that cannot be read are left out of the report.
                logging.debug('refresh: skipping %s: %s', disk, e)
                continue

            self._refresh_payload_disk(disk_data, part_setup, num_disk)
            json_body['disk_setup'].append(part_setup)

            for pa in partitions:
                # Each partition entry starts as a copy of the disk entry.
                part_setup = part_setup.copy()
                self._refresh_payload_partition(disk_data, pa, part_setup, disk)
                self._refresh_part_setup_cache(pa, part_setup, cache)
                json_body['partition_setup'].append(part_setup)

        json_body['cache'] = self._get_cache_contents()

        boot_entry_data = self._get_boot_entry_data()
        if boot_entry_data:
            json_body['efi'] = boot_entry_data

        generate_menu(json_body['partition_setup'])
        generate_cache_txt()
        self._restartBrowser(self._url)

        logging.info('Sending response to refresh request')
        return json_body

    def check_interactive_session_change(self):
        """Return False: no interactive session change is reported here."""
        return False