#
# Copyright (C) 2020-2024 Soleta Networks
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.

import array
import logging
import socket
import struct
import sys

import psutil

# fcntl is not imported on Windows, so every ioctl-based helper below only
# works on platforms where this import ran.
if sys.platform != "win32":
    import fcntl


def is_ethernet(interface):
    """Return True if *interface* reports the Ethernet hardware type.

    Issues SIOCGIFHWADDR for the interface and compares the 16-bit value
    found at bytes 16-17 of the reply with ARPHRD_ETHER.

    Any OSError, whether raised while creating the socket or by the ioctl
    itself, is treated as "not Ethernet" and yields False.
    """
    SIOCGIFHWADDR = 0x8927
    ARPHRD_ETHER = 1

    request = struct.pack('256s', interface.encode('utf-8')[:15])
    try:
        # Creating the socket inside the try keeps a failed socket() call on
        # the "return False" path instead of tripping over an unbound name
        # during cleanup.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            info = fcntl.ioctl(sock.fileno(), SIOCGIFHWADDR, request)
    except OSError:
        return False
    return struct.unpack('H', info[16:18])[0] == ARPHRD_ETHER


def is_wifi(interface):
    """Return True if the wireless ioctl SIOCGIWNAME succeeds on *interface*.

    The content of the reply is ignored; only success or failure of the
    ioctl matters. An OSError from the ioctl yields False. An error while
    creating the socket is not caught and propagates to the caller.
    """
    SIOCGIWNAME = 0x8B01  # Wireless-specific ioctl

    request = struct.pack('256s', interface.encode('utf-8'))
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            fcntl.ioctl(sock.fileno(), SIOCGIWNAME, request)
        except OSError:
            return False
    return True


def is_link_active(interface):
    """Return True if *interface* has both IFF_UP and IFF_RUNNING set.

    An OSError from the flags ioctl yields False. An error while creating
    the socket is not caught and propagates to the caller.
    """
    SIOCGIFFLAGS = 0x8913
    IFF_UP = 0x1
    IFF_RUNNING = 0x40

    # 16-byte interface name followed by a short that receives the flags.
    request = struct.pack('16sh', interface.encode('utf-8'), 0)
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            reply = fcntl.ioctl(sock.fileno(), SIOCGIFFLAGS, request)
        except OSError:
            return False
    flags = struct.unpack('16sh', reply)[1]
    required = IFF_UP | IFF_RUNNING
    return flags & required == required


def get_ethernet_interface():
    """Return the name of the first active, non-wireless Ethernet interface.

    Candidates are the interface names reported by psutil.net_if_addrs(),
    kept in that order and filtered with is_ethernet(), is_wifi() and
    is_link_active().

    Returns None (after logging) when no interface qualifies. When several
    qualify, the first one is returned and the full list is logged.
    """
    eth_interfaces = [
        name for name in psutil.net_if_addrs()
        if is_ethernet(name) and not is_wifi(name) and is_link_active(name)
    ]

    if not eth_interfaces:
        logging.info('No valid ethernet interface found')
        return None

    if len(eth_interfaces) > 1:
        found = ", ".join(eth_interfaces)
        logging.info(f'Multiple active ethernet interfaces found: {found}. Using {eth_interfaces[0]}')

    return eth_interfaces[0]


def ethtool(interface):
    """Return the link speed reported by the ETHTOOL_GSET ioctl.

    The value is the unsigned 16-bit field at byte offset 12 of the
    command buffer after the ioctl has filled it in.

    Returns 0 when the socket cannot be created or the ioctl fails with
    OSError.
    """
    ETHTOOL_GSET = 0x00000001
    SIOCETHTOOL = 0x8946
    # NOTE(review): struct ethtool_cmd in <linux/ethtool.h> appears to be
    # 44 bytes; the kernel writes the whole struct back, so the buffer must
    # not be smaller than that. Confirm against the kernel header.
    ETHTOOL_CMD_SIZE = 44
    SPEED_OFFSET = 12

    # Zeroed command buffer whose first 32-bit word selects the command.
    ecmd = array.array("B", bytes(ETHTOOL_CMD_SIZE))
    struct.pack_into("I", ecmd, 0, ETHTOOL_GSET)

    # The request carries the interface name plus a raw pointer to ecmd, so
    # ecmd has to stay alive (and must not be resized) until ioctl returns.
    ifreq = struct.pack(
        "16sP", interface.encode("utf-8"), ecmd.buffer_info()[0]
    )
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            fcntl.ioctl(sock.fileno(), SIOCETHTOOL, ifreq)
    except OSError:
        return 0
    return struct.unpack_from("H", ecmd, SPEED_OFFSET)[0]


def getifaddr(device):
    """Return the IPv4 address of *device* as a dotted-quad string.

    Uses the SIOCGIFADDR ioctl and converts bytes 20-23 of the reply with
    socket.inet_ntoa(). Only the first 15 characters of *device* are used.

    No exception is caught here, so an OSError from the socket or the
    ioctl propagates to the caller.
    """
    SIOCGIFADDR = 0x8915

    request = struct.pack('256s', bytes(device[:15], 'utf-8'))
    # The context manager closes the socket even when the ioctl raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        reply = fcntl.ioctl(sock.fileno(), SIOCGIFADDR, request)
    return socket.inet_ntoa(reply[20:24])


def getifhwaddr(device):
    """Return the hardware address of *device* as "xx:xx:xx:xx:xx:xx".

    Uses the SIOCGIFHWADDR ioctl and formats bytes 18-23 of the reply as
    six lowercase, zero-padded hexadecimal octets. Only the first 15
    characters of *device* are used.

    No exception is caught here, so an OSError from the socket or the
    ioctl propagates to the caller.
    """
    SIOCGIFHWADDR = 0x8927

    request = struct.pack('256s', bytes(device[:15], 'utf-8'))
    # The context manager closes the socket even when the ioctl raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        reply = fcntl.ioctl(sock.fileno(), SIOCGIFHWADDR, request)
    hwaddr = reply[18:24]
    return ":".join("%02x" % octet for octet in hwaddr)