summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJose M. Guisado <jguisado@soleta.eu>2023-04-17 12:53:49 +0200
committerJose M. Guisado <jguisado@soleta.eu>2023-04-18 12:02:29 +0200
commit49a86bddd9cee5941bdcafd304a00ad127368ed2 (patch)
tree59c39dc33bae98474b5ed8b1f79f0415d21ab761 /src
parent4c0904d8da3daf9dc1fc688e09ba4a21153d2ac9 (diff)
utils: add hw_inventory.py
hw_inventory.py defines classes and helpers functions enabling fetching of hardware inventory from a running client. Uses a subprocess call to the command 'lshw -json' to obtain hardware information. Relevant public functions: > get_hardware_inventory() Main function encapsulating subprocess and output processing logic. Returns a HardwareInventory object. > legacy_list_hardware_inventory(inventory) Legacy string representation of parameter HardwareInventory object
Diffstat (limited to 'src')
-rw-r--r--src/utils/hw_inventory.py309
1 files changed, 309 insertions, 0 deletions
diff --git a/src/utils/hw_inventory.py b/src/utils/hw_inventory.py
new file mode 100644
index 0000000..db24b83
--- /dev/null
+++ b/src/utils/hw_inventory.py
@@ -0,0 +1,309 @@
+#
+# Copyright (C) 2023 Soleta Networks <info@soleta.eu>
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the
+# Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+
+import json
+import os.path
+import shlex
+import subprocess
+
+from collections import namedtuple
+from enum import Enum, auto
+
+
+SYSFS_EFI_PATH = '/sys/firmware/efi'
+
+
+class HardwareType(Enum):
+ MULTIMEDIA = 1
+ BOOTMODE = 2
+ FIRMWARE = 3
+ GRAPHICS = 4
+ STORAGE = 5
+ NETWORK = 6
+ CHASSIS = 7
+ MEMORY = 8
+ MODEL = 9
+ DISK = 10
+ CPU = 11
+ USB = 12
+ CD = 13
+
+
+class HardwareElement():
+ """
+ Simple container of a hardware type and its name
+ """
+ def __init__(self, hwtype, name):
+ self.type = hwtype
+ self.name = name
+
+ def __str__(self):
+ return f'Hardware element {self.type}: {self.name}'
+
+
+class HardwareInventory():
+ """
+ Collection of hardware elements
+ """
+ def __init__(self):
+ self.elements = list()
+
+ def add_element(self, elem):
+ if elem.type not in HardwareType:
+ raise ValueError('Unsupported hardware type')
+ if not elem.name:
+ raise ValueError('Empty hardware element name')
+ self.elements.append(elem)
+
+
+def _bytes_to_human(size):
+ suffixes = ['B', 'MiB', 'GiB', 'TiB']
+ if type(size) is not int:
+ raise TypeError('Invalid type')
+ for exponent, suffix in enumerate(suffixes, start=1):
+ conv = size / (1024**exponent)
+ if conv < 1024:
+ return f'{conv:.2f} {suffix}'
+
+# Utility methods for lshw json output processing
+
+def _fill_computer_model(inventory, root):
+ model = ' '.join([root['vendor'], root['product'], root['version']])
+ elem = HardwareElement(HardwareType.MODEL, model)
+ inventory.add_element(elem)
+
+
+def _fill_chassis_type(inventory, root):
+ chassis = root['configuration']['chassis']
+ elem = HardwareElement(HardwareType.CHASSIS, chassis)
+ inventory.add_element(elem)
+
+
+def _fill_bootmode(inventory):
+ bootmode = 'UEFI' if os.path.exists(SYSFS_EFI_PATH) else 'BIOS'
+ elem = HardwareElement(HardwareType.BOOTMODE, bootmode)
+ inventory.add_element(elem)
+
+
+def _process_core_firmware(inventory, obj):
+ desc = ' '.join([obj['description'], obj['vendor'], obj['version']])
+ firmware_elem = HardwareElement(HardwareType.FIRMWARE, desc)
+ inventory.add_element(firmware_elem)
+
+
+def _process_core_cpu(inventory, obj):
+ cpu = obj['product']
+ cpu_elem = HardwareElement(HardwareType.CPU, cpu)
+ inventory.add_element(cpu_elem)
+
+
+def _process_core_mem_bank(inventory, obj):
+ slot = obj.get('slot', 'Unknown slot')
+ size = _bytes_to_human(obj['size']) if 'size' in obj else None
+ if size:
+ mem = ' '.join([obj['vendor'], obj['product'], size, f'({slot})'])
+ mem_elem = HardwareElement(HardwareType.MEMORY, mem)
+ inventory.add_element(mem_elem)
+
+
+def _process_core_mem(inventory, obj):
+ banks = obj['children']
+ for bank in banks:
+ _process_core_mem_bank(inventory, bank)
+
+
+def _process_core_pci_usb(inventory, obj):
+ name = ' '.join([obj['vendor'], obj['product']])
+ usb_elem = HardwareElement(HardwareType.USB, name)
+ inventory.add_element(usb_elem)
+
+
+def _process_core_pci_display(inventory, obj):
+ name = ' '.join([obj['vendor'], obj['product']])
+ display_elem = HardwareElement(HardwareType.GRAPHICS, name)
+ inventory.add_element(display_elem)
+
+
+def _process_core_pci_network(inventory, obj):
+ link = obj.get('size', 'Unknown link speed')
+ if type(link) == int:
+ if link >= 1000000000:
+ linkh = f'{link/1e9} Gbit/s'
+ else:
+ linkh = f'{link/1e6} Mbit/s'
+ name = ' '.join([obj['vendor'], obj['product'], f'({linkh})'])
+ elem = HardwareElement(HardwareType.NETWORK, name)
+ inventory.add_element(elem)
+
+
+def _process_core_pci_storage_child(inventory, obj):
+ obj_id = obj['id']
+ if obj_id.startswith('disk') or obj_id.startswith('nvme'):
+ size = bytes_to_human(obj['size']) if 'size' in obj else 'Unknown size'
+ name = ' '.join([obj['description'], obj['product'], size])
+ elem = HardwareElement(HardwareType.DISK, name)
+ inventory.add_element(elem)
+
+
+def _process_core_pci_storage(inventory, obj):
+ name = ' '.join([obj['vendor'], obj['product']])
+ elem = HardwareElement(HardwareType.STORAGE, name)
+ inventory.add_element(elem)
+ # Disks follow a storage section
+ for storage_child in obj.get('children', []):
+ _process_core_pci_storage_child(inventory, storage_child)
+
+
+def _process_core_pci_disk(inventory, obj):
+ name = ' '.join([obj['vendor'], obj['product']])
+ elem = HardwareElement(HardwareType.DISK, name)
+ inventory.add_element(elem)
+
+
+def _process_core_pci_multimedia(inventory, obj):
+ name = ' '.join([obj['vendor'], obj['product']])
+ elem = HardwareElement(HardwareType.MULTIMEDIA, name)
+ inventory.add_element(elem)
+
+
+def _process_core_pci_child(inventory, obj):
+ obj_id = obj['id']
+ if obj_id.startswith('usb'):
+ _process_core_pci_usb(inventory, obj)
+ elif obj_id.startswith('display'):
+ _process_core_pci_display(inventory, obj)
+ elif obj_id.startswith('network'):
+ _process_core_pci_network(inventory, obj)
+ elif obj_id.startswith('sata') or obj_id.startswith('storage'):
+ _process_core_pci_storage(inventory, obj)
+ elif obj_id.startswith('multimedia'):
+ _process_core_pci_multimedia(inventory, obj)
+ elif obj_id.startswith('pci'): # PCI bridge
+ bridge_children = obj.get('children', [])
+ for bridge_child in bridge_children:
+ _process_core_pci_child(inventory, bridge_child)
+
+
+def _process_core_pci(inventory, obj):
+ children = obj.get('children', [])
+ for child in children:
+ _process_core_pci_child(inventory, child)
+
+
+def _process_core_scsi_disk(inventory, obj):
+ vendor = obj.get('vendor', 'Unknown vendor')
+ name = ' '.join([vendor, obj['product']])
+ elem = HardwareElement(HardwareType.DISK, name)
+ inventory.add_element(elem)
+
+
+def _process_core_scsi_cdrom(inventory, obj):
+ name = ' '.join([obj['vendor'], obj['product']])
+ elem = HardwareElement(HardwareType.CD, name)
+ inventory.add_element(elem)
+
+
+def _process_core_scsi_child(inventory, obj):
+ obj_id = obj['id']
+ if obj_id.startswith('disk'):
+ _process_core_scsi_disk(inventory, obj)
+ elif obj_id.startswith('cdrom'):
+ _process_core_scsi_cdrom(inventory, obj)
+
+
+def _process_core_scsi(inventory, obj):
+ children = obj['children']
+ for child in children:
+ _process_core_scsi_child(inventory, child)
+
+def _process_core(inventory, core):
+ for element in core['children']:
+ element_id = element['id']
+ if element_id.startswith('firmware'):
+ _process_core_firmware(inventory, element)
+ elif element_id.startswith('cpu'):
+ _process_core_cpu(inventory, element)
+ elif element_id.startswith('memory'):
+ _process_core_mem(inventory, element)
+ elif element_id.startswith('pci'):
+ _process_core_pci(inventory, element)
+ elif element_id.startswith('scsi'):
+ _process_core_scsi(inventory, element)
+
+def legacy_hardware_element(element):
+ """
+ Legacy string representation of a hardware element.
+
+ For example, a graphics card named "Foo" would be
+ represented as "vga=Foo"
+ """
+ if type(element) is not HardwareElement:
+ raise TypeError('Invalid type')
+ elif element.type is HardwareType.MULTIMEDIA:
+ nemonic = 'mul'
+ elif element.type is HardwareType.BOOTMODE:
+ nemonic = 'boo'
+ elif element.type is HardwareType.FIRMWARE:
+ nemonic = 'bio'
+ elif element.type is HardwareType.GRAPHICS:
+ nemonic = 'vga'
+ elif element.type is HardwareType.CHASSIS:
+ nemonic = 'cha'
+ elif element.type is HardwareType.STORAGE:
+ nemonic = 'sto'
+ elif element.type is HardwareType.NETWORK:
+ nemonic = 'net'
+ elif element.type is HardwareType.MEMORY:
+ nemonic = 'mem'
+ elif element.type is HardwareType.MODEL:
+ nemonic = 'mod'
+ elif element.type is HardwareType.DISK:
+ nemonic = 'dis'
+ elif element.type is HardwareType.CPU:
+ nemonic = 'cpu'
+ elif element.type is HardwareType.USB:
+ nemonic = 'usb'
+ elif element.type is HardwareType.CD:
+ nemonic = 'cdr'
+ return f'{nemonic}={element.name}'
+
+def legacy_list_hardware_inventory(inventory):
+ """
+ Return a hardware inventory as a legacy string. Map each hardware
+ component to its legacy string representation and concatenate using a new
+ line.
+
+ This is the same output as legacy script ogListHardware.
+ """
+ return '\n'.join([legacy_hardware_element(elem) for elem in inventory.elements])
+
+
+def get_hardware_inventory():
+ proc = subprocess.run(['lshw', '-json'], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
+ j = json.loads(proc.stdout)
+
+ if type(j) is list:
+ root = j[0]
+ if type(root) is not dict:
+ raise ValueError('Unvalid lshw json output')
+
+ inventory = HardwareInventory()
+ _fill_computer_model(inventory, root)
+ _fill_chassis_type(inventory, root)
+ _fill_bootmode(inventory)
+ # Process 'children' node
+ # Usually there are two type of children from lshw json root:
+ # 'core' and 'power'.
+ # We are only interested in 'core'.
+ children = root['children']
+ for child in children:
+ child_id = child['id']
+ if child_id == 'core':
+ _process_core(inventory, child)
+
+ return inventory