From 49a86bddd9cee5941bdcafd304a00ad127368ed2 Mon Sep 17 00:00:00 2001 From: "Jose M. Guisado" Date: Mon, 17 Apr 2023 12:53:49 +0200 Subject: utils: add hw_inventory.py hw_inventory.py defines classes and helpers functions enabling fetching of hardware inventory from a running client. Uses a subprocess call to the command 'lshw -json' to obtain hardware information. Relevant public functions: > get_hardware_inventory() Main function encapsulating subprocess and output processing logic. Returns a HardwareInventory object. > legacy_list_hardware_inventory(inventory) Legacy string representation of parameter HardwareInventory object --- src/utils/hw_inventory.py | 309 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 309 insertions(+) create mode 100644 src/utils/hw_inventory.py (limited to 'src/utils') diff --git a/src/utils/hw_inventory.py b/src/utils/hw_inventory.py new file mode 100644 index 0000000..db24b83 --- /dev/null +++ b/src/utils/hw_inventory.py @@ -0,0 +1,309 @@ +# +# Copyright (C) 2023 Soleta Networks +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the +# Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. + +import json +import os.path +import shlex +import subprocess + +from collections import namedtuple +from enum import Enum, auto + + +SYSFS_EFI_PATH = '/sys/firmware/efi' + + +class HardwareType(Enum): + MULTIMEDIA = 1 + BOOTMODE = 2 + FIRMWARE = 3 + GRAPHICS = 4 + STORAGE = 5 + NETWORK = 6 + CHASSIS = 7 + MEMORY = 8 + MODEL = 9 + DISK = 10 + CPU = 11 + USB = 12 + CD = 13 + + +class HardwareElement(): + """ + Simple container of a hardware type and its name + """ + def __init__(self, hwtype, name): + self.type = hwtype + self.name = name + + def __str__(self): + return f'Hardware element {self.type}: {self.name}' + + +class HardwareInventory(): + """ + Collection of hardware elements + """ + def __init__(self): + self.elements = list() + + def add_element(self, elem): + if elem.type not in HardwareType: + raise ValueError('Unsupported hardware type') + if not elem.name: + raise ValueError('Empty hardware element name') + self.elements.append(elem) + + +def _bytes_to_human(size): + suffixes = ['B', 'MiB', 'GiB', 'TiB'] + if type(size) is not int: + raise TypeError('Invalid type') + for exponent, suffix in enumerate(suffixes, start=1): + conv = size / (1024**exponent) + if conv < 1024: + return f'{conv:.2f} {suffix}' + +# Utility methods for lshw json output processing + +def _fill_computer_model(inventory, root): + model = ' '.join([root['vendor'], root['product'], root['version']]) + elem = HardwareElement(HardwareType.MODEL, model) + inventory.add_element(elem) + + +def _fill_chassis_type(inventory, root): + chassis = root['configuration']['chassis'] + elem = HardwareElement(HardwareType.CHASSIS, chassis) + inventory.add_element(elem) + + +def _fill_bootmode(inventory): + bootmode = 'UEFI' if os.path.exists(SYSFS_EFI_PATH) else 'BIOS' + elem = HardwareElement(HardwareType.BOOTMODE, bootmode) + inventory.add_element(elem) + + +def _process_core_firmware(inventory, obj): + desc = ' '.join([obj['description'], obj['vendor'], obj['version']]) + firmware_elem = HardwareElement(HardwareType.FIRMWARE, desc) + inventory.add_element(firmware_elem) + + +def _process_core_cpu(inventory, obj): + cpu = obj['product'] + cpu_elem = HardwareElement(HardwareType.CPU, cpu) + inventory.add_element(cpu_elem) + + +def _process_core_mem_bank(inventory, obj): + slot = obj.get('slot', 'Unknown slot') + size = _bytes_to_human(obj['size']) if 'size' in obj else None + if size: + mem = ' '.join([obj['vendor'], obj['product'], size, f'({slot})']) + mem_elem = HardwareElement(HardwareType.MEMORY, mem) + inventory.add_element(mem_elem) + + +def _process_core_mem(inventory, obj): + banks = obj['children'] + for bank in banks: + _process_core_mem_bank(inventory, bank) + + +def _process_core_pci_usb(inventory, obj): + name = ' '.join([obj['vendor'], obj['product']]) + usb_elem = HardwareElement(HardwareType.USB, name) + inventory.add_element(usb_elem) + + +def _process_core_pci_display(inventory, obj): + name = ' '.join([obj['vendor'], obj['product']]) + display_elem = HardwareElement(HardwareType.GRAPHICS, name) + inventory.add_element(display_elem) + + +def _process_core_pci_network(inventory, obj): + link = obj.get('size', 'Unknown link speed') + if type(link) == int: + if link >= 1000000000: + linkh = f'{link/1e9} Gbit/s' + else: + linkh = f'{link/1e6} Mbit/s' + name = ' '.join([obj['vendor'], obj['product'], f'({linkh})']) + elem = HardwareElement(HardwareType.NETWORK, name) + inventory.add_element(elem) + + +def _process_core_pci_storage_child(inventory, obj): + obj_id = obj['id'] + if obj_id.startswith('disk') or obj_id.startswith('nvme'): + size = bytes_to_human(obj['size']) if 'size' in obj else 'Unknown size' + name = ' '.join([obj['description'], obj['product'], size]) + elem = HardwareElement(HardwareType.DISK, name) + inventory.add_element(elem) + + +def _process_core_pci_storage(inventory, obj): + name = ' '.join([obj['vendor'], obj['product']]) + elem = HardwareElement(HardwareType.STORAGE, name) + inventory.add_element(elem) + # Disks follow a storage section + for storage_child in obj.get('children', []): + _process_core_pci_storage_child(inventory, storage_child) + + +def _process_core_pci_disk(inventory, obj): + name = ' '.join([obj['vendor'], obj['product']]) + elem = HardwareElement(HardwareType.DISK, name) + inventory.add_element(elem) + + +def _process_core_pci_multimedia(inventory, obj): + name = ' '.join([obj['vendor'], obj['product']]) + elem = HardwareElement(HardwareType.MULTIMEDIA, name) + inventory.add_element(elem) + + +def _process_core_pci_child(inventory, obj): + obj_id = obj['id'] + if obj_id.startswith('usb'): + _process_core_pci_usb(inventory, obj) + elif obj_id.startswith('display'): + _process_core_pci_display(inventory, obj) + elif obj_id.startswith('network'): + _process_core_pci_network(inventory, obj) + elif obj_id.startswith('sata') or obj_id.startswith('storage'): + _process_core_pci_storage(inventory, obj) + elif obj_id.startswith('multimedia'): + _process_core_pci_multimedia(inventory, obj) + elif obj_id.startswith('pci'): # PCI bridge + bridge_children = obj.get('children', []) + for bridge_child in bridge_children: + _process_core_pci_child(inventory, bridge_child) + + +def _process_core_pci(inventory, obj): + children = obj.get('children', []) + for child in children: + _process_core_pci_child(inventory, child) + + +def _process_core_scsi_disk(inventory, obj): + vendor = obj.get('vendor', 'Unknown vendor') + name = ' '.join([vendor, obj['product']]) + elem = HardwareElement(HardwareType.DISK, name) + inventory.add_element(elem) + + +def _process_core_scsi_cdrom(inventory, obj): + name = ' '.join([obj['vendor'], obj['product']]) + elem = HardwareElement(HardwareType.CD, name) + inventory.add_element(elem) + + +def _process_core_scsi_child(inventory, obj): + obj_id = obj['id'] + if obj_id.startswith('disk'): + _process_core_scsi_disk(inventory, obj) + elif obj_id.startswith('cdrom'): + _process_core_scsi_cdrom(inventory, obj) + + +def _process_core_scsi(inventory, obj): + children = obj['children'] + for child in children: + _process_core_scsi_child(inventory, child) + +def _process_core(inventory, core): + for element in core['children']: + element_id = element['id'] + if element_id.startswith('firmware'): + _process_core_firmware(inventory, element) + elif element_id.startswith('cpu'): + _process_core_cpu(inventory, element) + elif element_id.startswith('memory'): + _process_core_mem(inventory, element) + elif element_id.startswith('pci'): + _process_core_pci(inventory, element) + elif element_id.startswith('scsi'): + _process_core_scsi(inventory, element) + +def legacy_hardware_element(element): + """ + Legacy string representation of a hardware element. + + For example, a graphics card named "Foo" would be + represented as "vga=Foo" + """ + if type(element) is not HardwareElement: + raise TypeError('Invalid type') + elif element.type is HardwareType.MULTIMEDIA: + nemonic = 'mul' + elif element.type is HardwareType.BOOTMODE: + nemonic = 'boo' + elif element.type is HardwareType.FIRMWARE: + nemonic = 'bio' + elif element.type is HardwareType.GRAPHICS: + nemonic = 'vga' + elif element.type is HardwareType.CHASSIS: + nemonic = 'cha' + elif element.type is HardwareType.STORAGE: + nemonic = 'sto' + elif element.type is HardwareType.NETWORK: + nemonic = 'net' + elif element.type is HardwareType.MEMORY: + nemonic = 'mem' + elif element.type is HardwareType.MODEL: + nemonic = 'mod' + elif element.type is HardwareType.DISK: + nemonic = 'dis' + elif element.type is HardwareType.CPU: + nemonic = 'cpu' + elif element.type is HardwareType.USB: + nemonic = 'usb' + elif element.type is HardwareType.CD: + nemonic = 'cdr' + return f'{nemonic}={element.name}' + +def legacy_list_hardware_inventory(inventory): + """ + Return a hardware inventory as a legacy string. Map each hardware + component to its legacy string representation and concatenate using a new + line. + + This is the same output as legacy script ogListHardware. + """ + return '\n'.join([legacy_hardware_element(elem) for elem in inventory.elements]) + + +def get_hardware_inventory(): + proc = subprocess.run(['lshw', '-json'], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) + j = json.loads(proc.stdout) + + if type(j) is list: + root = j[0] + if type(root) is not dict: + raise ValueError('Unvalid lshw json output') + + inventory = HardwareInventory() + _fill_computer_model(inventory, root) + _fill_chassis_type(inventory, root) + _fill_bootmode(inventory) + # Process 'children' node + # Usually there are two type of children from lshw json root: + # 'core' and 'power'. + # We are only interested in 'core'. + children = root['children'] + for child in children: + child_id = child['id'] + if child_id == 'core': + _process_core(inventory, child) + + return inventory -- cgit v1.2.3-18-g5258