summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJose M. Guisado <jguisado@soleta.eu>2023-07-21 09:49:24 +0200
committerJose M. Guisado <jguisado@soleta.eu>2023-07-21 11:55:25 +0200
commitee0d62db444441be41d99ac4ee5f2de5100cd934 (patch)
tree72a226395568a1a7b8a09bf5e005797e59fac849
parent035995fc8c55740693575ca5f7b408bfc46c1f7d (diff)
hw_inventory: use dict.get
The first stage of parsing the "lshw -json" command output is to load the json string into a Python dictionary. lshw output is large and varies from machine to machine, so it's not safe to assume that different keys will be present in the dictionary. Use dict.get() instead of dict[key] to avoid KeyError exceptions.
-rw-r--r--src/utils/hw_inventory.py50
1 files changed, 27 insertions, 23 deletions
diff --git a/src/utils/hw_inventory.py b/src/utils/hw_inventory.py
index db24b83..2f38ae2 100644
--- a/src/utils/hw_inventory.py
+++ b/src/utils/hw_inventory.py
@@ -73,7 +73,9 @@ def _bytes_to_human(size):
# Utility methods for lshw json output processing
def _fill_computer_model(inventory, root):
- model = ' '.join([root['vendor'], root['product'], root['version']])
+ model = ' '.join([root.get('vendor', 'Unknown vendor'),
+ root.get('product', 'Unknown prouct'),
+ root.get('version', 'Unknown version')])
elem = HardwareElement(HardwareType.MODEL, model)
inventory.add_element(elem)
@@ -91,40 +93,40 @@ def _fill_bootmode(inventory):
def _process_core_firmware(inventory, obj):
- desc = ' '.join([obj['description'], obj['vendor'], obj['version']])
+ desc = ' '.join([obj.get('description', ''), obj.get('vendor', ''), obj.get('version', '')])
firmware_elem = HardwareElement(HardwareType.FIRMWARE, desc)
inventory.add_element(firmware_elem)
def _process_core_cpu(inventory, obj):
- cpu = obj['product']
+ cpu = obj.get('product', 'Unknown product')
cpu_elem = HardwareElement(HardwareType.CPU, cpu)
inventory.add_element(cpu_elem)
def _process_core_mem_bank(inventory, obj):
slot = obj.get('slot', 'Unknown slot')
- size = _bytes_to_human(obj['size']) if 'size' in obj else None
+ size = _bytes_to_human(obj.get('size', None))
if size:
- mem = ' '.join([obj['vendor'], obj['product'], size, f'({slot})'])
+ mem = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product'), size, f'({slot})'])
mem_elem = HardwareElement(HardwareType.MEMORY, mem)
inventory.add_element(mem_elem)
def _process_core_mem(inventory, obj):
- banks = obj['children']
+ banks = obj.get('children', [])
for bank in banks:
_process_core_mem_bank(inventory, bank)
def _process_core_pci_usb(inventory, obj):
- name = ' '.join([obj['vendor'], obj['product']])
+ name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')])
usb_elem = HardwareElement(HardwareType.USB, name)
inventory.add_element(usb_elem)
def _process_core_pci_display(inventory, obj):
- name = ' '.join([obj['vendor'], obj['product']])
+ name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')])
display_elem = HardwareElement(HardwareType.GRAPHICS, name)
inventory.add_element(display_elem)
@@ -132,26 +134,28 @@ def _process_core_pci_display(inventory, obj):
def _process_core_pci_network(inventory, obj):
link = obj.get('size', 'Unknown link speed')
if type(link) == int:
- if link >= 1000000000:
- linkh = f'{link/1e9} Gbit/s'
+ if link >= 1e9:
+ link_human = f'{link/1e9} Gbit/s'
else:
- linkh = f'{link/1e6} Mbit/s'
- name = ' '.join([obj['vendor'], obj['product'], f'({linkh})'])
+ link_human = f'{link/1e6} Mbit/s'
+ name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product'), f'({link_human})'])
+ else:
+ name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')])
elem = HardwareElement(HardwareType.NETWORK, name)
inventory.add_element(elem)
def _process_core_pci_storage_child(inventory, obj):
- obj_id = obj['id']
+ obj_id = obj.get('id', '')
if obj_id.startswith('disk') or obj_id.startswith('nvme'):
- size = bytes_to_human(obj['size']) if 'size' in obj else 'Unknown size'
- name = ' '.join([obj['description'], obj['product'], size])
+ size = bytes_to_human(obj.get('size', 'Unknown size'))
+ name = ' '.join([obj.get('description', ''), obj.get('product', 'Unknown product'), size])
elem = HardwareElement(HardwareType.DISK, name)
inventory.add_element(elem)
def _process_core_pci_storage(inventory, obj):
- name = ' '.join([obj['vendor'], obj['product']])
+ name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')])
elem = HardwareElement(HardwareType.STORAGE, name)
inventory.add_element(elem)
# Disks follow a storage section
@@ -160,19 +164,19 @@ def _process_core_pci_storage(inventory, obj):
def _process_core_pci_disk(inventory, obj):
- name = ' '.join([obj['vendor'], obj['product']])
+ name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')])
elem = HardwareElement(HardwareType.DISK, name)
inventory.add_element(elem)
def _process_core_pci_multimedia(inventory, obj):
- name = ' '.join([obj['vendor'], obj['product']])
+ name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')])
elem = HardwareElement(HardwareType.MULTIMEDIA, name)
inventory.add_element(elem)
def _process_core_pci_child(inventory, obj):
- obj_id = obj['id']
+ obj_id = obj.get('id', '')
if obj_id.startswith('usb'):
_process_core_pci_usb(inventory, obj)
elif obj_id.startswith('display'):
@@ -197,19 +201,19 @@ def _process_core_pci(inventory, obj):
def _process_core_scsi_disk(inventory, obj):
vendor = obj.get('vendor', 'Unknown vendor')
- name = ' '.join([vendor, obj['product']])
+ name = ' '.join([vendor, obj.get('product', 'Unknown product')])
elem = HardwareElement(HardwareType.DISK, name)
inventory.add_element(elem)
def _process_core_scsi_cdrom(inventory, obj):
- name = ' '.join([obj['vendor'], obj['product']])
+ name = ' '.join([obj.get('vendor', 'Unknown vendor'), obj.get('product', 'Unknown product')])
elem = HardwareElement(HardwareType.CD, name)
inventory.add_element(elem)
def _process_core_scsi_child(inventory, obj):
- obj_id = obj['id']
+ obj_id = obj.get('id', '')
if obj_id.startswith('disk'):
_process_core_scsi_disk(inventory, obj)
elif obj_id.startswith('cdrom'):
@@ -217,7 +221,7 @@ def _process_core_scsi_child(inventory, obj):
def _process_core_scsi(inventory, obj):
- children = obj['children']
+ children = obj.get('children', [])
for child in children:
_process_core_scsi_child(inventory, child)