From ee0d62db444441be41d99ac4ee5f2de5100cd934 Mon Sep 17 00:00:00 2001 From: "Jose M. Guisado" Date: Fri, 21 Jul 2023 09:49:24 +0200 Subject: hw_inventory: use dict.get The first stage of parsing the "lshw -json" command output is to load the json string into a Python dictionary. lshw output is large and varies from machine to machine, so it's not safe to assume that different keys will be present in the dictionary. Use dict.get() instead of dict[key] to avoid KeyError exceptions. --- src/utils/hw_inventory.py | 50 +++++++++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/utils/hw_inventory.py b/src/utils/hw_inventory.py index db24b83..2f38ae2 100644 --- a/src/utils/hw_inventory.py +++ b/src/utils/hw_inventory.py @@ -73,7 +73,9 @@ def _bytes_to_human(size): # Utility methods for lshw json output processing def _fill_computer_model(inventory, root): - model = ' '.join([root['vendor'], root['product'], root['version']]) + model = ' '.join([root.get('vendor', 'Unknown vendor'), + root.get('product', 'Unknown prouct'), + root.get('version', 'Unknown version')]) elem = HardwareElement(HardwareType.MODEL, model) inventory.add_element(elem) @@ -91,40 +93,40 @@ def _fill_bootmode(inventory): def _process_core_firmware(inventory, obj): - desc = ' '.join([obj['description'], obj['vendor'], obj['version']]) + desc = ' '.join([obj.get('description', ''), obj.get('vendor', ''), obj.get('version', '')]) firmware_elem = HardwareElement(HardwareType.FIRMWARE, desc) inventory.add_element(firmware_elem) def _process_core_cpu(inventory, obj): - cpu = obj['product'] + cpu = obj.get('product', 'Unknown product') cpu_elem = HardwareElement(HardwareType.CPU, cpu) inventory.add_element(cpu_elem) def _process_core_mem_bank(inventory, obj): slot = obj.get('slot', 'Unknown slot') - size = _bytes_to_human(obj['size']) if 'size' in obj else None + size = _bytes_to_human(obj.get('size', None)) if size: - mem = ' '.join([obj['vendor'], obj['product'], size, f'({slot})']) + mem = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product'), size, f'({slot})']) mem_elem = HardwareElement(HardwareType.MEMORY, mem) inventory.add_element(mem_elem) def _process_core_mem(inventory, obj): - banks = obj['children'] + banks = obj.get('children', []) for bank in banks: _process_core_mem_bank(inventory, bank) def _process_core_pci_usb(inventory, obj): - name = ' '.join([obj['vendor'], obj['product']]) + name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')]) usb_elem = HardwareElement(HardwareType.USB, name) inventory.add_element(usb_elem) def _process_core_pci_display(inventory, obj): - name = ' '.join([obj['vendor'], obj['product']]) + name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')]) display_elem = HardwareElement(HardwareType.GRAPHICS, name) inventory.add_element(display_elem) @@ -132,26 +134,28 @@ def _process_core_pci_display(inventory, obj): def _process_core_pci_network(inventory, obj): link = obj.get('size', 'Unknown link speed') if type(link) == int: - if link >= 1000000000: - linkh = f'{link/1e9} Gbit/s' + if link >= 1e9: + link_human = f'{link/1e9} Gbit/s' else: - linkh = f'{link/1e6} Mbit/s' - name = ' '.join([obj['vendor'], obj['product'], f'({linkh})']) + link_human = f'{link/1e6} Mbit/s' + name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product'), f'({link_human})']) + else: + name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')]) elem = HardwareElement(HardwareType.NETWORK, name) inventory.add_element(elem) def _process_core_pci_storage_child(inventory, obj): - obj_id = obj['id'] + obj_id = obj.get('id', '') if obj_id.startswith('disk') or obj_id.startswith('nvme'): - size = bytes_to_human(obj['size']) if 'size' in obj else 'Unknown size' - name = ' '.join([obj['description'], obj['product'], size]) + size = bytes_to_human(obj.get('size', 'Unknown size')) + name = ' '.join([obj.get('description', ''), obj.get('product', 'Unknown product'), size]) elem = HardwareElement(HardwareType.DISK, name) inventory.add_element(elem) def _process_core_pci_storage(inventory, obj): - name = ' '.join([obj['vendor'], obj['product']]) + name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')]) elem = HardwareElement(HardwareType.STORAGE, name) inventory.add_element(elem) # Disks follow a storage section @@ -160,19 +164,19 @@ def _process_core_pci_storage(inventory, obj): def _process_core_pci_disk(inventory, obj): - name = ' '.join([obj['vendor'], obj['product']]) + name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')]) elem = HardwareElement(HardwareType.DISK, name) inventory.add_element(elem) def _process_core_pci_multimedia(inventory, obj): - name = ' '.join([obj['vendor'], obj['product']]) + name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')]) elem = HardwareElement(HardwareType.MULTIMEDIA, name) inventory.add_element(elem) def _process_core_pci_child(inventory, obj): - obj_id = obj['id'] + obj_id = obj.get('id', '') if obj_id.startswith('usb'): _process_core_pci_usb(inventory, obj) elif obj_id.startswith('display'): @@ -197,19 +201,19 @@ def _process_core_pci(inventory, obj): def _process_core_scsi_disk(inventory, obj): vendor = obj.get('vendor', 'Unknown vendor') - name = ' '.join([vendor, obj['product']]) + name = ' '.join([vendor, obj.get('product', 'Unknown product')]) elem = HardwareElement(HardwareType.DISK, name) inventory.add_element(elem) def _process_core_scsi_cdrom(inventory, obj): - name = ' '.join([obj['vendor'], obj['product']]) + name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')]) elem = HardwareElement(HardwareType.CD, name) inventory.add_element(elem) def _process_core_scsi_child(inventory, obj): - obj_id = obj['id'] + obj_id = obj.get('id', '') if obj_id.startswith('disk'): _process_core_scsi_disk(inventory, obj) elif obj_id.startswith('cdrom'): @@ -217,7 +221,7 @@ def _process_core_scsi_child(inventory, obj): def _process_core_scsi(inventory, obj): - children = obj['children'] + children = obj.get('children', []) for child in children: _process_core_scsi_child(inventory, child) -- cgit v1.2.3-18-g5258