#
# Copyright (C) 2020-2021 Soleta Networks
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

import datetime
import hashlib
import logging
import os
import stat
import subprocess
import shlex
import shutil

from subprocess import Popen, PIPE

import fdisk

from src.ogRest import ThreadState
from src.live.partcodes import GUID_MAP
from src.live.parttypes import get_parttype

from src.utils.legacy import *
from src.utils.net import ethtool
from src.utils.menu import generate_menu
from src.utils.fs import *
from src.utils.probe import os_probe, cache_probe
from src.utils.disk import *
from src.utils.cache import generate_cache_txt, umount_cache, init_cache
from src.utils.tiptorrent import *
from src.utils.uefi import *
from src.utils.boot import *
from src.utils.sw_inventory import get_package_set
from src.utils.hw_inventory import get_hardware_inventory, legacy_list_hardware_inventory


OG_SHELL = '/bin/bash'
OG_CACHE_PATH = '/opt/opengnsys/cache/opt/opengnsys/images'


class OgLiveOperations:
    """Operations a client can run while booted in live mode.

    Each public method implements one command (poweroff, setup, image
    restore/creation, inventories, ...). Most of them switch the local
    browser to the log URL while working and back to the main URL when done.
    """

    def __init__(self, config):
        """Read server URLs and samba credentials from *config*.

        *config* must provide the keys ``opengnsys.url``,
        ``opengnsys.url_log``, ``samba.user`` and ``samba.pass``.
        """
        self._url = config['opengnsys']['url']
        self._url_log = config['opengnsys']['url_log']
        self._smb_user = config['samba']['user']
        self._smb_pass = config['samba']['pass']

    def _restartBrowser(self, url):
        """Kill any running ``browser`` process and start a new one at *url*.

        Raises RuntimeError if either subprocess cannot be launched.
        """
        try:
            subprocess.call(["pkill", "-9", "browser"])
            # The new browser is left running in the background on purpose.
            subprocess.Popen(["browser", "-qws", url])
        except Exception as e:
            raise RuntimeError('Cannot restart browser') from e

    def _refresh_payload_disk(self, cxt, part_setup, num_disk):
        """Fill *part_setup* with the entry describing a whole disk.

        *cxt* is the fdisk context of the disk and *num_disk* its 1-based
        index. The 'code' field is '0' when there is no disklabel, '2' for
        gpt and '1' for any other label.
        """
        part_setup['disk'] = str(num_disk)
        part_setup['disk_type'] = 'DISK'
        part_setup['partition'] = '0'
        part_setup['filesystem'] = ''
        part_setup['os'] = ''
        part_setup['size'] = str(cxt.nsectors * cxt.sector_size // 1024)
        part_setup['used_size'] = '0'
        if not cxt.label:
            part_setup['code'] = '0'
        else:
            part_setup['code'] = '2' if cxt.label.name == 'gpt' else '1'

    def _refresh_payload_partition(self, cxt, pa, part_setup, disk):
        """Fill *part_setup* with the entry describing partition *pa*.

        The partition is temporarily mounted (device path with 'dev'
        replaced by 'mnt') to probe the installed OS and the used space;
        when it cannot be mounted those fields are left empty/zero.
        """
        parttype = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_TYPEID)
        fstype = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_FSTYPE)
        padev = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_DEVICE)
        size = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_SIZE)
        partnum = pa.partno + 1
        source = padev
        target = padev.replace('dev', 'mnt')

        if cxt.label.name == 'gpt':
            # GPT type ids are GUIDs; map them to the numeric codes used here.
            code = GUID_MAP.get(parttype, 0x0)
        else:
            code = int(parttype, base=16)

        if mount_mkdir(source, target):
            probe_result = os_probe(target)
            part_setup['os'] = probe_result
            part_setup['used_size'] = get_usedperc(target)
            umount(target)
        else:
            part_setup['os'] = ''
            part_setup['used_size'] = '0'

        part_setup['disk_type'] = ''
        part_setup['partition'] = str(partnum)
        part_setup['filesystem'] = fstype.upper() if fstype else 'EMPTY'
        # Slice instead of str.removeprefix('0x'), which needs Python >= 3.9.
        part_setup['code'] = hex(code)[2:]
        part_setup['size'] = str(int(size) // 1024)

        if part_setup['filesystem'] == 'VFAT':
            part_setup['filesystem'] = 'FAT32'

    def _refresh_part_setup_cache(self, cxt, pa, part_setup, cache):
        """Mark *part_setup* as the cache partition when *pa* is device *cache*."""
        padev = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_DEVICE)
        if padev == cache:
            part_setup['filesystem'] = 'CACHE'
            part_setup['code'] = 'ca'

    def _compute_md5(self, path, bs=2**20):
        """Return the hexadecimal MD5 digest of the file at *path*.

        The file is read in chunks of *bs* bytes.
        """
        m = hashlib.md5()
        with open(path, 'rb') as f:
            for buf in iter(lambda: f.read(bs), b''):
                m.update(buf)
        return m.hexdigest()

    def _write_md5_file(self, path):
        """Write the MD5 digest of *path* into ``<path>.full.sum``.

        Returns 0 on success and -1 if the checksum file cannot be written.
        Raises ValueError if *path* does not exist.
        """
        if not os.path.exists(path):
            raise ValueError(f'Invalid image path {path} when computing md5 checksum')
        filename = path + ".full.sum"
        dig = self._compute_md5(path)
        try:
            with open(filename, 'w') as f:
                f.write(dig)
        except OSError:
            logging.error(f'cannot write {filename}')
            return -1
        return 0

    def _copy_image_to_cache(self, image_name):
        """
        Copies /opt/opengnsys/images/{image_name}.img into
        /opt/opengnsys/cache/opt/opengnsys/images/ and writes its checksum.

        Implies a unicast transfer. Does not use tiptorrent.

        Raises RuntimeError if the copy or the checksum step fails.
        """
        src = f'/opt/opengnsys/images/{image_name}.img'
        dst = f'/opt/opengnsys/cache/opt/opengnsys/images/{image_name}.img'
        try:
            shutil.copy(src, dst)
            tip_write_csum(image_name)
        except Exception as e:
            raise RuntimeError(f'Error copying image {image_name} to cache. Reported: {e}') from e

    def _restore_image_unicast(self, repo, name, devpath, cache=False):
        """Restore image *name* from *repo* into *devpath* over unicast.

        With *cache* set, the image is restored from the cache directory and
        copied there first when it is missing or its checksum check fails.

        Raises ValueError if the repository cannot be changed to *repo*.
        """
        if ogChangeRepo(repo, smb_user=self._smb_user, smb_pass=self._smb_pass) != 0:
            self._restartBrowser(self._url)
            raise ValueError(f'Cannot change repository to {repo}')
        logging.debug(f'restore_image_unicast: name => {name}')
        if cache:
            image_path = f'/opt/opengnsys/cache/opt/opengnsys/images/{name}.img'
            if (not os.path.exists(image_path) or
                    not tip_check_csum(repo, name)):
                self._copy_image_to_cache(name)
        else:
            image_path = f'/opt/opengnsys/images/{name}.img'
        self._restore_image(image_path, devpath)

    def _restore_image_tiptorrent(self, repo, name, devpath):
        """Restore image *name* into *devpath*, fetching it with tiptorrent.

        The image is downloaded into the cache directory when it is missing
        or its checksum check fails.

        Raises RuntimeError if the cache directory does not exist or the
        download/verification step fails.
        """
        if not os.path.exists(OG_CACHE_PATH):
            raise RuntimeError('No cache partition is mounted')

        image_path = f'/opt/opengnsys/cache/opt/opengnsys/images/{name}.img'
        try:
            if (not os.path.exists(image_path) or
                    not tip_check_csum(repo, name)):
                tip_client_get(repo, name)
        except Exception as e:
            self._restartBrowser(self._url)
            # Report the most specific reason that can be determined.
            if not os.path.exists(image_path):
                raise RuntimeError(f'Image file {image_path} does not exist') from e
            if not tip_check_csum(repo, name):
                raise RuntimeError(f'checksum file {name}.full.sum is missing in repository {repo}') from e
            raise RuntimeError(f'Unexpected error when restoring image file {image_path}') from e

        self._restore_image(image_path, devpath)

    def _restore_image(self, image_path, devpath):
        """Run ``lzop -dc image_path | partclone.restore -o devpath``.

        stderr of both processes is written to /tmp/command.log.

        Raises RuntimeError if *image_path* does not exist and OSError if
        either subprocess exits with a non-zero status.
        """
        logging.debug(f'Restoring image at {image_path} into {devpath}')
        logging.debug('This process can take some time, please *DO NOT SHUT DOWN OR REBOOT* this client')

        cmd_lzop = shlex.split(f'lzop -dc {image_path}')
        cmd_pc = shlex.split(f'partclone.restore -d0 -C -I -o {devpath}')

        if not os.path.exists(image_path):
            raise RuntimeError(f'Image not found at {image_path} during image restore')

        proc_lzop = subprocess.Popen(cmd_lzop,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE)
        proc_pc = subprocess.Popen(cmd_pc,
                                   stdin=proc_lzop.stdout,
                                   stderr=subprocess.PIPE)
        # Drop our copy of the pipe so lzop gets SIGPIPE if partclone exits.
        proc_lzop.stdout.close()

        pc_stderr = proc_pc.communicate()[1]
        lzop_stderr = proc_lzop.stderr.read()
        # wait() rather than poll(): poll() may still return None here, which
        # would be reported as a failure below.
        proc_lzop.wait()

        with open('/tmp/command.log', 'wb', 0) as logfile:
            logfile.write(lzop_stderr)
            logfile.write(pc_stderr)

        if proc_lzop.returncode != 0:
            raise OSError(f'lzop subprocess failed: {lzop_stderr.decode("utf-8")}')

        if proc_pc.returncode != 0:
            raise OSError(f'partclone subprocess failed: {pc_stderr.decode("utf-8")}')

        logging.info('Image restore successful')

    def _ogbrowser_clear_logs(self):
        """Truncate /tmp/command.log and /tmp/session.log."""
        logfiles = ['/tmp/command.log', '/tmp/session.log']
        for logfile in logfiles:
            with open(logfile, 'wb', 0) as f:
                f.truncate(0)

    def _poweroff_oglive(self, operation='poweroff'):
        """Unmount everything and run ``busybox <operation>``.

        Before that, wake-on-lan is enabled on the interface named by the
        DEVICE environment variable and the browser is killed. Failures of
        the individual commands are logged, not raised.
        """
        interface = os.getenv('DEVICE')
        cmd_ethtool = shlex.split(f'ethtool -s {interface} wol g')
        cmd_browser = shlex.split('pkill -9 browser')
        # Use busyboxOLD when it is installed, plain busybox otherwise.
        busybox = shutil.which('busyboxOLD') or 'busybox'
        cmd_busybox = shlex.split(f'{busybox} {operation}')

        umount_all()
        umount_cache()
        if subprocess.run(cmd_ethtool).returncode != 0:
            logging.error('Error running ethtool subprocess')
        if subprocess.run(cmd_browser).returncode != 0:
            logging.error('Error terminating ogBrowser process')
        # Compare the exit status, not the CompletedProcess object itself.
        if subprocess.run(cmd_busybox).returncode != 0:
            logging.error('Error running "busybox poweroff" subprocess')

    def poweroff(self):
        """Power off the client."""
        logging.info('Powering off client')
        self._poweroff_oglive()

    def reboot(self):
        """Reboot the client."""
        logging.info('Rebooting client')
        self._poweroff_oglive(operation='reboot')

    def shellrun(self, request, ogRest):
        """Run the shell command carried by *request* and return its stdout.

        The process is stored in ``ogRest.proc`` while it runs. A refresh is
        performed afterwards. Raises RuntimeError if the subprocess cannot
        be run.
        """
        cmd = request.getrun()
        # NOTE(review): str.split() takes a literal separator, not a regex,
        # so this only splits on the exact sequence ";|\n\r" -- confirm intent.
        cmds = cmd.split(";|\n\r")

        self._restartBrowser(self._url_log)

        try:
            ogRest.proc = subprocess.Popen(cmds,
                                           stdout=subprocess.PIPE,
                                           shell=True,
                                           executable=OG_SHELL)
            output, _ = ogRest.proc.communicate()
        except Exception as e:
            raise RuntimeError(f'Error when running "shell run" subprocess: {e}') from e

        if ogRest.proc.returncode != 0:
            logging.warning('Non zero exit code when running: %s', ' '.join(cmds))
        else:
            logging.info('Shell run command OK')

        self.refresh(ogRest)

        return output.decode('utf-8')

    def session(self, request, ogRest):
        """Set up booting the OS at the requested disk/partition and reboot."""
        disk = request.getDisk()
        partition = request.getPartition()

        boot_os_at(int(disk), int(partition))
        self.reboot()

    def software(self, request, ogRest):
        """Return the software inventory of the requested partition.

        Raises RuntimeError if the partition cannot be mounted.
        """
        disk = request.getDisk()
        partition = request.getPartition()
        partdev = get_partition_device(int(disk), int(partition))
        mountpoint = partdev.replace('dev', 'mnt')

        if not mount_mkdir(partdev, mountpoint):
            raise RuntimeError(f'Error mounting {partdev} at {mountpoint}')

        if not os.path.ismount(mountpoint):
            raise RuntimeError(f'Invalid mountpoint {mountpoint} for software inventory')

        self._restartBrowser(self._url_log)
        pkgset = get_package_set(mountpoint)
        self._restartBrowser(self._url)

        umount(mountpoint)
        logging.info('Software inventory command OK')
        # Software inventory result is still processed by legacy server code
        # (ogAdmServer.c). Legacy response format is string where each
        # software package is separated by a newline '\n'.
        # Each package/software line follows this format:
        #   "{package_name} {package_version}"
        return '\n'.join(map(str, pkgset))

    def hardware(self, ogRest):
        """Return the hardware inventory in the legacy list format.

        Raises RuntimeError if the inventory cannot be collected; the
        browser is switched back to the main URL in either case.
        """
        self._restartBrowser(self._url_log)

        logging.info('Running hardware inventory command')
        try:
            inventory = get_hardware_inventory()
        except Exception as e:
            raise RuntimeError(f'Error while running hardware inventory. {e}') from e
        finally:
            self._restartBrowser(self._url)

        result = legacy_list_hardware_inventory(inventory)

        logging.info('Successful hardware inventory command execution')
        return result

    def setup(self, request, ogRest):
        """Create a new partition table on the requested disk and format it.

        Returns the result of a refresh performed after the setup.
        Raises ValueError for a table type other than MSDOS or GPT.
        """
        table_type = request.getType()
        disk = request.getDisk()
        cache = request.getCache()
        cache_size = request.getCacheSize()
        partlist = request.getPartitionSetup()

        self._ogbrowser_clear_logs()
        self._restartBrowser(self._url_log)

        diskname = get_disks()[int(disk)-1]
        cxt = fdisk.Context(f'/dev/{diskname}',
                            details=True)

        if table_type == 'MSDOS':
            cxt.create_disklabel('dos')
        elif table_type == 'GPT':
            cxt.create_disklabel('gpt')
        else:
            raise ValueError(f'Unsupported partition scheme {table_type}, only MSDOS and GPT are supported')

        logging.info(f'Setting up partition layout to {table_type}')

        for part in partlist:
            logging.info(f'Creating partition {part["partition"]} with {part["code"]} of {int(part["size"])//1024} Mbytes')
            if part["code"] == 'EMPTY':
                continue
            if ogRest.terminated:
                break
            if part["code"] == 'CACHE':
                umount_cache()

            pa = fdisk.Partition(start_follow_default=True,
                                 end_follow_default=False,
                                 partno_follow_default=False)
            parttype = get_parttype(cxt, part["code"])
            size = int(part["size"])
            # Multiply by 1024 and divide by the sector size to get sectors.
            pa.size = (size * (1 << 10)) // cxt.sector_size
            pa.partno = int(part["partition"]) - 1
            pa.type = parttype
            cxt.add_partition(pa)

        cxt.write_disklabel()
        subprocess.run(['partprobe', f'/dev/{diskname}'])

        for part in partlist:
            if part["filesystem"] == 'EMPTY':
                continue
            fs = part["filesystem"].lower()
            logging.info(f'Formatting partition {part["partition"]} with filesystem {part["filesystem"]}')
            partition = int(part["partition"])
            if fs == 'cache':
                mkfs('ext4', int(disk), partition, label='CACHE')
                init_cache()
            else:
                mkfs(fs, int(disk), partition)

        logging.info('Setup command OK')
        result = self.refresh(ogRest)

        self._restartBrowser(self._url)
        return result

    def image_restore(self, request, ogRest):
        """Restore an image into the requested partition.

        The transfer method depends on the request type: a
        ``restoreImageCustom`` executable takes precedence when present,
        then UNICAST (through the cache unless the type contains DIRECT)
        and TIPTORRENT. Returns the output of ``configureOs``.
        """
        disk = request.getDisk()
        partition = request.getPartition()
        name = request.getName()
        repo = request.getRepo()
        ctype = request.getType()
        profile = request.getProfile()
        cid = request.getId()
        partdev = get_partition_device(int(disk), int(partition))

        self._ogbrowser_clear_logs()
        self._restartBrowser(self._url_log)

        logging.debug('Image restore params:')
        logging.debug(f'\tname: {name}')
        logging.debug(f'\trepo: {repo}')
        logging.debug(f'\tprofile: {profile}')
        logging.debug(f'\tctype: {ctype}')

        if shutil.which('restoreImageCustom'):
            restoreImageCustom(repo, name, disk, partition, ctype)
        elif 'UNICAST' in ctype:
            cache = 'DIRECT' not in ctype
            self._restore_image_unicast(repo, name, partdev, cache)
        elif ctype == 'TIPTORRENT':
            self._restore_image_tiptorrent(repo, name, partdev)

        output = configureOs(disk, partition)

        self.refresh(ogRest)

        logging.info('Image restore command OK')
        return output

    def image_create(self, request, ogRest):
        """Create an image of the requested partition in the repository.

        Runs ``partclone.<fstype> | lzop`` into
        /opt/opengnsys/images/<name>.img, optionally keeping the previous
        image as ``.ant`` while working, and writes the ``.full.sum`` file.
        Returns the image info object with size, perms and mtime filled in,
        or None if the operation was terminated early.

        Raises RuntimeError if the repository cannot be selected or the
        image creation fails, ValueError if the checksum cannot be written.
        """
        disk = int(request.getDisk())
        partition = int(request.getPartition())
        name = request.getName()
        repo = request.getRepo()
        backup = request.getBackup()
        image_path = f'/opt/opengnsys/images/{name}.img'

        self._ogbrowser_clear_logs()
        self._restartBrowser(self._url_log)

        if ogChangeRepo(repo, smb_user=self._smb_user, smb_pass=self._smb_pass) != 0:
            self._restartBrowser(self._url)
            raise RuntimeError(f'Cannot change image repository to {repo}')

        if ogRest.terminated:
            return

        # Bound before the try block so the error handler below can always
        # format its message, even when the failure happens early.
        padev = 'unknown'
        fstype = 'unknown'

        try:
            diskname = get_disks()[disk-1]
            cxt = fdisk.Context(f'/dev/{diskname}', details=True)
            pa = None

            for i, p in enumerate(cxt.partitions):
                if (p.partno + 1) == partition:
                    pa = cxt.partitions[i]

            if pa is None:
                self._restartBrowser(self._url)
                raise RuntimeError(f'Target partition /dev/{diskname} not found')

            padev = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_DEVICE)
            fstype = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_FSTYPE)
            if not fstype:
                raise RuntimeError(f'No filesystem detected in {padev}. Aborting image creation')

            if change_access(user=self._smb_user, pwd=self._smb_pass) == -1:
                raise RuntimeError('remount of /opt/opengnsys/images has failed')

            if not os.access('/opt/opengnsys/images', os.R_OK | os.W_OK):
                raise RuntimeError('Cannot access /opt/opengnsys/images in read and write mode, check permissions')

            if os.access(image_path, os.R_OK):
                logging.info(f'image file {image_path} already exists, updating.')

            copy_windows_efi_bootloader(disk, partition)

            if ogReduceFs(disk, partition) == -1:
                raise ValueError(f'Failed to shrink {fstype} filesystem in {padev}')

            if os.path.exists(image_path) and backup:
                shutil.move(image_path, f'{image_path}.ant')

            logging.info(f'Creating image at {image_path} from {padev} using {fstype}')
            logging.info('*DO NOT REBOOT OR POWEROFF* the client during this time')

            cmd_pc = shlex.split(f'partclone.{fstype} -I -C --clone -s {padev} -O -')
            cmd_lzop = shlex.split(f'lzop -1 -fo {image_path}')

            proc_pc = subprocess.Popen(cmd_pc,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
            proc_lzop = subprocess.Popen(cmd_lzop,
                                         stdin=proc_pc.stdout,
                                         stderr=subprocess.PIPE)
            # Drop our copy of the pipe so partclone gets SIGPIPE if lzop exits.
            proc_pc.stdout.close()

            lzop_stderr = proc_lzop.communicate()[1]
            pc_stderr = proc_pc.stderr.read()
            # wait() rather than poll(): poll() may still return None here,
            # which would be reported as a failure below.
            proc_pc.wait()

            with open('/tmp/command.log', 'wb', 0) as logfile:
                logfile.write(pc_stderr)
                logfile.write(lzop_stderr)

            if proc_pc.returncode != 0:
                raise OSError(f'partclone subprocess failed: {pc_stderr.decode("utf-8")}')

            if proc_lzop.returncode != 0:
                raise OSError(f'lzop subprocess failed: {lzop_stderr.decode("utf-8")}')

            logging.info('Image creation successful')

            ogExtendFs(disk, partition)

            if not os.access(image_path, os.R_OK):
                raise RuntimeError(f'Cannot access partclone image file {image_path}')

            image_info = ogGetImageInfo(image_path)
        except Exception as e:
            # NOTE(review): this removes image_path even when the failure
            # happened before the image was touched -- confirm this is wanted
            # when no backup was requested.
            if os.path.exists(image_path):
                os.unlink(image_path)
            if os.path.exists(f'{image_path}.ant'):
                shutil.move(f'{image_path}.ant', image_path)
            self._restartBrowser(self._url)
            raise RuntimeError(f'Failed to create image for {fstype} filesystem in device {padev}: {e}') from e

        try:
            st = os.stat(image_path)
            size = st.st_size
            perms = st.st_mode & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            mtime = int(st.st_mtime)
        except OSError:
            logging.info(f'cannot retrieve stats from {image_path}')
            size = 0
            perms = 0
            mtime = 0

        image_info.size = size
        image_info.perms = perms
        image_info.mtime = mtime

        if self._write_md5_file(f'/opt/opengnsys/images/{name}.img') == -1:
            raise ValueError(f'Cannot write {name}.full.sum file')

        self._restartBrowser(self._url)

        logging.info('Image creation command OK')
        return image_info

    def refresh(self, ogRest):
        """Collect the current disk/partition layout and link information.

        Returns a dict with the keys 'serial_number', 'disk_setup',
        'partition_setup' and 'link'. Disks whose fdisk context cannot be
        opened are skipped. The boot menu and cache listing are regenerated
        as a side effect.
        """
        self._restartBrowser(self._url_log)

        cache = cache_probe()
        interface = os.getenv('DEVICE')
        link = ethtool(interface)
        parsed = {'serial_number': '',
                  'disk_setup': [],
                  'partition_setup': [],
                  'link': link}

        for num_disk, disk in enumerate(get_disks(), start=1):
            logging.debug('refresh: processing %s', disk)
            part_setup = {}
            try:
                cxt = fdisk.Context(device=f'/dev/{disk}', details=True)
            except Exception as e:
                logging.debug('refresh: skipping %s: %s', disk, e)
                continue

            self._refresh_payload_disk(cxt, part_setup, num_disk)
            parsed['disk_setup'].append(part_setup)

            for pa in cxt.partitions:
                # Start from a copy of the previous entry so the 'disk'
                # field carries over to every partition entry.
                part_setup = part_setup.copy()
                self._refresh_payload_partition(cxt, pa, part_setup, disk)
                self._refresh_part_setup_cache(cxt, pa, part_setup, cache)
                parsed['partition_setup'].append(part_setup)

        generate_menu(parsed['partition_setup'])
        generate_cache_txt()
        self._restartBrowser(self._url)

        logging.info('Sending response to refresh request')
        return parsed

    def probe(self, ogRest):
        """Return the client status ('OPG' or 'BSY' when busy) and link speed."""
        interface = os.getenv('DEVICE')
        speed = ethtool(interface)

        return {'status': 'OPG' if ogRest.state != ThreadState.BUSY else 'BSY',
                'speed': speed}