From 6b490a278cb3869f07fa456bb992c37b07f8f974 Mon Sep 17 00:00:00 2001 From: wanckl Date: Fri, 14 Nov 2025 15:25:41 +0800 Subject: [PATCH] fix(driver_twai): enhance ci test and fix example --- .../test_apps/legacy_twai/pytest_twai.py | 73 +++++++++++++++---- .../test_apps/test_twai/pytest_driver_twai.py | 70 ++++++++++++++---- 2 files changed, 113 insertions(+), 30 deletions(-) diff --git a/components/driver/test_apps/legacy_twai/pytest_twai.py b/components/driver/test_apps/legacy_twai/pytest_twai.py index dd3a5d24e8..1c6086ceaf 100644 --- a/components/driver/test_apps/legacy_twai/pytest_twai.py +++ b/components/driver/test_apps/legacy_twai/pytest_twai.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: CC0-1.0 import logging import subprocess -from time import sleep +import time import pytest from can import Bus @@ -10,6 +10,10 @@ from can import Message from pytest_embedded import Dut from pytest_embedded_idf.utils import idf_parametrize +# --------------------------------------------------------------------------- +# Loop Back Tests +# --------------------------------------------------------------------------- + @pytest.mark.generic @pytest.mark.parametrize( @@ -24,16 +28,57 @@ def test_legacy_twai_self(dut: Dut) -> None: dut.run_all_single_board_cases(group='twai-loop-back') +# --------------------------------------------------------------------------- +# Helper Functions +# --------------------------------------------------------------------------- + + +def esp_enter_flash_mode(dut: Dut) -> None: + ser = dut.serial.proc + ser.setRTS(True) # EN Low + time.sleep(0.5) + ser.setDTR(True) # GPIO0 Low + ser.setRTS(False) # EN High + dut.expect('waiting for download', timeout=2) + ser.setDTR(False) # Back RTS/DTR to 1/1 to avoid affect to esptool + + +def esp_reset_and_wait_ready(dut: Dut) -> None: + dut.serial.hard_reset() + time.sleep(0.5) + dut.expect_exact('Press ENTER to see the list of tests') + + @pytest.fixture(name='socket_can') def fixture_create_socket_can() -> Bus: # Set up the socket CAN with the bitrate - start_command = 'sudo ip link set can0 up type can bitrate 250000 restart-ms 100' - stop_command = 'sudo ip link set can0 down' - subprocess.run(start_command, shell=True, capture_output=True, text=True) - bus = Bus(interface='socketcan', channel='can0', bitrate=250000) - yield bus # test invoked here - bus.shutdown() - subprocess.run(stop_command, shell=True, capture_output=True, text=True) + start_command = 'sudo -n ip link set can0 up type can bitrate 250000 restart-ms 100' + stop_command = 'sudo -n ip link set can0 down' + status_command = 'sudo -n ip -details link show can0' + + try: + result = subprocess.run(status_command, shell=True, capture_output=True, text=True) + if result.returncode != 0: + raise Exception('CAN interface "can0" not found') + + if 'UP' in result.stdout: # Close the bus anyway if it is already up + subprocess.run(stop_command, shell=True, capture_output=True, text=True) + subprocess.run(start_command, shell=True, capture_output=True, text=True) + + time.sleep(0.5) + bus = Bus(interface='socketcan', channel='can0', bitrate=250000) + yield bus # test invoked here + + bus.shutdown() + except Exception as e: + pytest.skip(f'Open usb-can bus Error: {str(e)}') + finally: + subprocess.run(stop_command, shell=True, capture_output=True, text=True) + + +# --------------------------------------------------------------------------- +# Interactive Tests +# --------------------------------------------------------------------------- @pytest.mark.twai_std @@ -46,14 +91,13 @@ def fixture_create_socket_can() -> Bus: ) @idf_parametrize('target', ['esp32', 'esp32c3', 'esp32c6', 'esp32h2', 'esp32s2', 'esp32s3'], indirect=['target']) def test_legacy_twai_listen_only(dut: Dut, socket_can: Bus) -> None: - dut.serial.hard_reset() - dut.expect_exact('Press ENTER to see the list of tests') + esp_reset_and_wait_ready(dut) # TEST_CASE("twai_listen_only", "[twai]") dut.write('"twai_listen_only"') - # wait the DUT to block at the receive API - sleep(0.03) + # wait the DUT to start listening + time.sleep(0.1) message = Message( arbitration_id=0x123, @@ -62,6 +106,7 @@ def test_legacy_twai_listen_only(dut: Dut, socket_can: Bus) -> None: ) socket_can.send(message, timeout=0.2) dut.expect_unity_test_output() + esp_enter_flash_mode(dut) @pytest.mark.twai_std @@ -74,8 +119,7 @@ def test_legacy_twai_listen_only(dut: Dut, socket_can: Bus) -> None: ) @idf_parametrize('target', ['esp32', 'esp32c3', 'esp32c6', 'esp32h2', 'esp32s2', 'esp32s3'], indirect=['target']) def test_legacy_twai_remote_request(dut: Dut, socket_can: Bus) -> None: - dut.serial.hard_reset() - dut.expect_exact('Press ENTER to see the list of tests') + esp_reset_and_wait_ready(dut) # TEST_CASE("twai_remote_request", "[twai]") dut.write('"twai_remote_request"') @@ -97,3 +141,4 @@ def test_legacy_twai_remote_request(dut: Dut, socket_can: Bus) -> None: print('send', reply) dut.expect_unity_test_output() + esp_enter_flash_mode(dut) diff --git a/components/esp_driver_twai/test_apps/test_twai/pytest_driver_twai.py b/components/esp_driver_twai/test_apps/test_twai/pytest_driver_twai.py index 846b2bf8dc..6c22dad488 100644 --- a/components/esp_driver_twai/test_apps/test_twai/pytest_driver_twai.py +++ b/components/esp_driver_twai/test_apps/test_twai/pytest_driver_twai.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 import subprocess -from time import sleep +import time import pytest from can import Bus @@ -12,6 +12,9 @@ from pytest_embedded_idf.utils import idf_parametrize from pytest_embedded_idf.utils import soc_filtered_targets +# --------------------------------------------------------------------------- +# Loop Back Tests +# --------------------------------------------------------------------------- @pytest.mark.generic @pytest.mark.temp_skip_ci(targets=['esp32p4'], reason='p4 rev3 migration, IDF-14393') @pytest.mark.parametrize('config', ['release', 'cache_safe'], indirect=True) @@ -20,43 +23,77 @@ def test_driver_twai_loopbk(dut: Dut) -> None: dut.run_all_single_board_cases(group='twai', reset=True) -# -------------------------------- test twai interactive ------------------------------ +# --------------------------------------------------------------------------- +# Helper Functions +# --------------------------------------------------------------------------- + + +def esp_enter_flash_mode(dut: Dut) -> None: + ser = dut.serial.proc + ser.setRTS(True) # EN Low + time.sleep(0.5) + ser.setDTR(True) # GPIO0 Low + ser.setRTS(False) # EN High + dut.expect('waiting for download', timeout=2) + ser.setDTR(False) # Back RTS/DTR to 1/1 to avoid affect to esptool + + +def esp_reset_and_wait_ready(dut: Dut) -> None: + dut.serial.hard_reset() + time.sleep(0.5) + dut.expect_exact('Press ENTER to see the list of tests') + + @pytest.fixture(name='socket_can') def fixture_create_socket_can() -> Bus: # Set up the socket CAN with the bitrate - start_command = 'sudo ip link set can0 up type can bitrate 250000' - stop_command = 'sudo ip link set can0 down' + start_command = 'sudo -n ip link set can0 up type can bitrate 250000 restart-ms 100' + stop_command = 'sudo -n ip link set can0 down' + status_command = 'sudo -n ip -details link show can0' + try: + result = subprocess.run(status_command, shell=True, capture_output=True, text=True) + if result.returncode != 0: + raise Exception('CAN interface "can0" not found') + + if 'UP' in result.stdout: # Close the bus anyway if it is already up + subprocess.run(stop_command, shell=True, capture_output=True, text=True) subprocess.run(start_command, shell=True, capture_output=True, text=True) + + time.sleep(0.5) + bus = Bus(interface='socketcan', channel='can0', bitrate=250000) + yield bus # test invoked here + + bus.shutdown() except Exception as e: - print(f'Open bus Error: {e}') - bus = Bus(interface='socketcan', channel='can0', bitrate=250000) - yield bus # test invoked here - bus.shutdown() - subprocess.run(stop_command, shell=True, capture_output=True, text=True) + pytest.skip(f'Open usb-can bus Error: {str(e)}') + finally: + subprocess.run(stop_command, shell=True, capture_output=True, text=True) +# --------------------------------------------------------------------------- +# Interactive Tests +# --------------------------------------------------------------------------- @pytest.mark.twai_std @pytest.mark.temp_skip_ci(targets=['esp32p4'], reason='p4 rev3 migration, IDF-14393') @pytest.mark.parametrize('config', ['release'], indirect=True) @idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) def test_driver_twai_listen_only(dut: Dut, socket_can: Bus) -> None: - dut.serial.hard_reset() - dut.expect_exact('Press ENTER to see the list of tests') + esp_reset_and_wait_ready(dut) dut.write('"twai_listen_only"') # wait the DUT to finish initialize - sleep(0.1) + time.sleep(0.1) message = Message( arbitration_id=0x6688, is_extended_id=True, data=[0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88], ) - print('USB Socket CAN Send:', message) - socket_can.send(message, timeout=0.2) + print('USB Socket CAN Send:', message, 'Return:', socket_can.send(message)) dut.expect_unity_test_output(timeout=10) + esp_enter_flash_mode(dut) @pytest.mark.twai_std @@ -64,8 +101,7 @@ def test_driver_twai_listen_only(dut: Dut, socket_can: Bus) -> None: @pytest.mark.parametrize('config', ['release'], indirect=True) @idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target']) def test_driver_twai_remote_request(dut: Dut, socket_can: Bus) -> None: - dut.serial.hard_reset() - dut.expect_exact('Press ENTER to see the list of tests') + esp_reset_and_wait_ready(dut) dut.write('"twai_remote_request"') @@ -83,4 +119,6 @@ def test_driver_twai_remote_request(dut: Dut, socket_can: Bus) -> None: ) socket_can.send(reply, timeout=0.2) print('USB Socket CAN Replied:', reply) + dut.expect_unity_test_output(timeout=10) + esp_enter_flash_mode(dut)