#
# Copyright (C) 2022 Soleta Networks
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

import os
import logging
import shlex
import subprocess
import json
from src.log import OgError

import fdisk


# Name prefixes of the entries under /sys/block/ that get_disks() reports.
_DISK_PREFIXES = ('sd', 'nvme', 'vd')


class ogPartitionData:
    """
    Plain record describing one partition, as filled in by
    get_partition_data().
    """
    def __init__(self, parttype, fstype, padev, size, partno):
        self.parttype = parttype  # FDISK_FIELD_TYPEID rendered as a string
        self.fstype = fstype      # FDISK_FIELD_FSTYPE rendered as a string
        self.padev = padev        # FDISK_FIELD_DEVICE rendered as a string
        self.size = size          # FDISK_FIELD_SIZE rendered as a string
        self.partno = partno      # partno attribute of the fdisk partition


class ogDiskData:
    """
    Plain record describing a whole disk, as filled in by get_disk_data().
    """
    def __init__(self, nsectors, sector_size, label_name):
        self.nsectors = nsectors
        self.sector_size = sector_size
        self.label_name = label_name  # '' when the disk has no label


def _get_disk_name(disknum, purpose=''):
    """
    Returns the name of the n-th disk reported by get_disks(), where
    disknum starts at 1.

    The disk list is read only once so that the range check and the lookup
    always agree. purpose is inserted verbatim in the error message right
    after the disk number.

    Raises OgError if disknum is out of range.
    """
    disks = get_disks()
    if not 1 <= disknum <= len(disks):
        raise OgError(f'Invalid disk number {disknum}{purpose}, {len(disks)} disks available.')
    return disks[disknum - 1]


def _blkid_value(tag, device, description):
    """
    Runs 'blkid -s <tag> -o value <device>' and returns its stripped output.

    The command is passed as an argument list, so the device path is never
    re-tokenized. description is only used to build the error message.

    Raises OgError if blkid exits with a non-zero status.
    """
    proc = subprocess.run(['blkid', '-s', tag, '-o', 'value', device],
                          stdout=subprocess.PIPE,
                          encoding='utf-8')
    if proc.returncode != 0:
        raise OgError(f'failed to query {description} for {device}')
    return proc.stdout.strip()


def get_partition_data(device):
    """
    Returns a list with one ogPartitionData per partition found by fdisk
    in the given device path.

    Raises OgError if the fdisk context cannot be created.
    """
    try:
        cxt = fdisk.Context(device=device, details=True)
    except Exception as e:
        raise OgError(f'Partition query error: {e}') from e

    return [
        ogPartitionData(
            parttype=cxt.partition_to_string(p, fdisk.FDISK_FIELD_TYPEID),
            fstype=cxt.partition_to_string(p, fdisk.FDISK_FIELD_FSTYPE),
            padev=cxt.partition_to_string(p, fdisk.FDISK_FIELD_DEVICE),
            size=cxt.partition_to_string(p, fdisk.FDISK_FIELD_SIZE),
            partno=p.partno)
        for p in cxt.partitions
    ]


def get_disk_data(device):
    """
    Returns an ogDiskData with the sector count, sector size and label name
    that fdisk reports for the given device path. The label name is an
    empty string when the disk has no label.

    Raises OgError if the fdisk context cannot be created.
    """
    try:
        cxt = fdisk.Context(device=device, details=True)
    except Exception as e:
        raise OgError(f'Partition query error: {e}') from e

    return ogDiskData(
        nsectors=cxt.nsectors,
        sector_size=cxt.sector_size,
        label_name=cxt.label.name if cxt.label else "")


def get_disks():
    """
    Walks /sys/block/ and returns files starting with 'sd',
    'nvme' or 'vd'
    """
    return sorted(dev for dev in os.listdir('/sys/block/')
                  if dev.startswith(_DISK_PREFIXES))


def get_partition_device(disknum, partnum):
    """
    Returns the device path, given a disk and partition number

    Both numbers start at 1. Raises OgError if the disk number is out of
    range or the disk has no partition with that number.
    """
    disk = _get_disk_name(disknum)
    cxt = fdisk.Context(f'/dev/{disk}')

    # partnum starts at 1 while the fdisk partno is compared zero-based.
    for pa in cxt.partitions:
        if pa.partno == partnum - 1:
            return cxt.partition_to_string(pa, fdisk.FDISK_FIELD_DEVICE)

    raise OgError(f'No such partition with disk index {disknum} and partition index {partnum}')


