#
# Copyright (C) 2022 Soleta Networks
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

import subprocess
import shutil
import logging
import hivex
from src.log import OgError
from src.utils.bcd import update_bcd
from src.utils.probe import *
from src.utils.disk import *
from src.utils.winreg import *
from src.utils.fs import *
from src.utils.uefi import *
from socket import gethostname

# When True, configure_os() delegates to the legacy 'configureOs' command
# instead of the native per-OS-family routines defined in this module.
CONFIGUREOS_LEGACY_ENABLED = False


def _run_shell_command(cmd, label):
    """Run *cmd* through the shell and wrap any failure in OgError.

    Both failure modes are reported as OgError: the command could not be
    spawned (OSError) or it exited with a non-zero status
    (subprocess.CalledProcessError, raised because of check=True).

    *label* is the command name used in the error message.
    """
    # NOTE(review): shell=True is kept on purpose. It is not visible from this
    # file whether the og* commands are executables or shell functions, so
    # switching to an argument list could break them -- confirm before changing.
    try:
        subprocess.run(cmd,
                       stdout=subprocess.PIPE,
                       encoding='utf-8',
                       shell=True,
                       check=True)
    except (OSError, subprocess.CalledProcessError) as e:
        raise OgError(f'Error processing {label}: {e}') from e


def set_windows_hostname(disk, partition, name):
    """Write *name* as the computer name into the Windows SYSTEM hive.

    The partition is mounted, the ComputerName, HostName and 'NV Hostname'
    registry values of the current control set are overwritten, the hive is
    committed and the partition is unmounted again.

    Names longer than 15 characters are truncated (with a warning).

    Raises OgError if the partition cannot be mounted or the hive cannot be
    updated.
    """
    logging.info(f'Setting Windows hostname to {name}')

    if len(name) > 15:
        logging.warning(f'Windows does not permit hostnames that exceed 15 characters. Truncating {name}')
        name = name[0:15]

    byte_name = name.encode(WINDOWS_HIVE_ENCODING)

    device = get_partition_device(disk, partition)
    mountpoint = device.replace('dev', 'mnt')

    if not mount_mkdir(device, mountpoint):
        raise OgError(f'Unable to mount {device} into {mountpoint}')

    try:
        hive_path = mountpoint + WINDOWS_HIVE_SYSTEM
        hive = hive_handler_open(hive_path, write=True)
        root = hive.root()

        # The 'Select' key tells which ControlSetNNN is the active one.
        select_node = get_node_child_from_path(hive, root, 'Select')
        current_control_set_number = get_value_from_node(hive, select_node, 'Current')
        control_set = f'ControlSet{current_control_set_number:03}'

        computer_name_node = get_node_child_from_path(hive, root, f'{control_set}/Control/ComputerName/ComputerName')
        name_value = {'key': 'ComputerName', 't': RegistryType.SZ.value, 'value': byte_name}
        hive.node_set_value(computer_name_node, name_value)

        parameters_node = get_node_child_from_path(hive, root, f'{control_set}/Services/Tcpip/Parameters')
        hostname_value = {'key': 'HostName', 't': RegistryType.SZ.value, 'value': byte_name}
        hive.node_set_value(parameters_node, hostname_value)

        nvhostname_value = {'key': 'NV Hostname', 't': RegistryType.SZ.value, 'value': byte_name}
        hive.node_set_value(parameters_node, nvhostname_value)

        hive.commit(hive_path)
    except Exception as e:
        raise OgError(f'Unable to set Windows hostname: {e}') from e
    finally:
        umount(mountpoint)


def set_linux_hostname(disk, partition, name):
    """Write *name* into /etc/hostname of the Linux system on the partition.

    Names longer than 64 characters are truncated (with a warning).

    Raises OgError if the partition cannot be mounted or the file cannot be
    written.
    """
    logging.info(f'Setting Linux hostname to {name}')

    if len(name) > 64:
        logging.warning(f'Linux does not permit hostnames that exceed 64 characters. Truncating {name}')
        name = name[0:64]

    device = get_partition_device(disk, partition)
    mountpoint = device.replace('dev', 'mnt')

    if not mount_mkdir(device, mountpoint):
        raise OgError(f'Unable to mount {device} into {mountpoint}')

    try:
        hostname_path = f'{mountpoint}/etc/hostname'
        with open(hostname_path, 'w') as f:
            f.write(name)
    except OSError as e:
        raise OgError(f'Unable to set Linux hostname: {e}') from e
    finally:
        umount(mountpoint)


