#
# Copyright (C) 2022 Soleta Networks
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

import subprocess
import shutil
import logging
import hivex
import shlex

from src.log import OgError
from src.utils.grub import install_main_grub, install_linux_grub
from src.utils.bcd import update_bcd
from src.utils.probe import *
from src.utils.disk import *
from src.utils.winreg import *
from src.utils.fs import *
from src.utils.uefi import *
from src.utils.fstab import *
from socket import gethostname

# When True, configure_os() delegates to the external `configureOs` command
# (plus configureOsCustom, if present) instead of the Python implementation.
CONFIGUREOS_LEGACY_ENABLED = False


def set_windows_hostname(disk, partition, name):
    """Write `name` as the computer name into the Windows SYSTEM hive.

    The partition is mounted, the hive at WINDOWS_HIVE_SYSTEM is opened for
    writing, and three values are set inside the control set selected by
    `Select/Current`: `ComputerName`, `HostName` and `NV Hostname`.
    Names longer than 15 characters are truncated (with a warning).

    Raises:
        OgError: if the partition cannot be mounted or any step of the
            registry update fails.
    """
    logging.info(f'Setting Windows hostname to {name}')

    if len(name) > 15:
        logging.warning(f'Windows does not permit hostnames that exceed 15 characters. Truncating {name}')
        name = name[0:15]

    byte_name = name.encode(WINDOWS_HIVE_ENCODING)

    device = get_partition_device(disk, partition)
    mountpoint = device.replace('dev', 'mnt')

    if not mount_mkdir(device, mountpoint):
        raise OgError(f'Unable to mount {device} into {mountpoint}')

    try:
        hive_path = mountpoint + WINDOWS_HIVE_SYSTEM
        hive = hive_handler_open(hive_path, write=True)
        root = hive.root()

        # The `Select/Current` value gives the number of the control set to
        # edit; it is formatted as a zero-padded three digit suffix.
        select_node = get_node_child_from_path(hive, root, 'Select')
        current_control_set_number = get_value_from_node(hive, select_node, 'Current')
        control_set = f'ControlSet{current_control_set_number:03}'

        computer_name_node = get_node_child_from_path(hive, root, f'{control_set}/Control/ComputerName/ComputerName')
        name_value = {'key': 'ComputerName', 't': RegistryType.SZ.value, 'value': byte_name}
        hive.node_set_value(computer_name_node, name_value)

        parameters_node = get_node_child_from_path(hive, root, f'{control_set}/Services/Tcpip/Parameters')
        hostname_value = {'key': 'HostName', 't': RegistryType.SZ.value, 'value': byte_name}
        hive.node_set_value(parameters_node, hostname_value)

        nvhostname_value = {'key': 'NV Hostname', 't': RegistryType.SZ.value, 'value': byte_name}
        hive.node_set_value(parameters_node, nvhostname_value)

        hive.commit(hive_path)
    except Exception as e:
        raise OgError(f'Unable to set Windows hostname: {e}') from e
    finally:
        umount(mountpoint)


def set_linux_hostname(disk, partition, name):
    """Write `name` into `/etc/hostname` of the given Linux partition.

    Names longer than 64 characters are truncated (with a warning). The
    partition is mounted for the duration of the write and always unmounted
    afterwards.

    Raises:
        OgError: if the partition cannot be mounted or the file cannot be
            written.
    """
    logging.info(f'Setting Linux hostname to {name}')

    if len(name) > 64:
        logging.warning(f'Linux does not permit hostnames that exceed 64 characters. Truncating {name}')
        name = name[0:64]

    device = get_partition_device(disk, partition)
    mountpoint = device.replace('dev', 'mnt')

    if not mount_mkdir(device, mountpoint):
        raise OgError(f'Unable to mount {device} into {mountpoint}')

    try:
        hostname_path = f'{mountpoint}/etc/hostname'
        with open(hostname_path, 'w') as f:
            f.write(name)
    except OSError as e:
        raise OgError(f'Unable to set Linux hostname: {e}') from e
    finally:
        umount(mountpoint)


def configure_os_custom(disk, partition):
    """Run the `configureOsCustom` script found on PATH with disk and partition.

    The script's stdout and stderr are discarded.

    Raises:
        OgError: if `configureOsCustom` is not found on PATH.
        subprocess.CalledProcessError: if the script exits with a non-zero
            status (a warning is logged before the exception propagates).
    """
    command_path = shutil.which('configureOsCustom')
    if not command_path:
        raise OgError('configureOsCustom not found')

    logging.info('Found configureOsCustom script, invoking it...')
    cmd_configure = f"{command_path} {disk} {partition}"
    try:
        subprocess.run(shlex.split(cmd_configure),
                       stdout=subprocess.DEVNULL,
                       stderr=subprocess.DEVNULL,
                       encoding='utf-8',
                       check=True)
    except subprocess.CalledProcessError as e:
        # With check=True a non-zero exit raises, so the warning has to be
        # emitted here; checking returncode after run() would never execute.
        logging.warning(f'{cmd_configure} returned non-zero exit status {e.returncode}')
        raise


