#
# Copyright (C) 2020-2024 Soleta Networks
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

"""Parse, edit and rewrite an fstab file located under a mounted filesystem."""

import logging
import os

import fdisk

from src.log import OgError
from src.utils.disk import *
from src.utils.uefi import is_uefi_supported


class FstabEntry(object):
    """One fstab line, split into its six whitespace-separated fields."""

    def __init__(self, device, mountpoint, fstype, options,
                 dump_code='0', pass_code='0'):
        """Store the six fields of an fstab line.

        An empty ``options`` value is replaced by ``defaults``, the keyword
        fstab(5) defines for "use the default mount options".
        """
        self.device = device
        self.mountpoint = mountpoint
        self.fstype = fstype
        # The valid keyword is "defaults"; "default" is not a mount option.
        self.options = options if options else "defaults"
        self.dump_code = dump_code
        self.pass_code = pass_code

    def __str__(self):
        """Return the entry as a single space-separated fstab line."""
        return (f"{self.device} {self.mountpoint} {self.fstype} "
                f"{self.options} {self.dump_code} {self.pass_code}")

    def get_fields(self):
        """Return the six fields as a list, in fstab column order."""
        return [self.device, self.mountpoint, self.fstype,
                self.options, self.dump_code, self.pass_code]


class FstabBuilder(object):
    """In-memory list of fstab entries that can be loaded and written back."""

    def __init__(self):
        """Create an empty builder with the standard fstab header comment."""
        self.header = "# /etc/fstab: static file system information.\n" \
                      "#\n" \
                      "# Use 'blkid -o value -s UUID' to print the universally unique identifier\n" \
                      "# for a device; this may be used with UUID= as a more robust way to name\n" \
                      "# devices that works even if disks are added and removed. See fstab(5).\n" \
                      "#\n"
        self.entries = []

    def __str__(self):
        """Return the header followed by all entries in aligned columns."""
        # Group fields by columns and compute the max length of each column
        # to obtain the needed padding. Fields are coerced to str so that
        # numeric dump/pass codes do not break len().
        rows = [[str(field) for field in entry.get_fields()]
                for entry in self.entries]
        col_widths = [max(len(cell) for cell in column)
                      for column in zip(*rows)]

        lines = []
        for row in rows:
            aligned_row = [f"{cell:<{width}}"
                           for cell, width in zip(row, col_widths)]
            lines.append(" ".join(aligned_row) + '\n')
        return self.header + "".join(lines)

    def load(self, file_path):
        """Replace the current entries with those parsed from ``file_path``.

        Blank lines and lines starting with ``#`` are skipped. Every other
        line must contain between four and six whitespace-separated fields.

        Raises:
            OgError: if the file is missing, cannot be read, contains a line
                with an invalid number of fields, or any other error occurs.
        """
        self.entries.clear()
        try:
            with open(file_path, 'r') as file:
                for raw_line in file:
                    line = raw_line.strip()
                    if not line or line.startswith('#'):
                        continue

                    # split() without arguments handles tabs and runs of
                    # spaces and never yields empty fields.
                    fields = line.split()
                    if not 4 <= len(fields) <= 6:
                        raise OgError(f'Invalid line in {file_path}: {line}')

                    self.entries.append(FstabEntry(*fields))
        except OgError:
            # Our own validation error: do not let the generic handler below
            # re-wrap it as an "unexpected error".
            raise
        except FileNotFoundError as e:
            raise OgError(f"File {file_path} not found") from e
        except IOError as e:
            raise OgError(f"Could not read file {file_path}: {e}") from e
        except Exception as e:
            raise OgError(f"An unexpected error occurred: {e}") from e

    def write(self, file_path):
        """Write the header and the aligned entries to ``file_path``."""
        with open(file_path, 'w') as file:
            file.write(str(self))

    def get_entry_by_fstype(self, fstype):
        """Return a new list with every entry whose fstype equals ``fstype``."""
        return [entry for entry in self.entries if entry.fstype == fstype]

    def get_entry_by_mountpoint(self, mountpoint):
        """Return the first entry mounted at ``mountpoint``, or None."""
        for entry in self.entries:
            if entry.mountpoint == mountpoint:
                return entry
        return None

    def remove_entry(self, entry):
        """Remove ``entry`` from the builder; do nothing if it is absent."""
        if entry in self.entries:
            self.entries.remove(entry)

    def add_entry(self, entry):
        """Append ``entry`` after the existing entries."""
        self.entries.append(entry)


