#
# Copyright (C) 2020-2024 Soleta Networks
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

import json
import os.path
import shlex
import subprocess
from src.log import OgError

from collections import namedtuple
from enum import Enum, auto

# Directory whose presence is used to tell a UEFI boot from a BIOS boot.
SYSFS_EFI_PATH = '/sys/firmware/efi'


class HardwareType(Enum):
    """Closed set of hardware categories an inventory element can have."""
    MULTIMEDIA = 1
    BOOTMODE = 2
    FIRMWARE = 3
    GRAPHICS = 4
    STORAGE = 5
    NETWORK = 6
    CHASSIS = 7
    MEMORY = 8
    MODEL = 9
    DISK = 10
    CPU = 11
    USB = 12
    CD = 13


class HardwareElement():
    """
    Simple container of a hardware type and its name
    """
    def __init__(self, hwtype, name):
        self.type = hwtype
        self.name = name

    def __str__(self):
        return f'Hardware element {self.type}: {self.name}'


class HardwareInventory():
    """
    Collection of hardware elements
    """
    def __init__(self):
        self.elements = []

    def add_element(self, elem):
        """
        Append elem to the inventory.

        Raises OgError if elem.type is not a HardwareType member or if
        elem.name is empty.
        """
        # isinstance() is used instead of "in HardwareType" because enum
        # containment checks on non-member operands behave differently
        # across Python versions (TypeError vs. lookup by value).
        if not isinstance(elem.type, HardwareType):
            raise OgError(f'Unsupported hardware type, received {elem.type}')
        if not elem.name:
            raise OgError('Empty hardware element name')
        self.elements.append(elem)


def _bytes_to_human(size):
    """
    Return size (an int number of bytes) as a string with two decimals and
    a binary unit suffix, e.g. 8589934592 -> '8.00 GiB'.

    Raises OgError if size is not an int.
    """
    suffixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']
    if type(size) is not int:
        raise OgError(f'Invalid type in _bytes_to_human, got: {size} {type(size)}')
    for exponent, suffix in enumerate(suffixes):
        conv = size / (1024**exponent)
        if conv < 1024:
            return f'{conv:.2f} {suffix}'
    # Larger than the biggest known unit: report it in that unit anyway
    # rather than returning None.
    return f'{conv:.2f} {suffixes[-1]}'


def _vendor_product(obj):
    """Return '<vendor> <product>' for a lshw node, with placeholders for
    missing keys."""
    return ' '.join([obj.get('vendor', 'Unknown vendor'),
                     obj.get('product', 'Unknown product')])


# Utility methods for lshw json output processing

def _fill_computer_model(inventory, root):
    """Add a MODEL element built from the root vendor, product and version."""
    model = ' '.join([root.get('vendor', 'Unknown vendor'),
                      root.get('product', 'Unknown product'),
                      root.get('version', 'Unknown version')])
    elem = HardwareElement(HardwareType.MODEL, model)
    inventory.add_element(elem)


def _fill_chassis_type(inventory, root):
    """Add a CHASSIS element from root['configuration']['chassis'], using a
    placeholder when that information is missing."""
    chassis = root.get('configuration', {}).get('chassis', 'Unknown chassis')
    elem = HardwareElement(HardwareType.CHASSIS, chassis)
    inventory.add_element(elem)


def _fill_bootmode(inventory):
    """Add a BOOTMODE element: 'UEFI' if SYSFS_EFI_PATH exists, else 'BIOS'."""
    bootmode = 'UEFI' if os.path.exists(SYSFS_EFI_PATH) else 'BIOS'
    elem = HardwareElement(HardwareType.BOOTMODE, bootmode)
    inventory.add_element(elem)


def _process_core_firmware(inventory, obj):
    """Add a FIRMWARE element from the node description, vendor and version."""
    desc = ' '.join([obj.get('description', ''),
                     obj.get('vendor', ''),
                     obj.get('version', '')])
    firmware_elem = HardwareElement(HardwareType.FIRMWARE, desc)
    inventory.add_element(firmware_elem)


def _process_core_cpu(inventory, obj):
    """Add a CPU element named after the node product."""
    cpu = obj.get('product', 'Unknown product')
    cpu_elem = HardwareElement(HardwareType.CPU, cpu)
    inventory.add_element(cpu_elem)