def windows_register_c_drive(disk, partition):
    """Point the DosDevices C: entry of the Windows registry at this partition.

    Opens the SYSTEM hive of the partition and sets the binary value for the
    C: drive under the `MountedDevices` node. On UEFI systems the value is
    `DMIO:ID:` followed by the partition id bytes; otherwise it is the disk
    id bytes followed by the bytes returned by get_part_id_bytes().

    Raises:
        OgError: if the partition cannot be mounted.
    """
    device = get_partition_device(disk, partition)
    mountpoint = device.replace('dev', 'mnt')

    if not mount_mkdir(device, mountpoint):
        raise OgError(f'Unable to mount {device} into {mountpoint}')

    hive_path = f'{mountpoint}{WINDOWS_HIVE_SYSTEM}'

    try:
        hive = hive_handler_open(hive_path, write=True)
        root = hive.root()
        device_node = get_node_child_from_path(hive, root, 'MountedDevices')

        if is_uefi_supported():
            part_id = get_part_id_bytes(disk, partition)
            value = b'DMIO:ID:' + part_id
        else:
            disk_id = get_disk_id_bytes(disk)
            part_offset = get_part_id_bytes(disk, partition)
            value = bytes(disk_id + part_offset)

        # Raw string: the key contains literal backslashes, and `\D` / `\C`
        # are not valid escape sequences in a regular string literal.
        device_value = {'key': r'\DosDevices\C:', 't': RegistryType.BINARY.value, 'value': value}
        hive.node_set_value(device_node, device_value)
        hive.commit(hive_path)
    finally:
        umount(mountpoint)


def configure_mbr_boot_sector(disk, partition):
    """Run `ogFixBootSector <disk> <partition>` through the shell.

    The command's stdout and stderr are discarded.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero
            status (a warning is logged before the exception propagates).
    """
    cmd_configure = f"ogFixBootSector {disk} {partition}"
    try:
        subprocess.run(cmd_configure,
                       stdout=subprocess.DEVNULL,
                       stderr=subprocess.DEVNULL,
                       encoding='utf-8',
                       shell=True,
                       check=True)
    except subprocess.CalledProcessError as e:
        # With check=True a non-zero exit raises, so the warning has to be
        # emitted here; checking returncode after run() would never execute.
        logging.warning(f'{cmd_configure} returned non-zero exit status {e.returncode}')
        raise


def configure_fstab(disk, partition):
    """Mount the partition and run update_fstab() against its mountpoint.

    Raises:
        OgError: if the partition cannot be mounted.
    """
    logging.info('Configuring /etc/fstab')

    device = get_partition_device(disk, partition)
    mountpoint = device.replace('dev', 'mnt')

    if not mount_mkdir(device, mountpoint):
        raise OgError(f'Unable to mount {device} into {mountpoint}')

    try:
        update_fstab(disk, partition, mountpoint)
    finally:
        umount(mountpoint)


def configure_os_linux(disk, partition):
    """Configure a Linux installation: hostname, fstab and GRUB.

    The hostname written is the one reported by gethostname() for the
    running system. On UEFI systems the main GRUB is installed as well.
    """
    hostname = gethostname()
    set_linux_hostname(disk, partition, hostname)
    configure_fstab(disk, partition)
    install_linux_grub(disk, partition)

    if is_uefi_supported():
        # Return value is not needed here; the call is kept as in the
        # original flow (presumably it validates the ESP/GPT — TODO confirm).
        get_efi_partition(disk, enforce_gpt=True)
        install_main_grub()


def configure_os_windows(disk, partition):
    """Configure a Windows installation: hostname, bootloader, BCD, C: drive.

    The hostname written is the one reported by gethostname() for the
    running system. On UEFI systems the Windows EFI bootloader is restored
    and the main GRUB installed; otherwise the MBR boot sector is fixed.
    """
    hostname = gethostname()
    set_windows_hostname(disk, partition, hostname)

    if is_uefi_supported():
        restore_windows_efi_bootloader(disk, partition)
        # Return value is not needed here; the call is kept as in the
        # original flow (presumably it validates the ESP/GPT — TODO confirm).
        get_efi_partition(disk, enforce_gpt=True)
        install_main_grub()
    else:
        configure_mbr_boot_sector(disk, partition)

    update_bcd(disk, partition)
    windows_register_c_drive(disk, partition)


def configure_os_legacy(disk, partition):
    """Run the external `configureOs <disk> <partition>` command via the shell.

    Raises:
        OgError: if launching the command fails with an OSError.
        subprocess.CalledProcessError: if the command exits with a non-zero
            status (not wrapped; it propagates as raised by check=True).
    """
    cmd_configure = f"configureOs {disk} {partition}"
    try:
        # stdout is captured (not inherited) but its content is not used.
        subprocess.run(cmd_configure,
                       stdout=subprocess.PIPE,
                       encoding='utf-8',
                       shell=True,
                       check=True)
    except OSError as e:
        raise OgError(f'Error processing configureOs: {e}') from e


def configure_os(disk, partition):
    """Configure the operating system installed on the given partition.

    If CONFIGUREOS_LEGACY_ENABLED is set, only the legacy command (and the
    custom script, when available) is run. Otherwise the partition is
    mounted to probe the OS family, the matching Windows or Linux routine
    is run, and finally `configureOsCustom` is invoked if present on PATH.

    Raises:
        OgError: if the partition cannot be mounted to probe the OS family.
    """
    if CONFIGUREOS_LEGACY_ENABLED:
        configure_os_legacy(disk, partition)
        if shutil.which('configureOsCustom'):
            configure_os_custom(disk, partition)
        return

    device = get_partition_device(disk, partition)
    mountpoint = device.replace('dev', 'mnt')

    logging.info(f'Configuring OS at {device}...')

    if not mount_mkdir(device, mountpoint):
        raise OgError(f'Cannot probe OS family. Unable to mount {device} into {mountpoint}')

    # Always release the mount, even if probing raises; the per-OS routines
    # below mount the partition themselves.
    try:
        os_family = get_os_family(mountpoint)
    finally:
        umount(mountpoint)

    if os_family == OSFamily.WINDOWS:
        configure_os_windows(disk, partition)
    elif os_family == OSFamily.LINUX:
        configure_os_linux(disk, partition)

    if shutil.which('configureOsCustom'):
        configure_os_custom(disk, partition)