# # Copyright (C) 2020-2024 Soleta Networks # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the # Free Software Foundation; either version 3 of the License, or # (at your option) any later version. import hivex from enum import Enum from src.utils.uefi import is_uefi_supported from src.utils.winreg import * from src.utils.disk import * from src.utils.fs import mount_mkdir, umount class BCDEntryType(Enum): BOOT_MANAGER = 'Windows Boot Manager' MEM_DIAGNOSTIC = 'Windows Memory Diagnostic' RESUME_HIBERNATION = 'Resume from Hibernation' BOOT_LOADER_OS = 'Windows Boot Loader OS' BOOT_LOADER_RECOVERY = 'Windows Boot Loader Recovery' WINDOWS_RECOVERY = 'Windows Recovery' UNKNOWN = 'Unknown' known_guids = { '{9dea862c-5cdd-4e70-acc1-f32b344d4795}': BCDEntryType.BOOT_MANAGER, '{b2721d73-1db4-4c62-bf78-c548a880142d}': BCDEntryType.MEM_DIAGNOSTIC, } special_entries = { (1, 2, 3): BCDEntryType.BOOT_LOADER_OS, (1, 2, 4): BCDEntryType.RESUME_HIBERNATION, (3, 0, 0): BCDEntryType.WINDOWS_RECOVERY, } class BCDEntryElement(Enum): DEVICE = '11000001' DESCRIPTION = '12000004' OS_DEVICE = '21000001' RAMDISK_DEVICE = '31000003' RECOVERY_ENABLED = '16000009' PART_BCD_OFFSET = 32 DISK_BCD_OFFSET = 56 PART_RECOVERY_BCD_OFFSET = 84 DISK_RECOVERY_BCD_OFFSET = 108 def _int_to_tuple(value): X1_MASK = 0xf0000000 X2_MASK = 0x00f00000 X3_MASK = 0x000fffff x1 = (X1_MASK & value) >> 28 x2 = (X2_MASK & value) >> 20 x3 = X3_MASK & value return (x1, x2, x3) def _bootloader_entry_is_recovery(hive, entry_node): element_name_node = get_node_child_from_path(hive, entry_node, f'Elements/{BCDEntryElement.DESCRIPTION.value}') v = get_value_from_node(hive, element_name_node, 'Element') return v == 'Windows Recovery Environment' def _match_node_with_type_data(hive, entry_node): description_node = get_node_child(hive, entry_node, 'Description') val = get_value_from_node(hive, description_node, 'Type') if isinstance(val, int): val = _int_to_tuple(val) if val in special_entries: entry_type = special_entries[val] if _bootloader_entry_is_recovery(hive, entry_node): entry_type = BCDEntryType.BOOT_LOADER_RECOVERY return entry_type return BCDEntryType.UNKNOWN def disable_recovery_field(hive, entry_node): try: recovery_node = get_node_child_from_path(hive, entry_node, f'Elements/{BCDEntryElement.RECOVERY_ENABLED.value}') except: logging.info("no Recovery entry is found, skipping") return recovery_value = {'key': 'Element', 't': RegistryType.BINARY.value, 'value': b'\x00'} hive.node_set_value(recovery_node, recovery_value) def entry_node_to_bcd_entry_type(hive, entry_node): node_name = hive.node_name(entry_node) if node_name in known_guids: return known_guids[node_name] return _match_node_with_type_data(hive, entry_node) def bytes_replace_at(data, new_data, index): l = list(data) l[index:(index+len(new_data))] = list(new_data) return bytes(l) def update_element_entry(hive, entry_node, element_name, disk_id, disk_offset, part_id, part_offset): device_node = get_node_child_from_path(hive, entry_node, f'Elements/{element_name}') v = get_value_from_node(hive, device_node, 'Element') v = bytes_replace_at(v, disk_id, disk_offset) v = bytes_replace_at(v, part_id, part_offset) device_value = {'key': 'Element', 't': RegistryType.BINARY.value, 'value': v} hive.node_set_value(device_node, device_value) def _update_hive_entries(bcd_path, disk_id, part_id, boot_part_id): logging.info(f'Processing BCD at {bcd_path}') hive = hive_handler_open(bcd_path, write = True) root_node = hive.root() disk_id_bytes = uuid_to_bytes(disk_id) part_id_bytes = uuid_to_bytes(part_id) boot_part_id_bytes = uuid_to_bytes(boot_part_id) objects_node = get_node_child(hive, root_node, 'Objects') for entry_node in hive.node_children(objects_node): node_name = hive.node_name(entry_node) entry_type = entry_node_to_bcd_entry_type(hive, entry_node) if entry_type == BCDEntryType.RESUME_HIBERNATION or entry_type == BCDEntryType.BOOT_LOADER_OS: logging.info(f'Editing BCD entry {entry_type.value}') update_element_entry(hive, entry_node, element_name = BCDEntryElement.DEVICE.value, disk_id = disk_id_bytes, disk_offset = DISK_BCD_OFFSET, part_id = part_id_bytes, part_offset = PART_BCD_OFFSET) update_element_entry(hive, entry_node, element_name = BCDEntryElement.OS_DEVICE.value, disk_id = disk_id_bytes, disk_offset = DISK_BCD_OFFSET, part_id = part_id_bytes, part_offset = PART_BCD_OFFSET) disable_recovery_field(hive, entry_node) elif entry_type == BCDEntryType.BOOT_LOADER_RECOVERY: logging.info(f'Editing BCD entry {entry_type.value}') update_element_entry(hive, entry_node, element_name = BCDEntryElement.DEVICE.value, disk_id = disk_id_bytes, disk_offset = DISK_RECOVERY_BCD_OFFSET, part_id = part_id_bytes, part_offset = PART_RECOVERY_BCD_OFFSET) update_element_entry(hive, entry_node, element_name = BCDEntryElement.OS_DEVICE.value, disk_id = disk_id_bytes, disk_offset = DISK_RECOVERY_BCD_OFFSET, part_id = part_id_bytes, part_offset = PART_RECOVERY_BCD_OFFSET) elif entry_type == BCDEntryType.WINDOWS_RECOVERY: logging.info(f'Editing BCD entry {entry_type.value}') update_element_entry(hive, entry_node, element_name = BCDEntryElement.RAMDISK_DEVICE.value, disk_id = disk_id_bytes, disk_offset = DISK_BCD_OFFSET, part_id = part_id_bytes, part_offset = PART_BCD_OFFSET) elif entry_type == BCDEntryType.BOOT_MANAGER: logging.info(f'Editing BCD entry {entry_type.value}') update_element_entry(hive, entry_node, element_name = BCDEntryElement.DEVICE.value, disk_id = disk_id_bytes, disk_offset = DISK_BCD_OFFSET, part_id = boot_part_id_bytes, part_offset = PART_BCD_OFFSET) elif entry_type == BCDEntryType.MEM_DIAGNOSTIC: logging.info(f'Editing BCD entry {entry_type.value}') update_element_entry(hive, entry_node, element_name = BCDEntryElement.DEVICE.value, disk_id = disk_id_bytes, disk_offset = DISK_BCD_OFFSET, part_id = boot_part_id_bytes, part_offset = PART_BCD_OFFSET) hive.commit(bcd_path) def update_bcd(disk, part): disk_id = get_disk_id(disk) part_id = get_partition_id(disk, part) uefi_enabled = is_uefi_supported() if uefi_enabled: esp, _esp_disk, _esp_part_number = get_efi_partition(disk, enforce_gpt=True) mountpoint = esp.replace('dev', 'mnt') if not mount_mkdir(esp, mountpoint): raise OgError(f'Unable to mount detected EFI System Partition at {esp} into {mountpoint}') _bootlabel = f'Part-{disk:02d}-{part:02d}' bcd_path = f'{mountpoint}/EFI/{_bootlabel}/Boot/BCD' boot_part_id = get_partition_id(disk, _esp_part_number) else: logging.info(f'MBR system detected, skipping BCD update') return try: _update_hive_entries(bcd_path, disk_id, part_id, boot_part_id) finally: umount(mountpoint)