# Copyright 2014-present PlatformIO # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import fnmatch import os import json import subprocess import sys import shutil import logging from typing import Optional, Dict, List, Any from platformio.public import PlatformBase, to_unix_path from platformio.proc import get_pythonexe_path from platformio.project.config import ProjectConfig from platformio.package.manager.tool import ToolPackageManager # Constants RETRY_LIMIT = 3 SUBPROCESS_TIMEOUT = 300 DEFAULT_DEBUG_SPEED = "5000" DEFAULT_APP_OFFSET = "0x10000" # MCUs that support ESP-builtin debug ESP_BUILTIN_DEBUG_MCUS = frozenset([ "esp32c3", "esp32c5", "esp32c6", "esp32s3", "esp32h2", "esp32p4" ]) # MCU configuration mapping MCU_TOOLCHAIN_CONFIG = { "xtensa": { "mcus": frozenset(["esp32", "esp32s2", "esp32s3"]), "toolchains": ["toolchain-xtensa-esp-elf"], "debug_tools": ["tool-xtensa-esp-elf-gdb"] }, "riscv": { "mcus": frozenset([ "esp32c2", "esp32c3", "esp32c5", "esp32c6", "esp32h2", "esp32p4" ]), "toolchains": ["toolchain-riscv32-esp"], "debug_tools": ["tool-riscv32-esp-elf-gdb"] } } COMMON_IDF_PACKAGES = [ "tool-cmake", "tool-ninja", "tool-scons", "tool-esp-rom-elfs" ] CHECK_PACKAGES = [ "tool-cppcheck", "tool-clangtidy", "tool-pvs-studio" ] # System-specific configuration IS_WINDOWS = sys.platform.startswith("win") # Set Platformio env var to use windows_amd64 for all windows architectures # only windows_amd64 native espressif toolchains are available # needs platformio/pioarduino core >= 6.1.17 if IS_WINDOWS: os.environ["PLATFORMIO_SYSTEM_TYPE"] = "windows_amd64" # Clear IDF_TOOLS_PATH, if set tools may be installed in the wrong place os.environ["IDF_TOOLS_PATH"] = "" # Global variables python_exe = get_pythonexe_path() pm = ToolPackageManager() # Configure logger logger = logging.getLogger(__name__) def safe_file_operation(operation_func): """Decorator for safe filesystem operations with error handling.""" def wrapper(*args, **kwargs): try: return operation_func(*args, **kwargs) except (OSError, IOError, FileNotFoundError) as e: logger.error(f"Filesystem error in {operation_func.__name__}: {e}") return False except Exception as e: logger.error(f"Unexpected error in {operation_func.__name__}: {e}") raise # Re-raise unexpected exceptions return wrapper @safe_file_operation def safe_remove_directory(path: str) -> bool: """Safely remove directories with error handling.""" if os.path.exists(path) and os.path.isdir(path): shutil.rmtree(path) logger.debug(f"Directory removed: {path}") return True @safe_file_operation def safe_remove_directory_pattern(base_path: str, pattern: str) -> bool: """Safely remove directories matching a pattern with error handling.""" if not os.path.exists(base_path): return True # Find all directories matching the pattern in the base directory for item in os.listdir(base_path): item_path = os.path.join(base_path, item) if os.path.isdir(item_path) and fnmatch.fnmatch(item, pattern): shutil.rmtree(item_path) logger.debug(f"Directory removed: {item_path}") return True @safe_file_operation def safe_copy_file(src: str, dst: str) -> bool: """Safely copy files with error handling.""" os.makedirs(os.path.dirname(dst), exist_ok=True) shutil.copyfile(src, dst) logger.debug(f"File copied: {src} -> {dst}") return True class Espressif32Platform(PlatformBase): """ESP32 platform implementation for PlatformIO with optimized toolchain management.""" def __init__(self, *args, **kwargs): """Initialize the ESP32 platform with caching mechanisms.""" super().__init__(*args, **kwargs) self._packages_dir = None self._tools_cache = {} self._mcu_config_cache = {} @property def packages_dir(self) -> str: """Get cached packages directory path.""" if self._packages_dir is None: config = ProjectConfig.get_instance() self._packages_dir = config.get("platformio", "packages_dir") return self._packages_dir def _get_tool_paths(self, tool_name: str) -> Dict[str, str]: """Get centralized path calculation for tools with caching.""" if tool_name not in self._tools_cache: tool_path = os.path.join(self.packages_dir, tool_name) # Remove all directories containing '@' in their name try: if os.path.exists(self.packages_dir) and os.path.isdir(self.packages_dir): for item in os.listdir(self.packages_dir): if '@' in item and item.startswith(tool_name): item_path = os.path.join(self.packages_dir, item) if os.path.isdir(item_path): safe_remove_directory(item_path) logger.debug(f"Removed directory with '@' in name: {item_path}") except OSError as e: logger.error(f"Error scanning packages directory for '@' directories: {e}") self._tools_cache[tool_name] = { 'tool_path': tool_path, 'package_path': os.path.join(tool_path, "package.json"), 'tools_json_path': os.path.join(tool_path, "tools.json"), 'piopm_path': os.path.join(tool_path, ".piopm"), 'idf_tools_path': os.path.join( self.packages_dir, "tl-install", "tools", "idf_tools.py" ) } return self._tools_cache[tool_name] def _check_tool_status(self, tool_name: str) -> Dict[str, bool]: """Check the installation status of a tool.""" paths = self._get_tool_paths(tool_name) return { 'has_idf_tools': os.path.exists(paths['idf_tools_path']), 'has_tools_json': os.path.exists(paths['tools_json_path']), 'has_piopm': os.path.exists(paths['piopm_path']), 'tool_exists': os.path.exists(paths['tool_path']) } def _run_idf_tools_install(self, tools_json_path: str, idf_tools_path: str) -> bool: """Execute idf_tools.py install command with timeout and error handling.""" cmd = [ python_exe, idf_tools_path, "--quiet", "--non-interactive", "--tools-json", tools_json_path, "install" ] try: result = subprocess.run( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=SUBPROCESS_TIMEOUT, check=False ) if result.returncode != 0: logger.error("idf_tools.py installation failed") return False logger.debug("idf_tools.py executed successfully") return True except subprocess.TimeoutExpired: logger.error(f"Timeout in idf_tools.py after {SUBPROCESS_TIMEOUT}s") return False except (subprocess.SubprocessError, OSError) as e: logger.error(f"Error in idf_tools.py: {e}") return False def _check_tool_version(self, tool_name: str) -> bool: """Check if the installed tool version matches the required version.""" paths = self._get_tool_paths(tool_name) try: with open(paths['package_path'], 'r', encoding='utf-8') as f: package_data = json.load(f) required_version = self.packages.get(tool_name, {}).get("package-version") installed_version = package_data.get("version") if not required_version: logger.debug(f"No version check required for {tool_name}") return True if not installed_version: logger.warning(f"Installed version for {tool_name} unknown") return False version_match = required_version == installed_version if not version_match: logger.info( f"Version mismatch for {tool_name}: " f"{installed_version} != {required_version}" ) return version_match except (json.JSONDecodeError, FileNotFoundError) as e: logger.error(f"Error reading package data for {tool_name}: {e}") return False def install_tool(self, tool_name: str, retry_count: int = 0) -> bool: """Install a tool with optimized retry mechanism.""" if retry_count >= RETRY_LIMIT: logger.error( f"Installation of {tool_name} failed after {RETRY_LIMIT} attempts" ) return False self.packages[tool_name]["optional"] = False paths = self._get_tool_paths(tool_name) status = self._check_tool_status(tool_name) # Case 1: New installation with idf_tools if status['has_idf_tools'] and status['has_tools_json']: return self._install_with_idf_tools(tool_name, paths) # Case 2: Tool already installed, version check if (status['has_idf_tools'] and status['has_piopm'] and not status['has_tools_json']): return self._handle_existing_tool(tool_name, paths, retry_count) logger.debug(f"Tool {tool_name} already configured") return True def _install_with_idf_tools(self, tool_name: str, paths: Dict[str, str]) -> bool: """Install tool using idf_tools.py installation method.""" if not self._run_idf_tools_install( paths['tools_json_path'], paths['idf_tools_path'] ): return False # Copy tool files tools_path_default = os.path.join( os.path.expanduser("~"), ".platformio" ) target_package_path = os.path.join( tools_path_default, "tools", tool_name, "package.json" ) if not safe_copy_file(paths['package_path'], target_package_path): return False safe_remove_directory(paths['tool_path']) tl_path = f"file://{os.path.join(tools_path_default, 'tools', tool_name)}" pm.install(tl_path) logger.info(f"Tool {tool_name} successfully installed") return True def _handle_existing_tool( self, tool_name: str, paths: Dict[str, str], retry_count: int ) -> bool: """Handle already installed tools with version checking.""" if self._check_tool_version(tool_name): # Version matches, use tool self.packages[tool_name]["version"] = paths['tool_path'] self.packages[tool_name]["optional"] = False logger.debug(f"Tool {tool_name} found with correct version") return True # Wrong version, reinstall - remove similar paths too logger.info(f"Reinstalling {tool_name} due to version mismatch") tool_base_name = os.path.basename(paths['tool_path']) packages_dir = os.path.dirname(paths['tool_path']) # Remove similar directories with version suffixes FIRST (e.g., xtensa@src, xtensa.12232) safe_remove_directory_pattern(packages_dir, f"{tool_base_name}@*") safe_remove_directory_pattern(packages_dir, f"{tool_base_name}.*") # Then remove the main tool directory (if it still exists) safe_remove_directory(paths['tool_path']) return self.install_tool(tool_name, retry_count + 1) def _configure_arduino_framework(self, frameworks: List[str]) -> None: """Configure Arduino framework""" if "arduino" not in frameworks: return self.packages["framework-arduinoespressif32"]["optional"] = False self.packages["framework-arduinoespressif32-libs"]["optional"] = False def _configure_espidf_framework( self, frameworks: List[str], variables: Dict, board_config: Dict, mcu: str ) -> None: """Configure ESP-IDF framework based on custom sdkconfig settings.""" custom_sdkconfig = variables.get("custom_sdkconfig") board_sdkconfig = variables.get( "board_espidf.custom_sdkconfig", board_config.get("espidf.custom_sdkconfig", "") ) if custom_sdkconfig is not None or len(str(board_sdkconfig)) > 3: frameworks.append("espidf") self.packages["framework-espidf"]["optional"] = False if mcu == "esp32c2": self.packages["framework-arduino-c2-skeleton-lib"]["optional"] = False def _get_mcu_config(self, mcu: str) -> Optional[Dict]: """Get MCU configuration with optimized caching and search.""" if mcu in self._mcu_config_cache: return self._mcu_config_cache[mcu] for _, config in MCU_TOOLCHAIN_CONFIG.items(): if mcu in config["mcus"]: # Dynamically add ULP toolchain result = config.copy() result["ulp_toolchain"] = ["toolchain-esp32ulp"] if mcu != "esp32": result["ulp_toolchain"].append("toolchain-riscv32-esp") self._mcu_config_cache[mcu] = result return result return None def _needs_debug_tools(self, variables: Dict, targets: List[str]) -> bool: """Check if debug tools are needed based on build configuration.""" return bool( variables.get("build_type") or "debug" in targets or variables.get("upload_protocol") ) def _configure_mcu_toolchains( self, mcu: str, variables: Dict, targets: List[str] ) -> None: """Configure MCU-specific toolchains with optimized installation.""" mcu_config = self._get_mcu_config(mcu) if not mcu_config: logger.warning(f"Unknown MCU: {mcu}") return # Install base toolchains for toolchain in mcu_config["toolchains"]: self.install_tool(toolchain) # ULP toolchain if ULP directory exists if mcu_config.get("ulp_toolchain") and os.path.isdir("ulp"): for toolchain in mcu_config["ulp_toolchain"]: self.install_tool(toolchain) # Debug tools when needed if self._needs_debug_tools(variables, targets): for debug_tool in mcu_config["debug_tools"]: self.install_tool(debug_tool) self.install_tool("tool-openocd-esp32") def _configure_installer(self) -> None: """Configure the ESP-IDF tools installer.""" installer_path = os.path.join( self.packages_dir, "tl-install", "tools", "idf_tools.py" ) if os.path.exists(installer_path): self.packages["tl-install"]["optional"] = True def _install_esptool_package(self) -> None: """Install esptool package required for all builds.""" self.install_tool("tool-esptoolpy") def _install_common_idf_packages(self) -> None: """Install common ESP-IDF packages required for all builds.""" for package in COMMON_IDF_PACKAGES: self.install_tool(package) def _configure_check_tools(self, variables: Dict) -> None: """Configure static analysis and check tools based on configuration.""" check_tools = variables.get("check_tool", []) self.install_tool("contrib-piohome") if not check_tools: return for package in CHECK_PACKAGES: if any(tool in package for tool in check_tools): self.install_tool(package) def _ensure_mklittlefs_version(self) -> None: """Ensure correct mklittlefs version is installed.""" piopm_path = os.path.join(self.packages_dir, "tool-mklittlefs", ".piopm") if os.path.exists(piopm_path): try: with open(piopm_path, 'r', encoding='utf-8') as f: package_data = json.load(f) version = package_data.get('version', '') if not version.startswith("3."): os.remove(piopm_path) logger.info(f"Incompatible mklittlefs version {version} removed (required: 3.x)") except (json.JSONDecodeError, KeyError) as e: logger.error(f"Error reading mklittlefs package data: {e}") def _setup_mklittlefs_for_download(self) -> None: """Setup mklittlefs for download functionality with version 4.x.""" mklittlefs_dir = os.path.join(self.packages_dir, "tool-mklittlefs") mklittlefs4_dir = os.path.join( self.packages_dir, "tool-mklittlefs4" ) # Ensure mklittlefs 3.x is installed if not os.path.exists(mklittlefs_dir): self.install_tool("tool-mklittlefs") if os.path.exists(os.path.join(mklittlefs_dir, "tools.json")): self.install_tool("tool-mklittlefs") # Install mklittlefs 4.x if not os.path.exists(mklittlefs4_dir): self.install_tool("tool-mklittlefs4") if os.path.exists(os.path.join(mklittlefs4_dir, "tools.json")): self.install_tool("tool-mklittlefs4") # Copy mklittlefs 4.x over 3.x if os.path.exists(mklittlefs4_dir): package_src = os.path.join(mklittlefs_dir, "package.json") package_dst = os.path.join(mklittlefs4_dir, "package.json") safe_copy_file(package_src, package_dst) shutil.copytree(mklittlefs4_dir, mklittlefs_dir, dirs_exist_ok=True) self.packages.pop("tool-mkfatfs", None) def _handle_littlefs_tool(self, for_download: bool) -> None: """Handle LittleFS tool installation with special download configuration.""" if for_download: self._setup_mklittlefs_for_download() else: self._ensure_mklittlefs_version() self.install_tool("tool-mklittlefs") def _install_filesystem_tool(self, filesystem: str, for_download: bool = False) -> None: """Install filesystem-specific tools based on the filesystem type.""" tool_mapping = { "default": lambda: self._handle_littlefs_tool(for_download), "fatfs": lambda: self.install_tool("tool-mkfatfs"), "spiffs": lambda: self.install_tool("tool-mkspiffs") } handler = tool_mapping.get(filesystem, tool_mapping["default"]) handler() def _handle_dfuutil_tool(self, variables: Dict, for_download: bool = False) -> None: """Install dfuutil tool for Arduino Nano ESP32 board.""" # Currently only Arduino Nano ESP32 uses the dfuutil tool as uploader if variables.get("board") == "arduino_nano_esp32": self.install_tool("tool-dfuutil-arduino") def _configure_filesystem_tools(self, variables: Dict, targets: List[str]) -> None: """Configure filesystem tools based on build targets and filesystem type.""" filesystem = variables.get("board_build.filesystem", "littlefs") if any(target in targets for target in ["buildfs", "uploadfs"]): self._install_filesystem_tool(filesystem, for_download=False) if "downloadfs" in targets: self._install_filesystem_tool(filesystem, for_download=True) def configure_default_packages(self, variables: Dict, targets: List[str]) -> Any: """Main configuration method with optimized package management.""" if not variables.get("board"): return super().configure_default_packages(variables, targets) # Base configuration board_config = self.board_config(variables.get("board")) mcu = variables.get("board_build.mcu", board_config.get("build.mcu", "esp32")) frameworks = list(variables.get("pioframework", [])) # Create copy try: # Configuration steps self._configure_installer() self._install_esptool_package() self._configure_arduino_framework(frameworks) self._configure_espidf_framework(frameworks, variables, board_config, mcu) self._configure_mcu_toolchains(mcu, variables, targets) self._handle_littlefs_tool(for_download=False) # Ensure mklittlefs is installed if "espidf" in frameworks: self._install_common_idf_packages() self._configure_check_tools(variables) self._configure_filesystem_tools(variables, targets) self._handle_dfuutil_tool(variables) logger.info("Package configuration completed successfully") except Exception as e: logger.error(f"Error in package configuration: {type(e).__name__}: {e}") # Don't re-raise to maintain compatibility return super().configure_default_packages(variables, targets) def get_boards(self, id_=None): """Get board configuration with dynamic options.""" result = super().get_boards(id_) if not result: return result if id_: return self._add_dynamic_options(result) else: for key, value in result.items(): result[key] = self._add_dynamic_options(result[key]) return result def _add_dynamic_options(self, board): """Add dynamic board options for upload protocols and debug tools.""" # Upload protocols if not board.get("upload.protocols", []): board.manifest["upload"]["protocols"] = ["esptool", "espota"] if not board.get("upload.protocol", ""): board.manifest["upload"]["protocol"] = "esptool" # Debug tools debug = board.manifest.get("debug", {}) non_debug_protocols = ["esptool", "espota"] supported_debug_tools = [ "cmsis-dap", "esp-prog", "esp-bridge", "iot-bus-jtag", "jlink", "minimodule", "olimex-arm-usb-tiny-h", "olimex-arm-usb-ocd-h", "olimex-arm-usb-ocd", "olimex-jtag-tiny", "tumpa" ] # Special configuration for Kaluga board if board.id == "esp32-s2-kaluga-1": supported_debug_tools.append("ftdi") # ESP-builtin for certain MCUs mcu = board.get("build.mcu", "") if mcu in ESP_BUILTIN_DEBUG_MCUS: supported_debug_tools.append("esp-builtin") upload_protocol = board.manifest.get("upload", {}).get("protocol") upload_protocols = board.manifest.get("upload", {}).get("protocols", []) if debug: upload_protocols.extend(supported_debug_tools) if upload_protocol and upload_protocol not in upload_protocols: upload_protocols.append(upload_protocol) board.manifest["upload"]["protocols"] = upload_protocols if "tools" not in debug: debug["tools"] = {} # Debug tool configuration for link in upload_protocols: if link in non_debug_protocols or link in debug["tools"]: continue openocd_interface = self._get_openocd_interface(link, board) server_args = self._get_debug_server_args(openocd_interface, debug) debug["tools"][link] = { "server": { "package": "tool-openocd-esp32", "executable": "bin/openocd", "arguments": server_args, }, "init_break": "thb app_main", "init_cmds": [ "define pio_reset_halt_target", " monitor reset halt", " flushregs", "end", "define pio_reset_run_target", " monitor reset", "end", "target extended-remote $DEBUG_PORT", "$LOAD_CMDS", "pio_reset_halt_target", "$INIT_BREAK", ], "onboard": link in debug.get("onboard_tools", []), "default": link == debug.get("default_tool"), } # Avoid erasing Arduino Nano bootloader by preloading app binary if board.id == "arduino_nano_esp32": debug["tools"][link]["load_cmds"] = "preload" board.manifest["debug"] = debug return board def _get_openocd_interface(self, link: str, board) -> str: """Determine OpenOCD interface configuration for debug link.""" if link in ("jlink", "cmsis-dap"): return link if link in ("esp-prog", "ftdi"): if board.id == "esp32-s2-kaluga-1": return "ftdi/esp32s2_kaluga_v1" return "ftdi/esp32_devkitj_v1" if link == "esp-bridge": return "esp_usb_bridge" if link == "esp-builtin": return "esp_usb_jtag" return f"ftdi/{link}" def _get_debug_server_args(self, openocd_interface: str, debug: Dict) -> List[str]: """Generate debug server arguments for OpenOCD configuration.""" if 'openocd_target' in debug: config_type = 'target' config_name = debug.get('openocd_target') else: config_type = 'board' config_name = debug.get('openocd_board') return [ "-s", "$PACKAGE_DIR/share/openocd/scripts", "-f", f"interface/{openocd_interface}.cfg", "-f", f"{config_type}/{config_name}.cfg" ] def configure_debug_session(self, debug_config): """Configure debug session with flash image loading.""" build_extra_data = debug_config.build_data.get("extra", {}) flash_images = build_extra_data.get("flash_images", []) if "openocd" in (debug_config.server or {}).get("executable", ""): debug_config.server["arguments"].extend([ "-c", f"adapter speed {debug_config.speed or DEFAULT_DEBUG_SPEED}" ]) ignore_conds = [ debug_config.load_cmds != ["load"], not flash_images, not all([os.path.isfile(item["path"]) for item in flash_images]), ] if any(ignore_conds): return load_cmds = [ f'monitor program_esp "{to_unix_path(item["path"])}" ' f'{item["offset"]} verify' for item in flash_images ] load_cmds.append( f'monitor program_esp ' f'"{to_unix_path(debug_config.build_data["prog_path"][:-4])}.bin" ' f'{build_extra_data.get("application_offset", DEFAULT_APP_OFFSET)} verify' ) debug_config.load_cmds = load_cmds