From e30e934272f16e3fa60454c37692034064a33a5c Mon Sep 17 00:00:00 2001 From: Alejandro Sirgo Rica Date: Tue, 1 Oct 2024 10:14:48 +0200 Subject: src: replace DEVICE env variable with get_ethernet_interface() Use a python function to obtain the main net interface. Detect the first ethernet inferface in use. Stop using the DEVICE environment variable. --- src/utils/net.py | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) (limited to 'src/utils/net.py') diff --git a/src/utils/net.py b/src/utils/net.py index 01c6f04..e842c0f 100644 --- a/src/utils/net.py +++ b/src/utils/net.py @@ -10,6 +10,73 @@ import array import fcntl import socket import struct +import psutil +import logging + +def is_ethernet(interface): + SIOCGIFHWADDR = 0x8927 + ARPHRD_ETHER = 1 + + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + info = fcntl.ioctl( + s.fileno(), + SIOCGIFHWADDR, + struct.pack('256s', interface.encode('utf-8')[:15])) + if struct.unpack('H', info[16:18])[0] == ARPHRD_ETHER: + return True + return False + except IOError: + return False + finally: + s.close() + +def is_wifi(interface): + SIOCGIWNAME = 0x8B01 # Wireless-specific ioctl + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + buffer = struct.pack('256s', interface.encode('utf-8')) + info = fcntl.ioctl(sock.fileno(), SIOCGIWNAME, buffer) + return True + except IOError: + return False + finally: + sock.close() + +def is_link_active(interface): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + try: + # Get interface flags + ifreq = struct.pack('16sh', interface.encode('utf-8'), 0) + flags = struct.unpack('16sh', fcntl.ioctl(sock.fileno(), 0x8913, ifreq))[1] + + # Check if IFF_UP and IFF_RUNNING flags are set + if flags & 0x1 and flags & 0x40: + return True + else: + return False + except IOError: + return False + finally: + sock.close() + +def get_ethernet_interface(): + eth_interfaces = [] + interfaces = psutil.net_if_addrs() + for interface in interfaces: + if is_ethernet(interface) and not is_wifi(interface) and is_link_active(interface): + eth_interfaces.append(interface) + + if len(eth_interfaces) > 1: + logging.info(f'Multiple active ethernet interfaces found: {", ".join(eth_interfaces)}. Using {eth_interfaces[0]}') + + if not eth_interfaces: + logging.info('No valid ethernet interface found') + return None + + return eth_interfaces[0] def ethtool(interface): try: -- cgit v1.2.3-18-g5258