summaryrefslogtreecommitdiffstats
path: root/src/utils/fstab.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/utils/fstab.py')
-rw-r--r--src/utils/fstab.py213
1 files changed, 213 insertions, 0 deletions
diff --git a/src/utils/fstab.py b/src/utils/fstab.py
new file mode 100644
index 0000000..b419aa2
--- /dev/null
+++ b/src/utils/fstab.py
@@ -0,0 +1,213 @@
+#
+# Copyright (C) 2024 Soleta Networks <info@soleta.eu>
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the
+# Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+
+import logging
+import os
+import fdisk
+from src.log import OgError
+from src.utils.disk import *
+from src.utils.uefi import is_uefi_supported
+
+
+class FstabEntry(object):
+ def __init__(self, device, mountpoint, fstype, options,
+ dump_code='0', pass_code='0'):
+ self.device = device
+ self.mountpoint = mountpoint
+ self.fstype = fstype
+ self.options = options
+ self.dump_code = dump_code
+ self.pass_code = pass_code
+
+ if not self.options:
+ self.options = "default"
+
+ def __str__(self):
+ return "{} {} {} {} {} {}".format(self.device, self.mountpoint,
+ self.fstype, self.options,
+ self.dump_code, self.pass_code)
+
+ def get_fields(self):
+ return [self.device, self.mountpoint, self.fstype, self.options,
+ self.dump_code, self.pass_code]
+
+
+class FstabBuilder(object):
+ def __init__(self):
+ self.header = "# /etc/fstab: static file system information.\n" \
+ "#\n" \
+ "# Use 'blkid -o value -s UUID' to print the universally unique identifier\n" \
+ "# for a device; this may be used with UUID= as a more robust way to name\n" \
+ "# devices that works even if disks are added and removed. See fstab(5).\n" \
+ "#\n"
+ self.entries = []
+
+ def __str__(self):
+ # Group fields by columns an compute the max length of each column
+ # to obtain the needed tabulation.
+ res = self.header
+ field_matrix = [entry.get_fields() for entry in self.entries]
+ transposed = list(zip(*field_matrix))
+
+ col_widths = []
+ for col in transposed:
+ max_length_col = max([len(item) for item in col])
+ col_widths.append(max_length_col)
+
+ for row in field_matrix:
+ alligned_row = []
+ for i, item in enumerate(row):
+ formatted_item = f"{item:<{col_widths[i]}}"
+ alligned_row.append(formatted_item)
+
+ res += " ".join(alligned_row)
+ res += '\n'
+
+ return res
+
+ def load(self, file_path):
+ self.entries.clear()
+
+ try:
+ with open(file_path, 'r') as file:
+ for line in file:
+ line = line.strip()
+
+ if not line or line.startswith('#'):
+ continue
+
+ line = line.replace('\t', ' ')
+ fields = list(filter(None, line.split(' ')))
+
+ if len(fields) < 4 or len(fields) > 6:
+ raise OgError(f'Invalid line in {file_path}: {line}')
+
+ self.entries.append(FstabEntry(*fields))
+ except FileNotFoundError as e:
+ raise OgError(f"File {file_path} not found") from e
+ except IOError as e:
+ raise OgError(f"Could not read file {file_path}: {e}") from e
+ except Exception as e:
+ raise OgError(f"An unexpected error occurred: {e}") from e
+
+ def write(self, file_path):
+ with open(file_path, 'w') as file:
+ file.write(str(self))
+
+ def get_entry_by_fstype(self, fstype):
+ res = []
+ for entry in self.entries:
+ if entry.fstype == fstype:
+ res.append(entry)
+ return res
+
+ def get_entry_by_mountpoint(self, mountpoint):
+ for entry in self.entries:
+ if entry.mountpoint == mountpoint:
+ return entry
+ return None
+
+ def remove_entry(self, entry):
+ if entry in self.entries:
+ self.entries.remove(entry)
+
+ def add_entry(self, entry):
+ self.entries.append(entry)
+
+
+def get_formatted_device(disk, partition):
+ uuid = get_filesystem_id(disk, partition)
+
+ if uuid:
+ return f'UUID={uuid}'
+
+ return get_partition_device(disk, partition)
+
+
+def configure_root_partition(disk, partition, fstab):
+ root_device = get_formatted_device(disk, partition)
+ root_entry = fstab.get_entry_by_mountpoint('/')
+
+ if not root_entry:
+ raise OgError('Invalid fstab configuration: no root mountpoint defined')
+
+ root_entry.device = root_device
+
+
+def configure_swap(disk, mountpoint, fstab):
+ swap_entries = fstab.get_entry_by_fstype('swap')
+ swap_entry = None
+
+ for entry in swap_entries:
+ if entry.device.startswith('/'):
+ old_swap_device_path = os.path.join(mountpoint, entry.device[1:])
+ is_swapfile = os.path.isfile(old_swap_device_path)
+
+ if is_swapfile:
+ continue
+
+ if swap_entry:
+ logging.warning(f'Removing device {entry.device} from fstab, only one swap partition is supported')
+ fstab.remove_entry(entry)
+ else:
+ swap_entry = entry
+
+ diskname = get_disks()[disk-1]
+ cxt = fdisk.Context(f'/dev/{diskname}')
+
+ swap_device = ''
+ for pa in cxt.partitions:
+ if cxt.partition_to_string(pa, fdisk.FDISK_FIELD_FSTYPE) == 'swap':
+ swap_device = get_formatted_device(disk, pa.partno + 1)
+ break
+
+ if not swap_device:
+ if swap_entry:
+ fstab.remove_entry(swap_entry)
+ return
+
+ if swap_entry:
+ swap_entry.device = swap_device
+ return
+
+ swap_entry = FstabEntry(swap_device, 'none', 'swap', 'sw', '0', '0')
+ fstab.add_entry(swap_entry)
+
+
+def configure_efi(disk, fstab):
+ if not is_uefi_supported():
+ return
+
+ efi_mnt_list = ['/boot/efi', '/efi']
+ for efi_mnt in efi_mnt_list:
+ efi_entry = fstab.get_entry_by_mountpoint(efi_mnt)
+ if efi_entry:
+ break
+
+ _, _, efi_partition = get_efi_partition(disk, enforce_gpt=False)
+ esp_device = get_formatted_device(disk, efi_partition)
+
+ if efi_entry:
+ efi_entry.device = esp_device
+ return
+
+ efi_entry = FstabEntry(esp_device, '/boot/efi', 'vfat', 'umask=0077,shortname=winnt', '0', '2')
+ fstab.add_entry(efi_entry)
+
+
+def update_fstab(disk, partition, mountpoint):
+ fstab_path = os.path.join(mountpoint, 'etc/fstab')
+
+ fstab = FstabBuilder()
+ fstab.load(fstab_path)
+
+ configure_root_partition(disk, partition, fstab)
+ configure_swap(disk, mountpoint, fstab)
+ configure_efi(disk, fstab)
+
+ fstab.write(fstab_path)