def get_formatted_device(disk, partition):
    """Return the fstab device field for the given disk and partition.

    Uses ``UUID=<id>`` when get_filesystem_id() returns a non-empty value,
    otherwise falls back to the result of get_partition_device().
    """
    uuid = get_filesystem_id(disk, partition)
    if uuid:
        return f'UUID={uuid}'
    return get_partition_device(disk, partition)


def configure_root_partition(disk, partition, fstab):
    """Point the ``/`` entry of ``fstab`` at the given disk and partition.

    Raises:
        OgError: if ``fstab`` has no entry whose mountpoint is ``/``.
    """
    root_device = get_formatted_device(disk, partition)
    root_entry = fstab.get_entry_by_mountpoint('/')
    if not root_entry:
        raise OgError('Invalid fstab configuration: no root mountpoint defined')
    root_entry.device = root_device


def configure_swap(disk, mountpoint, fstab):
    """Make ``fstab`` reference the swap partition found on ``disk``, if any.

    Swap entries that point to a regular file under ``mountpoint`` (swap
    files) are left untouched. Of the remaining swap entries only the first
    one is kept; it is updated, removed or created depending on whether a
    swap partition exists on the disk.
    """
    swap_entry = None
    # get_entry_by_fstype() returns a new list, so removing entries from
    # fstab while iterating over it is safe.
    for entry in fstab.get_entry_by_fstype('swap'):
        if entry.device.startswith('/'):
            # Resolve the absolute device path inside the mounted system.
            old_swap_device_path = os.path.join(mountpoint, entry.device[1:])
            if os.path.isfile(old_swap_device_path):
                continue

        if swap_entry:
            logging.warning(f'Removing device {entry.device} from fstab, only one swap partition is supported')
            fstab.remove_entry(entry)
        else:
            swap_entry = entry

    # ``disk`` is used as a 1-based index into the list from get_disks().
    diskname = get_disks()[disk - 1]
    partitions = get_partition_data(device=f'/dev/{diskname}')
    swap_device = ''
    for pa in partitions:
        if pa.fstype == 'swap':
            # NOTE(review): presumably partno is zero-based while
            # get_formatted_device() expects a 1-based number; confirm.
            swap_device = get_formatted_device(disk, pa.partno + 1)
            break

    if not swap_device:
        # No swap partition on the disk: drop the stale entry, if present.
        if swap_entry:
            fstab.remove_entry(swap_entry)
        return

    if swap_entry:
        swap_entry.device = swap_device
        return

    fstab.add_entry(FstabEntry(swap_device, 'none', 'swap', 'sw', '0', '0'))


def configure_efi(disk, fstab):
    """Make ``fstab`` mount the EFI partition of ``disk``.

    Does nothing when is_uefi_supported() is false. An existing entry
    mounted at ``/boot/efi`` or ``/efi`` (checked in that order) is updated
    in place; otherwise a new ``/boot/efi`` vfat entry is appended.
    """
    if not is_uefi_supported():
        return

    efi_entry = None
    for efi_mnt in ('/boot/efi', '/efi'):
        efi_entry = fstab.get_entry_by_mountpoint(efi_mnt)
        if efi_entry:
            break

    # Only the third element of the returned tuple is used here.
    _, _, efi_partition = get_efi_partition(disk, enforce_gpt=False)
    esp_device = get_formatted_device(disk, efi_partition)

    if efi_entry:
        efi_entry.device = esp_device
        return

    fstab.add_entry(FstabEntry(esp_device, '/boot/efi', 'vfat',
                               'umask=0077,shortname=winnt', '0', '2'))


def update_fstab(disk, partition, mountpoint):
    """Rewrite ``<mountpoint>/etc/fstab`` for the given disk and partition.

    Loads the file, updates the root, swap and EFI entries, and writes the
    result back to the same path.

    Raises:
        OgError: if the file cannot be loaded or has no ``/`` entry.
    """
    fstab_path = os.path.join(mountpoint, 'etc/fstab')

    fstab = FstabBuilder()
    fstab.load(fstab_path)

    configure_root_partition(disk, partition, fstab)
    configure_swap(disk, mountpoint, fstab)
    configure_efi(disk, fstab)

    fstab.write(fstab_path)