def _process_core_mem_bank(inventory, obj):
    """Add a MEMORY element for one memory bank.

    A bank without a 'size' key is reported as 'Empty slot'.
    """
    slot = obj.get('slot', 'Unknown slot')
    if 'size' in obj:
        size_desc = _bytes_to_human(obj['size'])
    else:
        size_desc = 'Empty slot'
    mem = ' '.join([_vendor_product(obj), size_desc, f'({slot})'])
    mem_elem = HardwareElement(HardwareType.MEMORY, mem)
    inventory.add_element(mem_elem)


def _process_core_mem(inventory, obj):
    """Process every child of a memory node as a memory bank."""
    for bank in obj.get('children', []):
        _process_core_mem_bank(inventory, bank)


def _process_core_pci_usb(inventory, obj):
    """Add a USB element for a PCI usb node."""
    usb_elem = HardwareElement(HardwareType.USB, _vendor_product(obj))
    inventory.add_element(usb_elem)


def _process_core_pci_display(inventory, obj):
    """Add a GRAPHICS element for a PCI display node."""
    display_elem = HardwareElement(HardwareType.GRAPHICS, _vendor_product(obj))
    inventory.add_element(display_elem)


def _process_core_pci_network(inventory, obj):
    """Add a NETWORK element for a PCI network node.

    When the node carries an int 'size', it is appended as the link speed
    in Gbit/s (>= 1e9) or Mbit/s.
    """
    name = _vendor_product(obj)
    link = obj.get('size')
    if type(link) is int:
        if link >= 1e9:
            link_human = f'{link/1e9} Gbit/s'
        else:
            link_human = f'{link/1e6} Mbit/s'
        name = ' '.join([name, f'({link_human})'])
    elem = HardwareElement(HardwareType.NETWORK, name)
    inventory.add_element(elem)


def _process_core_pci_storage_child(inventory, obj):
    """Add a DISK element for a storage child whose id starts with 'disk'
    or 'nvme'; other children are ignored."""
    obj_id = obj.get('id', '')
    if obj_id.startswith(('disk', 'nvme')):
        size = _bytes_to_human(obj['size']) if 'size' in obj else 'Unknown size'
        name = ' '.join([obj.get('description', ''),
                         obj.get('product', 'Unknown product'),
                         size])
        elem = HardwareElement(HardwareType.DISK, name)
        inventory.add_element(elem)


def _process_core_pci_storage(inventory, obj):
    """Add a STORAGE element for a storage controller, then its disks."""
    elem = HardwareElement(HardwareType.STORAGE, _vendor_product(obj))
    inventory.add_element(elem)
    # Disks follow a storage section
    for storage_child in obj.get('children', []):
        _process_core_pci_storage_child(inventory, storage_child)


def _process_core_pci_disk(inventory, obj):
    """Add a DISK element for a PCI disk node."""
    elem = HardwareElement(HardwareType.DISK, _vendor_product(obj))
    inventory.add_element(elem)


def _process_core_pci_multimedia(inventory, obj):
    """Add a MULTIMEDIA element for a PCI multimedia node."""
    elem = HardwareElement(HardwareType.MULTIMEDIA, _vendor_product(obj))
    inventory.add_element(elem)


def _process_core_pci_child(inventory, obj):
    """Dispatch a PCI child node on its id prefix.

    Nodes whose id starts with 'pci' are treated as bridges and their
    children are processed recursively. Unrecognized ids are ignored.
    """
    obj_id = obj.get('id', '')
    if obj_id.startswith('usb'):
        _process_core_pci_usb(inventory, obj)
    elif obj_id.startswith('display'):
        _process_core_pci_display(inventory, obj)
    elif obj_id.startswith('network'):
        _process_core_pci_network(inventory, obj)
    elif obj_id.startswith(('sata', 'storage')):
        _process_core_pci_storage(inventory, obj)
    elif obj_id.startswith('multimedia'):
        _process_core_pci_multimedia(inventory, obj)
    elif obj_id.startswith('pci'):
        # PCI bridge
        for bridge_child in obj.get('children', []):
            _process_core_pci_child(inventory, bridge_child)


def _process_core_pci(inventory, obj):
    """Process every child of a PCI node."""
    for child in obj.get('children', []):
        _process_core_pci_child(inventory, child)


def _process_core_scsi_disk(inventory, obj):
    """Add a DISK element for a SCSI disk node."""
    elem = HardwareElement(HardwareType.DISK, _vendor_product(obj))
    inventory.add_element(elem)


