feat(ci): updated Ethernet tests to align with new runners

This commit is contained in:
Ondrej Kosta
2025-11-20 14:40:12 +01:00
parent df08d0dbc3
commit 3fe406aa92
5 changed files with 117 additions and 64 deletions
@@ -642,7 +642,7 @@ TEST_CASE("heap utilization", "[ethernet_l2]")
}
#define FORMAT_MAC(mac_addr, a, b, c, d, e, f) do { mac_addr[0] = a; mac_addr[1] = b; mac_addr[2] = c; mac_addr[3] = d; mac_addr[4] = e; mac_addr[5] = f; } while(0)
TEST_CASE("w5500_multicast_filter", "[ethernet_l2]")
TEST_CASE("multicast_filter", "[ethernet_l2]")
{
esp_eth_mac_t *mac = mac_init(NULL, NULL);
TEST_ASSERT_NOT_NULL(mac);
+13 -11
View File
@@ -31,16 +31,18 @@ class EthTestIntf(object):
netifs.sort(reverse=True)
logging.info('detected interfaces: %s', str(netifs))
for netif in netifs:
# if no interface defined, try to find it automatically
if my_if == '':
if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
self.target_if = netif
break
if my_if == '':
if 'dut_p1' in netifs:
self.target_if = 'dut_p1'
else:
if netif.find(my_if) == 0:
self.target_if = my_if
break
for netif in netifs:
# if no interface defined, try to find it automatically
if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
self.target_if = netif
break
elif my_if in netifs:
self.target_if = my_if
if self.target_if == '':
raise RuntimeError('network interface not found')
logging.info('Use %s for testing', self.target_if)
@@ -132,8 +134,8 @@ def ethernet_int_emac_test(dut: IdfDut) -> None:
dut.run_all_single_board_cases(group='esp_emac', timeout=240)
def ethernet_l2_test(dut: IdfDut) -> None:
target_if = EthTestIntf(ETH_TYPE)
def ethernet_l2_test(dut: IdfDut, test_if: str = '') -> None:
target_if = EthTestIntf(ETH_TYPE, test_if)
dut.expect_exact('Press ENTER to see the list of tests')
dut.write('\n')
@@ -11,8 +11,11 @@ import subprocess
import time
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from typing import Generator
from typing import List
from typing import Match
from typing import Optional
from typing import Tuple
from typing import Union
import netifaces
@@ -276,28 +279,38 @@ def send_brcast_msg_endnode_to_host(endnode: EndnodeSsh, host_brcast_ip: str, te
return nc_host_out
@pytest.mark.eth_w5500
@pytest.mark.parametrize(
'config',
[
'w5500',
],
indirect=True,
)
@idf_parametrize('target', ['esp32'], indirect=['target'])
def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
def get_legacy_host_name_match() -> Optional[Match[str]]:
host_name = socket.gethostname()
regex = r'ethVM-(\d+)-(\d+)'
host_name_match = re.search(regex, host_name, re.DOTALL)
return host_name_match
def get_host_info() -> Tuple[int, int]:
# Get switch configuration info from the hostname (legacy runners)
sw_info = get_legacy_host_name_match()
if sw_info is not None:
sw_num = int(sw_info.group(1))
port_num = int(sw_info.group(2))
return sw_num, port_num
else:
# Get switch configuration info from the IP address of the `switch` interface (new runners)
switch_if_ip = get_host_ip_by_interface('switch', netifaces.AF_INET)
# Parse IP address: x.y.<sw_num>.<port_num>
ip_parts = switch_if_ip.split('.')
if len(ip_parts) == 4:
sw_num = int(ip_parts[2])
port_num = int(ip_parts[3])
return sw_num, port_num
else:
raise RuntimeError('Unexpected switch IP address')
def eth_bridge_test(dut: Dut, dev_user: str, dev_password: str) -> None:
# ------------------------------ #
# Pre-test testbed configuration #
# ------------------------------ #
# Get switch configuration info from the hostname
host_name = socket.gethostname()
regex = r'ethVM-(\d+)-(\d+)'
sw_info = re.search(regex, host_name, re.DOTALL)
if sw_info is None:
raise RuntimeError('Unexpected hostname')
sw_num = int(sw_info.group(1))
port_num = int(sw_info.group(2))
sw_num, port_num = get_host_info()
port_num_endnode = int(port_num) + 1 # endnode address is always + 1 to the host
endnode = EndnodeSsh(f'10.10.{sw_num}.{port_num_endnode}', ETHVM_ENDNODE_USER)
@@ -333,7 +346,10 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
host_ip = get_host_ip_by_interface(host_if, netifaces.AF_INET)
logging.info('Host IP %s', host_ip)
endnode_if = host_if # endnode is a clone of the host
if get_legacy_host_name_match() is not None:
endnode_if = host_if # endnode is a clone of the host (legacy runners)
else:
endnode_if = 'dut_p2' # interface name connected to the second port of the DUT (new runners)
# Endnode MAC
endnode_mac = get_endnode_mac_by_interface(endnode, endnode_if)
logging.info('Endnode MAC %s', endnode_mac)
@@ -355,12 +371,12 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
# TEST Objective 1: Ping the devices on the network
# --------------------------------------------------
# ping bridge
ping_test = subprocess.call(f'ping {br_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', br_ip, '-c', '2'])
if ping_test != 0:
raise RuntimeError('ESP bridge is not reachable')
# ping the end nodes of the network
ping_test = subprocess.call(f'ping {endnode_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', endnode_ip, '-c', '2'])
if ping_test != 0:
raise RuntimeError('End node is not reachable')
@@ -529,13 +545,13 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
logging.info('Drop `Endnode` MAC')
dut.write('add --addr=' + endnode_mac + ' -d')
dut.expect_exact('Bridge Config OK!')
ping_test = subprocess.call(f'ping {endnode_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', endnode_ip, '-c', '2'])
if ping_test == 0:
raise RuntimeError('End node should not be reachable')
logging.info('Remove Drop `Endnode` MAC entry')
dut.write('remove --addr=' + endnode_mac)
dut.expect_exact('Bridge Config OK!')
ping_test = subprocess.call(f'ping {endnode_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', endnode_ip, '-c', '2'])
if ping_test != 0:
raise RuntimeError('End node is not reachable')
@@ -568,9 +584,9 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
# Remove ARP record from Test host computer. ARP is broadcasted, hence Bridge port does not reply to a request since
# it does not receive it (no forward to Bridge port). As a result, Bridge is not pingable.
subprocess.call(f'sudo arp -d {br_ip}', shell=True)
subprocess.call('arp -a', shell=True)
ping_test = subprocess.call(f'ping {br_ip} -c 2', shell=True)
subprocess.call(['sudo', 'arp', '-d', br_ip])
subprocess.call(['arp', '-a'])
ping_test = subprocess.call(['ping', br_ip, '-c', '2'])
if ping_test == 0:
raise RuntimeError('Bridge should not be reachable')
@@ -580,7 +596,7 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
dut.expect_exact('Bridge Config OK!')
dut.write('add --addr=ff:ff:ff:ff:ff:ff -p 1 -c')
dut.expect_exact('Bridge Config OK!')
ping_test = subprocess.call(f'ping {br_ip} -c 2', shell=True)
ping_test = subprocess.call(['ping', br_ip, '-c', '2'])
if ping_test != 0:
raise RuntimeError('Bridge is not reachable')
@@ -591,3 +607,28 @@ def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
endnode.close()
switch1.close()
@pytest.fixture(scope='session', autouse=True)
def setup_test_environment() -> Generator[None, None, None]:
# Fixture code to run before any tests in the session
# make sure dut_p2 is down (only for new runners)
if get_legacy_host_name_match() is None:
subprocess.call(['sudo', 'ip', 'link', 'set', 'down', 'dev', 'dut_p2'])
yield # Tests run here
# Optional teardown after all tests...
@pytest.mark.eth_w5500
@pytest.mark.parametrize(
'config',
[
'w5500',
],
indirect=True,
)
@idf_parametrize('target', ['esp32'], indirect=['target'])
def test_esp_eth_bridge(dut: Dut, dev_user: str, dev_password: str) -> None:
eth_bridge_test(dut, dev_user, dev_password)
@@ -20,18 +20,26 @@ ETH_TYPE_3 = 0x2223
@contextlib.contextmanager
def configure_eth_if(eth_type: int, target_if: str = '') -> Iterator[socket.socket]:
# try to determine which interface to use
netifs = os.listdir('/sys/class/net/')
# order matters - ETH NIC with the highest number is connected to DUT on CI runner
netifs.sort(reverse=True)
logging.info('detected interfaces: %s', str(netifs))
if target_if == '':
# try to determine which interface to use
netifs = os.listdir('/sys/class/net/')
# order matters - ETH NIC with the highest number is connected to DUT on CI runner
netifs.sort(reverse=True)
logging.info('detected interfaces: %s', str(netifs))
for netif in netifs:
if netif.find('eth') == 0 or netif.find('enx') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
target_if = netif
break
if target_if == '':
raise Exception('no network interface found')
if 'dut_p1' in netifs:
target_if = 'dut_p1'
else:
for netif in netifs:
# if no interface defined, try to find it automatically
if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
target_if = netif
break
elif target_if not in netifs:
target_if = ''
if target_if == '':
raise RuntimeError('network interface not found')
logging.info('Use %s for testing', target_if)
so = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(eth_type))
@@ -76,7 +84,7 @@ def recv_eth_frame(eth_type: int, eth_if: str = '') -> str:
return str(eth_frame.load.decode().rstrip('\x00'))
def actual_test(dut: Dut) -> None:
def actual_test(dut: Dut, test_if: str = '') -> None:
# Get DUT's MAC address
res = dut.expect(
r'([\s\S]*)'
@@ -85,17 +93,17 @@ def actual_test(dut: Dut) -> None:
dut_mac = res.group(2)
# Receive "ESP32 Hello frame"
recv_eth_frame(ETH_TYPE_3)
recv_eth_frame(ETH_TYPE_3, test_if)
# Sent a message and receive its echo
message = 'ESP32 test message with EthType ' + hex(ETH_TYPE_1)
echoed = send_recv_eth_frame(message, ETH_TYPE_1, dut_mac)
echoed = send_recv_eth_frame(message, ETH_TYPE_1, dut_mac, test_if)
if echoed == message:
logging.info('PASS')
else:
raise Exception('Echoed message does not match!')
message = 'ESP32 test message with EthType ' + hex(ETH_TYPE_2)
echoed = send_recv_eth_frame(message, ETH_TYPE_2, dut_mac)
echoed = send_recv_eth_frame(message, ETH_TYPE_2, dut_mac, test_if)
if echoed == message:
logging.info('PASS')
else:
@@ -36,14 +36,16 @@ def find_target_if(my_if: str = '') -> str:
netifs.sort(reverse=True)
logging.info('detected interfaces: %s', str(netifs))
for netif in netifs:
# if no interface defined, try to find it automatically
if my_if == '':
if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
return netif
if my_if == '':
if 'dut_p1' in netifs:
return 'dut_p1'
else:
if netif.find(my_if) == 0:
return my_if
for netif in netifs:
# if no interface defined, try to find it automatically
if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
return netif
elif my_if in netifs:
return my_if
raise RuntimeError('network interface not found')
@@ -135,7 +137,7 @@ def test_examples_udp_multicast_proto(dut: Dut, ip_version: str = 'ipv4', nic: s
try:
data, recv_addr = sock.recvfrom(1024)
logging.info(f'Received {len(data)} bytes from {recv_addr}')
except socket.timeout:
except TimeoutError:
raise RuntimeError(f'Timeout waiting for {ip_version} multicast message from ESP32')
# Check if received from expected source
@@ -143,7 +145,7 @@ def test_examples_udp_multicast_proto(dut: Dut, ip_version: str = 'ipv4', nic: s
raise RuntimeError(f'Received {ip_version} multicast message from unexpected source')
# Send multicast message
message = '!!! Multicast test message from host !!!'.encode()
message = b'!!! Multicast test message from host !!!'
logging.info(f'Sending {ip_version} multicast message to {multicast_addr}:{PORT}')
sock.sendto(message, (multicast_addr, PORT))
if ip_version == 'ipv4_mapped':