Merge branch 'coredump_read_flash_retry_v5.5' into 'release/v5.5'

Add retry logic to espcoredump subprocess call (v5.5)

See merge request espressif/esp-idf!45318
This commit is contained in:
Roland Dobai
2026-01-21 10:09:51 +01:00
@@ -1,10 +1,11 @@
# SPDX-FileCopyrightText: 2022-2025 Espressif Systems (Shanghai) CO LTD # SPDX-FileCopyrightText: 2022-2026 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0 # SPDX-License-Identifier: Unlicense OR CC0-1.0
import logging import logging
import os import os
import re import re
import subprocess import subprocess
import sys import sys
import time
from typing import Any from typing import Any
from typing import Dict from typing import Dict
from typing import List from typing import List
@@ -13,8 +14,8 @@ from typing import TextIO
from typing import Union from typing import Union
import pexpect import pexpect
from panic_utils import attach_logger
from panic_utils import NoGdbProcessError from panic_utils import NoGdbProcessError
from panic_utils import attach_logger
from panic_utils import quote_string from panic_utils import quote_string
from panic_utils import sha256 from panic_utils import sha256
from panic_utils import verify_valid_gdb_subprocess from panic_utils import verify_valid_gdb_subprocess
@@ -32,7 +33,10 @@ class PanicTestDut(IdfDut):
COREDUMP_UART_END = r'================= CORE DUMP END =================' COREDUMP_UART_END = r'================= CORE DUMP END ================='
COREDUMP_CHECKSUM = r"Coredump checksum='([a-fA-F0-9]+)'" COREDUMP_CHECKSUM = r"Coredump checksum='([a-fA-F0-9]+)'"
REBOOT = r'.*Rebooting\.\.\.' REBOOT = r'.*Rebooting\.\.\.'
CPU_RESET = r'.*rst:.*(RTC_SW_CPU_RST|SW_CPU_RESET|SW_CPU|RTCWDT_RTC_RESET|LP_WDT_SYS|RTCWDT_RTC_RST|CHIP_LP_WDT_RESET|RTC_WDT_SYS)\b' CPU_RESET = (
r'.*rst:.*(RTC_SW_CPU_RST|SW_CPU_RESET|SW_CPU|RTCWDT_RTC_RESET|'
r'LP_WDT_SYS|RTCWDT_RTC_RST|CHIP_LP_WDT_RESET|RTC_WDT_SYS)\b'
)
app: IdfApp app: IdfApp
serial: IdfSerial serial: IdfSerial
@@ -113,9 +117,7 @@ class PanicTestDut(IdfDut):
def expect_elf_sha256(self, caption: str = 'ELF file SHA256: ') -> None: def expect_elf_sha256(self, caption: str = 'ELF file SHA256: ') -> None:
"""Expect method for ELF SHA256 line""" """Expect method for ELF SHA256 line"""
elf_sha256 = sha256(self.app.elf_file) elf_sha256 = sha256(self.app.elf_file)
elf_sha256_len = int( elf_sha256_len = int(self.app.sdkconfig.get('CONFIG_APP_RETRIEVE_LEN_ELF_SHA', '9'))
self.app.sdkconfig.get('CONFIG_APP_RETRIEVE_LEN_ELF_SHA', '9')
)
self.expect_exact(caption + elf_sha256[0:elf_sha256_len]) self.expect_exact(caption + elf_sha256[0:elf_sha256_len])
def expect_coredump(self, output_file_name: str, patterns: List[Union[str, re.Pattern]]) -> None: def expect_coredump(self, output_file_name: str, patterns: List[Union[str, re.Pattern]]) -> None:
@@ -131,16 +133,12 @@ class PanicTestDut(IdfDut):
else: else:
raise ValueError(f'Unsupported input type: {type(pattern).__name__}') raise ValueError(f'Unsupported input type: {type(pattern).__name__}')
def _call_espcoredump( def _call_espcoredump(self, extra_args: list[str], output_file_name: str, max_retries: int = 3) -> None:
self, extra_args: List[str], output_file_name: str
) -> None:
# no "with" here, since we need the file to be open for later inspection by the test case # no "with" here, since we need the file to be open for later inspection by the test case
if not self.coredump_output: if not self.coredump_output:
self.coredump_output = open(output_file_name, 'w') self.coredump_output = open(output_file_name, 'w')
espcoredump_script = os.path.join( espcoredump_script = os.path.join(os.environ['IDF_PATH'], 'components', 'espcoredump', 'espcoredump.py')
os.environ['IDF_PATH'], 'components', 'espcoredump', 'espcoredump.py'
)
espcoredump_args = [ espcoredump_args = [
sys.executable, sys.executable,
espcoredump_script, espcoredump_script,
@@ -153,18 +151,31 @@ class PanicTestDut(IdfDut):
logging.info('espcoredump output is written to %s', self.coredump_output.name) logging.info('espcoredump output is written to %s', self.coredump_output.name)
self.serial.close() self.serial.close()
try: for attempt in range(max_retries):
subprocess.check_call(espcoredump_args, stdout=self.coredump_output, stderr=self.coredump_output) try:
except subprocess.CalledProcessError: if attempt > 0:
self.coredump_output.flush() # Reset output file for retry
with open(output_file_name, 'r') as file: time.sleep(1)
logging.error('espcoredump failed with output: %s', file.read()) self.coredump_output.seek(0)
raise self.coredump_output.truncate()
finally: logging.info(f'Retrying espcoredump (attempt {attempt + 1}/{max_retries})')
self.coredump_output.seek(0) subprocess.check_call(espcoredump_args, stdout=self.coredump_output, stderr=self.coredump_output)
self.coredump_output.seek(0)
return # Success
except subprocess.CalledProcessError:
self.coredump_output.flush()
with open(output_file_name) as file:
content = file.read()
if attempt < max_retries - 1:
logging.warning(f'espcoredump attempt {attempt + 1}/{max_retries} failed with output: {content}')
else:
logging.error(f'espcoredump failed after {max_retries} attempts with output: {content}')
raise
def process_coredump_uart( def process_coredump_uart(
self, coredump_base64: Any, expected: Optional[List[Union[str, re.Pattern]]] = None, self,
coredump_base64: Any,
expected: Optional[List[Union[str, re.Pattern]]] = None,
) -> Any: ) -> Any:
with open(os.path.join(self.logdir, 'coredump_data.b64'), 'w') as coredump_file: with open(os.path.join(self.logdir, 'coredump_data.b64'), 'w') as coredump_file:
logging.info('Writing UART base64 core dump to %s', coredump_file.name) logging.info('Writing UART base64 core dump to %s', coredump_file.name)
@@ -183,9 +194,7 @@ class PanicTestDut(IdfDut):
coredump_file_name = os.path.join(self.logdir, 'coredump_data.bin') coredump_file_name = os.path.join(self.logdir, 'coredump_data.bin')
logging.info('Writing flash binary core dump to %s', coredump_file_name) logging.info('Writing flash binary core dump to %s', coredump_file_name)
output_file_name = os.path.join(self.logdir, 'coredump_flash_result.txt') output_file_name = os.path.join(self.logdir, 'coredump_flash_result.txt')
self._call_espcoredump( self._call_espcoredump(['--core-format', 'raw', '--save-core', coredump_file_name], output_file_name)
['--core-format', 'raw', '--save-core', coredump_file_name], output_file_name
)
if expected: if expected:
self.expect_coredump(output_file_name, expected) self.expect_coredump(output_file_name, expected)
return coredump_file_name return coredump_file_name
@@ -210,12 +219,14 @@ class PanicTestDut(IdfDut):
gdb_path = 'riscv32-esp-elf-gdb' gdb_path = 'riscv32-esp-elf-gdb'
try: try:
from pygdbmi.constants import GdbTimeoutError from pygdbmi.constants import GdbTimeoutError
gdb_command = [gdb_path] + gdb_args gdb_command = [gdb_path] + gdb_args
self.gdbmi = GdbController(command=gdb_command) self.gdbmi = GdbController(command=gdb_command)
pygdbmi_logger = attach_logger() pygdbmi_logger = attach_logger()
except ImportError: except ImportError:
# fallback for pygdbmi<0.10.0.0. # fallback for pygdbmi<0.10.0.0.
from pygdbmi.gdbcontroller import GdbTimeoutError from pygdbmi.gdbcontroller import GdbTimeoutError
self.gdbmi = GdbController(gdb_path=gdb_path, gdb_args=gdb_args) self.gdbmi = GdbController(gdb_path=gdb_path, gdb_args=gdb_args)
pygdbmi_logger = self.gdbmi.logger pygdbmi_logger = self.gdbmi.logger
@@ -225,9 +236,7 @@ class PanicTestDut(IdfDut):
while pygdbmi_logger.hasHandlers(): while pygdbmi_logger.hasHandlers():
pygdbmi_logger.removeHandler(pygdbmi_logger.handlers[0]) pygdbmi_logger.removeHandler(pygdbmi_logger.handlers[0])
log_handler = logging.FileHandler(pygdbmi_log_file_name) log_handler = logging.FileHandler(pygdbmi_log_file_name)
log_handler.setFormatter( log_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))
logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
)
logging.info(f'Saving pygdbmi logs to {pygdbmi_log_file_name}') logging.info(f'Saving pygdbmi logs to {pygdbmi_log_file_name}')
pygdbmi_logger.addHandler(log_handler) pygdbmi_logger.addHandler(log_handler)
try: try:
@@ -251,16 +260,12 @@ class PanicTestDut(IdfDut):
logging.info('GDB response: %s', resp) logging.info('GDB response: %s', resp)
break # success break # success
except GdbTimeoutError: except GdbTimeoutError:
logging.warning( logging.warning('GDB internal error: cannot get response from the subprocess')
'GDB internal error: cannot get response from the subprocess'
)
except NoGdbProcessError: except NoGdbProcessError:
logging.error('GDB internal error: process is not running') logging.error('GDB internal error: process is not running')
break # failure - TODO: create another GdbController break # failure - TODO: create another GdbController
except ValueError: except ValueError:
logging.error( logging.error('GDB internal error: select() returned an unexpected file number')
'GDB internal error: select() returned an unexpected file number'
)
# Set up logging for GDB remote protocol # Set up logging for GDB remote protocol
gdb_remotelog_file_name = os.path.join(self.logdir, 'gdb_remote_log.txt') gdb_remotelog_file_name = os.path.join(self.logdir, 'gdb_remote_log.txt')
@@ -271,7 +276,6 @@ class PanicTestDut(IdfDut):
# Prepare gdb for the gdb stub # Prepare gdb for the gdb stub
def start_gdb_for_gdbstub(self) -> None: def start_gdb_for_gdbstub(self) -> None:
self.run_gdb() self.run_gdb()
# Connect GDB to UART # Connect GDB to UART
@@ -280,8 +284,9 @@ class PanicTestDut(IdfDut):
self.gdb_write('-gdb-set serial baud 115200') self.gdb_write('-gdb-set serial baud 115200')
if sys.platform == 'darwin': if sys.platform == 'darwin':
assert '/dev/tty.' not in self.serial.port, \ assert '/dev/tty.' not in self.serial.port, (
'/dev/tty.* ports can\'t be used with GDB on macOS. Use with /dev/cu.* instead.' "/dev/tty.* ports can't be used with GDB on macOS. Use with /dev/cu.* instead."
)
# Make sure we get the 'stopped' notification # Make sure we get the 'stopped' notification
responses = self.gdb_write('-target-select remote ' + self.serial.port) responses = self.gdb_write('-target-select remote ' + self.serial.port)
@@ -307,7 +312,6 @@ class PanicTestDut(IdfDut):
# Prepare gdb to debug coredump file # Prepare gdb to debug coredump file
def start_gdb_for_coredump(self, elf_file: str) -> None: def start_gdb_for_coredump(self, elf_file: str) -> None:
self.run_gdb() self.run_gdb()
self.gdb_write('core {}'.format(elf_file)) self.gdb_write('core {}'.format(elf_file))
@@ -326,9 +330,7 @@ class PanicTestDut(IdfDut):
return self.find_gdb_response('done', 'result', responses)['payload']['value'] return self.find_gdb_response('done', 'result', responses)['payload']['value']
@staticmethod @staticmethod
def verify_gdb_backtrace( def verify_gdb_backtrace(gdb_backtrace: List[Any], expected_functions_list: List[Any]) -> None:
gdb_backtrace: List[Any], expected_functions_list: List[Any]
) -> None:
""" """
Raises an assert if the function names listed in expected_functions_list do not match the backtrace Raises an assert if the function names listed in expected_functions_list do not match the backtrace
given by gdb_backtrace argument. The latter is in the same format as returned by gdb_backtrace() given by gdb_backtrace argument. The latter is in the same format as returned by gdb_backtrace()
@@ -341,9 +343,7 @@ class PanicTestDut(IdfDut):
assert False, 'Got unexpected backtrace' assert False, 'Got unexpected backtrace'
@staticmethod @staticmethod
def find_gdb_response( def find_gdb_response(message: str, response_type: str, responses: List[Any]) -> Any:
message: str, response_type: str, responses: List[Any]
) -> Any:
""" """
Helper function which extracts one response from an array of GDB responses, filtering Helper function which extracts one response from an array of GDB responses, filtering
by message and type. Returned message is a dictionary, refer to pygdbmi docs for the format. by message and type. Returned message is a dictionary, refer to pygdbmi docs for the format.