def _process_core_scsi_cdrom(inventory, obj):
    """Add a CD element for a SCSI cdrom node."""
    elem = HardwareElement(HardwareType.CD, _vendor_product(obj))
    inventory.add_element(elem)


def _process_core_scsi_child(inventory, obj):
    """Dispatch a SCSI child node on its id prefix ('disk' or 'cdrom')."""
    obj_id = obj.get('id', '')
    if obj_id.startswith('disk'):
        _process_core_scsi_disk(inventory, obj)
    elif obj_id.startswith('cdrom'):
        _process_core_scsi_cdrom(inventory, obj)


def _process_core_scsi(inventory, obj):
    """Process every child of a SCSI node."""
    for child in obj.get('children', []):
        _process_core_scsi_child(inventory, child)


def _process_core(inventory, core):
    """Dispatch every child of the 'core' node on its id prefix.

    Children without a recognized id prefix are ignored.
    """
    for element in core.get('children', []):
        element_id = element.get('id', '')
        if element_id.startswith('firmware'):
            _process_core_firmware(inventory, element)
        elif element_id.startswith('cpu'):
            _process_core_cpu(inventory, element)
        elif element_id.startswith('memory'):
            _process_core_mem(inventory, element)
        elif element_id.startswith('pci'):
            _process_core_pci(inventory, element)
        elif element_id.startswith('scsi'):
            _process_core_scsi(inventory, element)


# Legacy three-letter prefix used for each hardware type.
_LEGACY_MNEMONICS = {
    HardwareType.MULTIMEDIA: 'mul',
    HardwareType.BOOTMODE: 'boo',
    HardwareType.FIRMWARE: 'bio',
    HardwareType.GRAPHICS: 'vga',
    HardwareType.CHASSIS: 'cha',
    HardwareType.STORAGE: 'sto',
    HardwareType.NETWORK: 'net',
    HardwareType.MEMORY: 'mem',
    HardwareType.MODEL: 'mod',
    HardwareType.DISK: 'dis',
    HardwareType.CPU: 'cpu',
    HardwareType.USB: 'usb',
    HardwareType.CD: 'cdr',
}


def legacy_hardware_element(element):
    """
    Legacy string representation of a hardware element.

    For example, a graphics card named "Foo" would be represented as
    "vga=Foo"

    Raises OgError if element is not a HardwareElement or its type has no
    legacy prefix.
    """
    if not isinstance(element, HardwareElement):
        raise OgError('Invalid hardware element type')
    try:
        mnemonic = _LEGACY_MNEMONICS[element.type]
    except (KeyError, TypeError):
        # TypeError covers an unhashable element.type.
        raise OgError(f'Unsupported hardware type, received {element.type}') from None
    return f'{mnemonic}={element.name}'


def legacy_list_hardware_inventory(inventory):
    """
    Return a hardware inventory as a legacy string. Map each hardware
    component to its legacy string representation and concatenate using a
    new line.

    This is the same output as legacy script ogListHardware.
    """
    return '\n'.join([legacy_hardware_element(elem) for elem in inventory.elements])


def get_hardware_inventory():
    """
    Run 'lshw -json' and build a HardwareInventory from its output.

    The top-level JSON value may be an object, or a list whose first item
    is that object.

    Raises OgError if lshw cannot be executed or its output is not the
    expected JSON.
    """
    try:
        proc = subprocess.run(['lshw', '-json'],
                              stdout=subprocess.PIPE,
                              stderr=subprocess.DEVNULL)
    except OSError as e:
        raise OgError(f'Cannot run lshw: {e}') from e

    try:
        root = json.loads(proc.stdout)
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        raise OgError(f'Invalid lshw json output: {e}') from e

    if type(root) is list:
        root = root[0] if root else None
    if type(root) is not dict:
        raise OgError('Invalid lshw json output')

    inventory = HardwareInventory()
    _fill_computer_model(inventory, root)
    _fill_chassis_type(inventory, root)
    _fill_bootmode(inventory)

    # Process 'children' node
    # Usually there are two types of children in the lshw json root:
    # 'core' and 'power'.
    # We are only interested in 'core'.
    for child in root.get('children', []):
        if child.get('id') == 'core':
            _process_core(inventory, child)
    return inventory