fix(driver_twai): enhance ci test and fix example
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
# SPDX-License-Identifier: CC0-1.0
|
||||
import logging
|
||||
import subprocess
|
||||
from time import sleep
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from can import Bus
|
||||
@@ -10,6 +10,10 @@ from can import Message
|
||||
from pytest_embedded import Dut
|
||||
from pytest_embedded_idf.utils import idf_parametrize
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Loop Back Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.generic
|
||||
@pytest.mark.parametrize(
|
||||
@@ -24,16 +28,57 @@ def test_legacy_twai_self(dut: Dut) -> None:
|
||||
dut.run_all_single_board_cases(group='twai-loop-back')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helper Functions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def esp_enter_flash_mode(dut: Dut) -> None:
|
||||
ser = dut.serial.proc
|
||||
ser.setRTS(True) # EN Low
|
||||
time.sleep(0.5)
|
||||
ser.setDTR(True) # GPIO0 Low
|
||||
ser.setRTS(False) # EN High
|
||||
dut.expect('waiting for download', timeout=2)
|
||||
ser.setDTR(False) # Back RTS/DTR to 1/1 to avoid affect to esptool
|
||||
|
||||
|
||||
def esp_reset_and_wait_ready(dut: Dut) -> None:
|
||||
dut.serial.hard_reset()
|
||||
time.sleep(0.5)
|
||||
dut.expect_exact('Press ENTER to see the list of tests')
|
||||
|
||||
|
||||
@pytest.fixture(name='socket_can')
|
||||
def fixture_create_socket_can() -> Bus:
|
||||
# Set up the socket CAN with the bitrate
|
||||
start_command = 'sudo ip link set can0 up type can bitrate 250000 restart-ms 100'
|
||||
stop_command = 'sudo ip link set can0 down'
|
||||
subprocess.run(start_command, shell=True, capture_output=True, text=True)
|
||||
bus = Bus(interface='socketcan', channel='can0', bitrate=250000)
|
||||
yield bus # test invoked here
|
||||
bus.shutdown()
|
||||
subprocess.run(stop_command, shell=True, capture_output=True, text=True)
|
||||
start_command = 'sudo -n ip link set can0 up type can bitrate 250000 restart-ms 100'
|
||||
stop_command = 'sudo -n ip link set can0 down'
|
||||
status_command = 'sudo -n ip -details link show can0'
|
||||
|
||||
try:
|
||||
result = subprocess.run(status_command, shell=True, capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
raise Exception('CAN interface "can0" not found')
|
||||
|
||||
if 'UP' in result.stdout: # Close the bus anyway if it is already up
|
||||
subprocess.run(stop_command, shell=True, capture_output=True, text=True)
|
||||
subprocess.run(start_command, shell=True, capture_output=True, text=True)
|
||||
|
||||
time.sleep(0.5)
|
||||
bus = Bus(interface='socketcan', channel='can0', bitrate=250000)
|
||||
yield bus # test invoked here
|
||||
|
||||
bus.shutdown()
|
||||
except Exception as e:
|
||||
pytest.skip(f'Open usb-can bus Error: {str(e)}')
|
||||
finally:
|
||||
subprocess.run(stop_command, shell=True, capture_output=True, text=True)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Interactive Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.twai_std
|
||||
@@ -46,14 +91,13 @@ def fixture_create_socket_can() -> Bus:
|
||||
)
|
||||
@idf_parametrize('target', ['esp32', 'esp32c3', 'esp32c6', 'esp32h2', 'esp32s2', 'esp32s3'], indirect=['target'])
|
||||
def test_legacy_twai_listen_only(dut: Dut, socket_can: Bus) -> None:
|
||||
dut.serial.hard_reset()
|
||||
dut.expect_exact('Press ENTER to see the list of tests')
|
||||
esp_reset_and_wait_ready(dut)
|
||||
|
||||
# TEST_CASE("twai_listen_only", "[twai]")
|
||||
dut.write('"twai_listen_only"')
|
||||
|
||||
# wait the DUT to block at the receive API
|
||||
sleep(0.03)
|
||||
# wait the DUT to start listening
|
||||
time.sleep(0.1)
|
||||
|
||||
message = Message(
|
||||
arbitration_id=0x123,
|
||||
@@ -62,6 +106,7 @@ def test_legacy_twai_listen_only(dut: Dut, socket_can: Bus) -> None:
|
||||
)
|
||||
socket_can.send(message, timeout=0.2)
|
||||
dut.expect_unity_test_output()
|
||||
esp_enter_flash_mode(dut)
|
||||
|
||||
|
||||
@pytest.mark.twai_std
|
||||
@@ -74,8 +119,7 @@ def test_legacy_twai_listen_only(dut: Dut, socket_can: Bus) -> None:
|
||||
)
|
||||
@idf_parametrize('target', ['esp32', 'esp32c3', 'esp32c6', 'esp32h2', 'esp32s2', 'esp32s3'], indirect=['target'])
|
||||
def test_legacy_twai_remote_request(dut: Dut, socket_can: Bus) -> None:
|
||||
dut.serial.hard_reset()
|
||||
dut.expect_exact('Press ENTER to see the list of tests')
|
||||
esp_reset_and_wait_ready(dut)
|
||||
|
||||
# TEST_CASE("twai_remote_request", "[twai]")
|
||||
dut.write('"twai_remote_request"')
|
||||
@@ -97,3 +141,4 @@ def test_legacy_twai_remote_request(dut: Dut, socket_can: Bus) -> None:
|
||||
print('send', reply)
|
||||
|
||||
dut.expect_unity_test_output()
|
||||
esp_enter_flash_mode(dut)
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
import subprocess
|
||||
from time import sleep
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from can import Bus
|
||||
@@ -12,6 +12,9 @@ from pytest_embedded_idf.utils import idf_parametrize
|
||||
from pytest_embedded_idf.utils import soc_filtered_targets
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Loop Back Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
@pytest.mark.generic
|
||||
@pytest.mark.temp_skip_ci(targets=['esp32p4'], reason='p4 rev3 migration, IDF-14393')
|
||||
@pytest.mark.parametrize('config', ['release', 'cache_safe'], indirect=True)
|
||||
@@ -20,43 +23,77 @@ def test_driver_twai_loopbk(dut: Dut) -> None:
|
||||
dut.run_all_single_board_cases(group='twai', reset=True)
|
||||
|
||||
|
||||
# -------------------------------- test twai interactive ------------------------------
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helper Functions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def esp_enter_flash_mode(dut: Dut) -> None:
|
||||
ser = dut.serial.proc
|
||||
ser.setRTS(True) # EN Low
|
||||
time.sleep(0.5)
|
||||
ser.setDTR(True) # GPIO0 Low
|
||||
ser.setRTS(False) # EN High
|
||||
dut.expect('waiting for download', timeout=2)
|
||||
ser.setDTR(False) # Back RTS/DTR to 1/1 to avoid affect to esptool
|
||||
|
||||
|
||||
def esp_reset_and_wait_ready(dut: Dut) -> None:
|
||||
dut.serial.hard_reset()
|
||||
time.sleep(0.5)
|
||||
dut.expect_exact('Press ENTER to see the list of tests')
|
||||
|
||||
|
||||
@pytest.fixture(name='socket_can')
|
||||
def fixture_create_socket_can() -> Bus:
|
||||
# Set up the socket CAN with the bitrate
|
||||
start_command = 'sudo ip link set can0 up type can bitrate 250000'
|
||||
stop_command = 'sudo ip link set can0 down'
|
||||
start_command = 'sudo -n ip link set can0 up type can bitrate 250000 restart-ms 100'
|
||||
stop_command = 'sudo -n ip link set can0 down'
|
||||
status_command = 'sudo -n ip -details link show can0'
|
||||
|
||||
try:
|
||||
result = subprocess.run(status_command, shell=True, capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
raise Exception('CAN interface "can0" not found')
|
||||
|
||||
if 'UP' in result.stdout: # Close the bus anyway if it is already up
|
||||
subprocess.run(stop_command, shell=True, capture_output=True, text=True)
|
||||
subprocess.run(start_command, shell=True, capture_output=True, text=True)
|
||||
|
||||
time.sleep(0.5)
|
||||
bus = Bus(interface='socketcan', channel='can0', bitrate=250000)
|
||||
yield bus # test invoked here
|
||||
|
||||
bus.shutdown()
|
||||
except Exception as e:
|
||||
print(f'Open bus Error: {e}')
|
||||
bus = Bus(interface='socketcan', channel='can0', bitrate=250000)
|
||||
yield bus # test invoked here
|
||||
bus.shutdown()
|
||||
subprocess.run(stop_command, shell=True, capture_output=True, text=True)
|
||||
pytest.skip(f'Open usb-can bus Error: {str(e)}')
|
||||
finally:
|
||||
subprocess.run(stop_command, shell=True, capture_output=True, text=True)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Interactive Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
@pytest.mark.twai_std
|
||||
@pytest.mark.temp_skip_ci(targets=['esp32p4'], reason='p4 rev3 migration, IDF-14393')
|
||||
@pytest.mark.parametrize('config', ['release'], indirect=True)
|
||||
@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target'])
|
||||
def test_driver_twai_listen_only(dut: Dut, socket_can: Bus) -> None:
|
||||
dut.serial.hard_reset()
|
||||
dut.expect_exact('Press ENTER to see the list of tests')
|
||||
esp_reset_and_wait_ready(dut)
|
||||
|
||||
dut.write('"twai_listen_only"')
|
||||
|
||||
# wait the DUT to finish initialize
|
||||
sleep(0.1)
|
||||
time.sleep(0.1)
|
||||
|
||||
message = Message(
|
||||
arbitration_id=0x6688,
|
||||
is_extended_id=True,
|
||||
data=[0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88],
|
||||
)
|
||||
print('USB Socket CAN Send:', message)
|
||||
socket_can.send(message, timeout=0.2)
|
||||
print('USB Socket CAN Send:', message, 'Return:', socket_can.send(message))
|
||||
dut.expect_unity_test_output(timeout=10)
|
||||
esp_enter_flash_mode(dut)
|
||||
|
||||
|
||||
@pytest.mark.twai_std
|
||||
@@ -64,8 +101,7 @@ def test_driver_twai_listen_only(dut: Dut, socket_can: Bus) -> None:
|
||||
@pytest.mark.parametrize('config', ['release'], indirect=True)
|
||||
@idf_parametrize('target', soc_filtered_targets('SOC_TWAI_SUPPORTED == 1'), indirect=['target'])
|
||||
def test_driver_twai_remote_request(dut: Dut, socket_can: Bus) -> None:
|
||||
dut.serial.hard_reset()
|
||||
dut.expect_exact('Press ENTER to see the list of tests')
|
||||
esp_reset_and_wait_ready(dut)
|
||||
|
||||
dut.write('"twai_remote_request"')
|
||||
|
||||
@@ -83,4 +119,6 @@ def test_driver_twai_remote_request(dut: Dut, socket_can: Bus) -> None:
|
||||
)
|
||||
socket_can.send(reply, timeout=0.2)
|
||||
print('USB Socket CAN Replied:', reply)
|
||||
|
||||
dut.expect_unity_test_output(timeout=10)
|
||||
esp_enter_flash_mode(dut)
|
||||
|
||||
Reference in New Issue
Block a user