def get_efi_partition(disknum, enforce_gpt):
    """
    Look for an EFI System Partition at the n-th disk. If disknum is invalid an
    exception is thrown.

    If enforce_gpt is set to True the ESP will be ignored in a MBR partition
    scheme.

    Returns tuple with:
        - Device name containing the ESP
        - /dev/{device} string
        - Partition number (starting at 1)
    """
    disk = _get_disk_name(disknum, ' when trying to find EFI partition')
    cxt = fdisk.Context(f'/dev/{disk}')

    # NOTE(review): get_disk_data() treats cxt.label as an object with a
    # .name attribute; confirm that comparing it against
    # FDISK_DISKLABEL_DOS behaves as intended.
    if enforce_gpt and cxt.label == fdisk.FDISK_DISKLABEL_DOS:
        raise OgError(f'Windows EFI partition requires GPT partition scheme, but /dev/{disk} has DOS partition scheme')

    logging.info('Searching EFI partition...')
    for pa in cxt.partitions:
        if pa.type.name == 'EFI System':
            logging.info(f'EFI partition found at /dev/{disk}')
            return cxt.partition_to_string(pa, fdisk.FDISK_FIELD_DEVICE), f'/dev/{disk}', pa.partno + 1

    raise OgError(f'Cannot find EFI partition at /dev/{disk}')


def get_partition_id(disk_index, part_index):
    """
    Returns the PARTUUID that blkid reports for the given disk and
    partition number (both starting at 1).

    Raises OgError if the partition cannot be found or blkid fails.
    """
    device = get_partition_device(disk_index, part_index)
    return _blkid_value('PARTUUID', device, 'partition UUID')


def get_disk_id(disk_index):
    """
    Returns the PTUUID that blkid reports for the n-th disk (starting
    at 1).

    Raises OgError if the disk number is out of range or blkid fails.
    """
    # Validate the number instead of indexing directly: disk 0 would
    # otherwise silently select the last disk through index -1.
    disk = _get_disk_name(disk_index)
    return _blkid_value('PTUUID', f'/dev/{disk}', 'disk UUID')


def get_filesystem_id(disk_index, part_index):
    """
    Returns the filesystem UUID that blkid reports for the given disk and
    partition number (both starting at 1).

    Raises OgError if the partition cannot be found or blkid fails.
    """
    device = get_partition_device(disk_index, part_index)
    return _blkid_value('UUID', device, 'filesystem UUID')


def get_sector_size(disk):
    """
    Returns, as an integer, the content of
    /sys/class/block/{device}/queue/hw_sector_size for the n-th disk
    (starting at 1).

    Raises OgError if the disk number is out of range, the file cannot be
    read or its content is not an integer.
    """
    device_name = _get_disk_name(disk, ' when trying to read sector size')
    file_path = f'/sys/class/block/{device_name}/queue/hw_sector_size'

    try:
        with open(file_path, 'r') as f:
            data = f.read().strip()
    except OSError as e:
        raise OgError(f'Error while trying to read {file_path}: {e}') from e

    try:
        return int(data)
    except ValueError as e:
        raise OgError(f'Invalid sector size in {file_path}: {data!r}') from e


def get_partition_start_offset(disk, partition):
    """
    Returns the 'start' value that 'sfdisk -J' reports for the given
    partition of the n-th disk (both numbers starting at 1).

    The partition is looked up by its position in the 'partitions' list of
    the sfdisk JSON output. The value is returned as found in the JSON
    (presumably in sectors; confirm against the 'unit' field).

    Raises OgError if the disk or partition number is invalid, sfdisk
    fails or its output cannot be parsed.
    """
    disk_name = _get_disk_name(disk)
    disk_path = f'/dev/{disk_name}'

    part_number = partition - 1
    if part_number < 0:
        # A negative index would silently pick a partition from the end.
        raise OgError(f'Invalid partition number {partition}')

    proc = subprocess.run(['sfdisk', '-J', disk_path],
                          capture_output=True, text=True)
    if proc.returncode != 0:
        raise OgError('Failed to query sfdisk')

    try:
        part_data_json = json.loads(proc.stdout)
    except json.JSONDecodeError as e:
        raise OgError(f'Invalid sfdisk output: {e}') from e

    try:
        part_data = part_data_json['partitiontable']['partitions']
        start_offset = part_data[part_number]['start']
    except (KeyError, IndexError, TypeError) as e:
        # IndexError: no such partition; TypeError: unexpected JSON shape.
        raise OgError(f'Error while trying to parse sfdisk: {e}') from e

    return start_offset