# # Copyright (C) 2023 Soleta Networks # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the # Free Software Foundation; either version 3 of the License, or # (at your option) any later version. import json import logging import os import shlex import subprocess import shutil from src.utils.disk import * from src.utils.fs import * from src.utils.probe import * from src.log import OgError import fdisk EFIBOOTMGR_BIN='/opt/opengnsys/bin/efibootmgr' def _find_bootentry(entries, label): for entry in entries: if entry['description'] == label: return entry else: raise OgError('Boot entry {label} not found') def _strip_boot_prefix(entry): try: num = entry['name'][4:] except: raise OgError('Unable to strip "Boot" prefix from boot entry') return num def _check_efibootmgr_json(): """ Checks if efibootmgr --json option is supported. Returns True if supported, False otherwise. """ supported = True try: subprocess.run([EFIBOOTMGR_BIN, '--json'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True) except subprocess.CalledProcessError: supported = False return supported def is_uefi_supported(): return os.path.exists("/sys/firmware/efi") def _validate_efibootmgr_json(dict_json): missing_fields = [] if not 'BootOrder' in dict_json: missing_fields.append('BootOrder') if not 'vars' in dict_json: missing_fields.append('vars') for i, entry in enumerate(dict_json.get('vars', [])): if not 'name' in entry: missing_fields.append(f'vars[{i}].name') if not 'active' in entry: missing_fields.append('vars[{i}].active') if not 'description' in entry: missing_fields.append('vars[{i}].description') if missing_fields: raise OgError('Missing required efibootmgr fields: ' + ', '.join(missing_fields)) def run_efibootmgr_json(validate): if _check_efibootmgr_json() is False: raise OgError(f'{EFIBOOTMGR_BIN} not available') proc = subprocess.run([EFIBOOTMGR_BIN, '--json'], capture_output=True, text=True) dict_json = json.loads(proc.stdout) if validate: _validate_efibootmgr_json(dict_json) return dict_json def efibootmgr_bootnext(description): logging.info(f'Setting BootNext to value from boot entry with label {description}') bootnext_cmd = '{efibootmgr} -n {bootnum}' boot_entries = run_efibootmgr_json(validate=False).get('vars', []) entry = _find_bootentry(boot_entries, description) num = _strip_boot_prefix(entry) # efibootmgr output uses BootXXXX for each entry, remove the "Boot" prefix. bootnext_cmd = bootnext_cmd.format(bootnum=num, efibootmgr=EFIBOOTMGR_BIN) subprocess.run(shlex.split(bootnext_cmd), check=True, stdout=subprocess.DEVNULL) def efibootmgr_delete_bootentry(label): dict_json = run_efibootmgr_json(validate=False) efibootmgr_cmd = '{efibootmgr} -q -b {bootnum} -B' for entry in dict_json.get('vars', []): if entry['description'] == label: num = entry['name'][4:] # Remove "Boot" prefix to extract num efibootmgr_cmd = efibootmgr_cmd.format(bootnum=num, efibootmgr=EFIBOOTMGR_BIN) subprocess.run(shlex.split(efibootmgr_cmd), check=True) break else: logging.info(f'Cannot delete boot entry {label} because it was not found.') def efibootmgr_create_bootentry(disk, part, loader, label, add_to_bootorder=True): entries = run_efibootmgr_json(validate=False).get('vars', []) create_opt = '-c' if add_to_bootorder else '-C' index_opt = f'-I {len(entries)}' # Requires efibootmgr version 18 efibootmgr_cmd = f'{EFIBOOTMGR_BIN} {create_opt} {index_opt} -d {disk} -p {part} -L {label} -l {loader}' logging.info(f'{EFIBOOTMGR_BIN} command creating boot entry: {efibootmgr_cmd}') try: proc = subprocess.run(shlex.split(efibootmgr_cmd), check=True, text=True) except OSError as e: raise OgError(f'Unexpected error adding boot entry to nvram. UEFI firmware might be buggy') from e def efibootmgr_set_entry_order(label, position): logging.info(f'Setting {label} entry to position {position} of boot order') boot_info = run_efibootmgr_json(validate=False) boot_entries = boot_info.get('vars', []) boot_order = boot_info.get('BootOrder', []) entry = _find_bootentry(boot_entries, label) target_grub_entry = _strip_boot_prefix(entry) if target_grub_entry in boot_order: boot_order.remove(target_grub_entry) boot_order.insert(position, target_grub_entry) try: proc = subprocess.run([EFIBOOTMGR_BIN, "-o", ",".join(boot_order)], check=True, text=True) except OSError as e: raise OgError(f'Unexpected error setting boot order to NVRAM. UEFI firmware might be buggy') from e def _find_efi_loader(loader_paths): for efi_app in loader_paths: logging.info(f'Searching for {efi_app}') if os.path.exists(efi_app): logging.info(f'Found bootloader at ESP partition: {efi_app}') return efi_app else: raise OgError('Unable to locate any EFI bootloader at ESP') def find_windows_efi_loader(esp_mountpoint, bootlabel): loader_paths = [f'{esp_mountpoint}/EFI/{bootlabel}/Boot/bootmgfw.efi', f'{esp_mountpoint}/EFI/Microsoft/Boot/bootmgfw.efi'] return _find_efi_loader(loader_paths) def find_linux_efi_loader(esp_mountpoint, bootlabel): loader_paths = [f'{esp_mountpoint}/EFI/{bootlabel}/Boot/shimx64.efi', f'{esp_mountpoint}/EFI/ubuntu/shimx64.efi'] return _find_efi_loader(loader_paths) def copy_windows_efi_bootloader(disk, partition): device = get_partition_device(disk, partition) mountpoint = device.replace('dev', 'mnt') if not mount_mkdir(device, mountpoint): raise OgError(f'Cannot probe OS family. Unable to mount {device} into {mountpoint}') os_family = get_os_family(mountpoint) is_uefi = is_uefi_supported() if not is_uefi or os_family != OSFamily.WINDOWS: return bootlabel = f'Part-{disk:02d}-{partition:02d}' esp, esp_disk, esp_part_number = get_efi_partition(disk, enforce_gpt=True) esp_mountpoint = esp.replace('dev', 'mnt') if not mount_mkdir(esp, esp_mountpoint): umount(mountpoint) raise OgError(f'Unable to mount detected EFI System Partition at {esp} into {esp_mountpoint}') try: loader = find_windows_efi_loader(esp_mountpoint, bootlabel) loader_dir = os.path.dirname(loader) destination_dir = f'{mountpoint}/ogBoot' if os.path.exists(destination_dir): try: shutil.rmtree(destination_dir) except OSError as e: raise OgError(f'Failed to delete {destination_dir}: {e}') from e logging.info(f'Copying {loader_dir} into {destination_dir}') try: shutil.copytree(loader_dir, destination_dir) except OSError as e: raise OgError(f'Failed to copy {loader_dir} into {destination_dir}: {e}') from e finally: umount(mountpoint) umount(esp_mountpoint) def restore_windows_efi_bootloader(disk, partition): device = get_partition_device(disk, partition) mountpoint = device.replace('dev', 'mnt') if not mount_mkdir(device, mountpoint): raise OgError(f'Cannot probe OS family. Unable to mount {device} into {mountpoint}') bootlabel = f'Part-{disk:02d}-{partition:02d}' esp, esp_disk, esp_part_number = get_efi_partition(disk, enforce_gpt=True) esp_mountpoint = esp.replace('dev', 'mnt') if not mount_mkdir(esp, esp_mountpoint): umount(mountpoint) raise OgError(f'Unable to mount detected EFI System Partition at {esp} into {esp_mountpoint}') try: loader_dir = f'{mountpoint}/ogBoot' destination_dir = f'{esp_mountpoint}/EFI/{bootlabel}/Boot' if os.path.exists(destination_dir): try: shutil.rmtree(destination_dir) except OSError as e: raise OgError(f'Failed to delete {destination_dir}: {e}') from e logging.info(f'Copying {loader_dir} into {destination_dir}') try: shutil.copytree(loader_dir, destination_dir) except OSError as e: raise OgError(f'Failed to copy {loader_dir} into {destination_dir}: {e}') from e finally: umount(mountpoint) umount(esp_mountpoint)