def configure_os_custom(disk, partition):
    """Invoke the site-provided 'configureOsCustom' script.

    Raises OgError if the script is not found in PATH or if it fails.
    """
    if not shutil.which('configureOsCustom'):
        raise OgError('configureOsCustom not found')

    logging.info('Found configureOsCustom script, invoking it...')
    _run_shell_command(f"configureOsCustom {disk} {partition}",
                       'configureOsCustom')


def windows_register_c_drive(disk, partition):
    """Run ogWindowsRegisterPartition to register the partition as drive C.

    Raises OgError if the command fails.
    """
    _run_shell_command(f"ogWindowsRegisterPartition {disk} {partition} C {disk} {partition}",
                       'ogWindowsRegisterPartition')


def configure_mbr_boot_sector(disk, partition):
    """Run ogFixBootSector on the partition. Raises OgError on failure."""
    _run_shell_command(f"ogFixBootSector {disk} {partition}",
                       'ogFixBootSector')


def configure_grub_in_mbr(disk, partition):
    """Run ogGrubInstallMbr on the partition. Raises OgError on failure."""
    _run_shell_command(f"ogGrubInstallMbr {disk} {partition} TRUE",
                       'ogGrubInstallMbr')


def configure_fstab(disk, partition):
    """Run ogConfigureFstab on the partition. Raises OgError on failure."""
    _run_shell_command(f"ogConfigureFstab {disk} {partition}",
                       'ogConfigureFstab')


def install_grub(disk, partition):
    """Run ogGrubInstallPartition on the partition. Raises OgError on failure."""
    _run_shell_command(f"ogGrubInstallPartition {disk} {partition}",
                       'ogGrubInstallPartition')


def configure_os_linux(disk, partition):
    """Post-install configuration for a Linux system on the partition.

    Sets the hostname to this client's hostname, configures fstab and installs
    GRUB; on UEFI systems GRUB is additionally installed through the ESP.
    """
    hostname = gethostname()
    set_linux_hostname(disk, partition, hostname)

    configure_fstab(disk, partition)

    if is_uefi_supported():
        _, _, esp_part_number = get_efi_partition(disk, enforce_gpt=True)
        configure_grub_in_mbr(disk, esp_part_number)
    # NOTE(review): the original indentation was lost; install_grub is assumed
    # to run unconditionally (BIOS and UEFI) -- confirm.
    install_grub(disk, partition)


def configure_os_windows(disk, partition):
    """Post-install configuration for a Windows system on the partition.

    Sets the hostname to this client's hostname, repairs the boot path (EFI
    bootloader + GRUB on UEFI, boot sector otherwise), updates the BCD and
    registers the partition as drive C.
    """
    hostname = gethostname()
    set_windows_hostname(disk, partition, hostname)

    if is_uefi_supported():
        restore_windows_efi_bootloader(disk, partition)
        _, _, esp_part_number = get_efi_partition(disk, enforce_gpt=True)
        configure_grub_in_mbr(disk, esp_part_number)
    else:
        configure_mbr_boot_sector(disk, partition)

    # NOTE(review): the original indentation was lost; these two steps are
    # assumed to run for both UEFI and BIOS -- confirm.
    update_bcd(disk, partition)
    windows_register_c_drive(disk, partition)


def configure_os_legacy(disk, partition):
    """Run the legacy 'configureOs' command. Raises OgError on failure."""
    _run_shell_command(f"configureOs {disk} {partition}", 'configureOs')


def configure_os(disk, partition):
    """Configure the operating system installed on the given partition.

    With CONFIGUREOS_LEGACY_ENABLED the legacy command is used. Otherwise the
    partition is mounted to probe the OS family and the matching Windows or
    Linux routine is run. In both paths 'configureOsCustom' is invoked
    afterwards when it is available in PATH.

    Raises OgError if the partition cannot be mounted for probing or if any
    configuration step fails.
    """
    if CONFIGUREOS_LEGACY_ENABLED:
        configure_os_legacy(disk, partition)
        if shutil.which('configureOsCustom'):
            configure_os_custom(disk, partition)
        return

    device = get_partition_device(disk, partition)
    mountpoint = device.replace('dev', 'mnt')

    if not mount_mkdir(device, mountpoint):
        raise OgError(f'Cannot probe OS family. Unable to mount {device} into {mountpoint}')

    # Always release the probe mount, even if probing raises; the per-family
    # routines below mount the partition again themselves.
    try:
        os_family = get_os_family(mountpoint)
    finally:
        umount(mountpoint)

    if os_family == OSFamily.WINDOWS:
        configure_os_windows(disk, partition)
    elif os_family == OSFamily.LINUX:
        configure_os_linux(disk, partition)

    if shutil.which('configureOsCustom'):
        configure_os_custom(disk, partition)