diff options
author | Alejandro Sirgo Rica <asirgo@soleta.eu> | 2024-10-01 10:14:48 +0200 |
---|---|---|
committer | Alejandro Sirgo Rica <asirgo@soleta.eu> | 2024-10-01 15:32:54 +0200 |
commit | e30e934272f16e3fa60454c37692034064a33a5c (patch) | |
tree | 1ce5a32c25581a201d2129a1aa55f4cf53acd1bc /src/utils/net.py | |
parent | 8f437bb954266d1da31c7044edcdc2d9f4cef540 (diff) |
src: replace DEVICE env variable with get_ethernet_interface()
Use a python function to obtain the main net interface. Detect
the first ethernet inferface in use.
Stop using the DEVICE environment variable.
Diffstat (limited to 'src/utils/net.py')
-rw-r--r-- | src/utils/net.py | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/src/utils/net.py b/src/utils/net.py index 01c6f04..e842c0f 100644 --- a/src/utils/net.py +++ b/src/utils/net.py @@ -10,6 +10,73 @@ import array import fcntl import socket import struct +import psutil +import logging + +def is_ethernet(interface): + SIOCGIFHWADDR = 0x8927 + ARPHRD_ETHER = 1 + + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + info = fcntl.ioctl( + s.fileno(), + SIOCGIFHWADDR, + struct.pack('256s', interface.encode('utf-8')[:15])) + if struct.unpack('H', info[16:18])[0] == ARPHRD_ETHER: + return True + return False + except IOError: + return False + finally: + s.close() + +def is_wifi(interface): + SIOCGIWNAME = 0x8B01 # Wireless-specific ioctl + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + buffer = struct.pack('256s', interface.encode('utf-8')) + info = fcntl.ioctl(sock.fileno(), SIOCGIWNAME, buffer) + return True + except IOError: + return False + finally: + sock.close() + +def is_link_active(interface): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + try: + # Get interface flags + ifreq = struct.pack('16sh', interface.encode('utf-8'), 0) + flags = struct.unpack('16sh', fcntl.ioctl(sock.fileno(), 0x8913, ifreq))[1] + + # Check if IFF_UP and IFF_RUNNING flags are set + if flags & 0x1 and flags & 0x40: + return True + else: + return False + except IOError: + return False + finally: + sock.close() + +def get_ethernet_interface(): + eth_interfaces = [] + interfaces = psutil.net_if_addrs() + for interface in interfaces: + if is_ethernet(interface) and not is_wifi(interface) and is_link_active(interface): + eth_interfaces.append(interface) + + if len(eth_interfaces) > 1: + logging.info(f'Multiple active ethernet interfaces found: {", ".join(eth_interfaces)}. Using {eth_interfaces[0]}') + + if not eth_interfaces: + logging.info('No valid ethernet interface found') + return None + + return eth_interfaces[0] def ethtool(interface): try: |