Arduino v3.3.1

This commit is contained in:
Jason2866
2025-09-17 00:09:47 +02:00
parent d408614d66
commit 8715c1708f
389 changed files with 2101 additions and 100083 deletions
+129 -114
View File
@@ -12,23 +12,38 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# LZMA support check
try:
import lzma as _lzma
except ImportError:
import sys
print("ERROR: Python's lzma module is unavailable or broken in this interpreter.", file=sys.stderr)
print("LZMA (liblzma) support is required for tool/toolchain installation.", file=sys.stderr)
print("Please install Python built with LZMA support.", file=sys.stderr)
raise SystemExit(1)
else:
# Keep namespace clean
del _lzma
import fnmatch
import os
import json
import requests
import socket
import subprocess
import sys
import shutil
import logging
from typing import Optional, Dict, List, Any
from pathlib import Path
from typing import Optional, Dict, List, Any, Union
from platformio.compat import IS_WINDOWS
from platformio.public import PlatformBase, to_unix_path
from platformio.proc import get_pythonexe_path
from platformio.project.config import ProjectConfig
from platformio.package.manager.tool import ToolPackageManager
# Constants
RETRY_LIMIT = 3
SUBPROCESS_TIMEOUT = 300
DEFAULT_DEBUG_SPEED = "5000"
DEFAULT_APP_OFFSET = "0x10000"
tl_install_name = "tool-esp_install"
@@ -68,15 +83,22 @@ CHECK_PACKAGES = [
]
# System-specific configuration
IS_WINDOWS = sys.platform.startswith("win")
# Set Platformio env var to use windows_amd64 for all windows architectures
# only windows_amd64 native espressif toolchains are available
# needs platformio/pioarduino core >= 6.1.17
if IS_WINDOWS:
os.environ["PLATFORMIO_SYSTEM_TYPE"] = "windows_amd64"
# Clear IDF_TOOLS_PATH, if set tools may be installed in the wrong place
os.environ["IDF_TOOLS_PATH"] = ""
# exit without git
if not shutil.which("git"):
print("Git not found in PATH, please install Git.", file=sys.stderr)
print("Git is needed for Platform espressif32 to work.", file=sys.stderr)
raise SystemExit(1)
# Set IDF_TOOLS_PATH to Pio core_dir
PROJECT_CORE_DIR = ProjectConfig.get_instance().get("platformio", "core_dir")
IDF_TOOLS_PATH = PROJECT_CORE_DIR
os.environ["IDF_TOOLS_PATH"] = IDF_TOOLS_PATH
os.environ['IDF_PATH'] = ""
# Global variables
python_exe = get_pythonexe_path()
@@ -100,51 +122,61 @@ def safe_file_operation(operation_func):
@safe_file_operation
def safe_remove_file(path: str) -> bool:
"""Safely remove a file with error handling."""
if os.path.exists(path) and os.path.isfile(path):
os.remove(path)
def safe_remove_file(path: Union[str, Path]) -> bool:
"""Safely remove a file with error handling using pathlib."""
path = Path(path)
if path.is_file() or path.is_symlink():
path.unlink()
logger.debug(f"File removed: {path}")
return True
@safe_file_operation
def safe_remove_directory(path: str) -> bool:
"""Safely remove directories with error handling."""
if os.path.exists(path) and os.path.isdir(path):
def safe_remove_directory(path: Union[str, Path]) -> bool:
"""Safely remove directories with error handling using pathlib."""
path = Path(path)
if not path.exists():
return True
if path.is_symlink():
path.unlink()
elif path.is_dir():
shutil.rmtree(path)
logger.debug(f"Directory removed: {path}")
return True
@safe_file_operation
def safe_remove_directory_pattern(base_path: str, pattern: str) -> bool:
"""Safely remove directories matching a pattern with error handling."""
if not os.path.exists(base_path):
def safe_remove_directory_pattern(base_path: Union[str, Path], pattern: str) -> bool:
"""Safely remove directories matching a pattern with error handling using pathlib."""
base_path = Path(base_path)
if not base_path.exists():
return True
# Find all directories matching the pattern in the base directory
for item in os.listdir(base_path):
item_path = os.path.join(base_path, item)
if os.path.isdir(item_path) and fnmatch.fnmatch(item, pattern):
shutil.rmtree(item_path)
logger.debug(f"Directory removed: {item_path}")
for item in base_path.iterdir():
if item.is_dir() and fnmatch.fnmatch(item.name, pattern):
if item.is_symlink():
item.unlink()
else:
shutil.rmtree(item)
logger.debug(f"Directory removed: {item}")
return True
@safe_file_operation
def safe_copy_file(src: str, dst: str) -> bool:
"""Safely copy files with error handling."""
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copyfile(src, dst)
def safe_copy_file(src: Union[str, Path], dst: Union[str, Path]) -> bool:
"""Safely copy files with error handling using pathlib."""
src, dst = Path(src), Path(dst)
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)
logger.debug(f"File copied: {src} -> {dst}")
return True
@safe_file_operation
def safe_copy_directory(src: str, dst: str) -> bool:
"""Safely copy directories with error handling."""
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.copytree(src, dst, dirs_exist_ok=True)
def safe_copy_directory(src: Union[str, Path], dst: Union[str, Path]) -> bool:
"""Safely copy directories with error handling using pathlib."""
src, dst = Path(src), Path(dst)
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(src, dst, dirs_exist_ok=True, copy_function=shutil.copy2, symlinks=True)
logger.debug(f"Directory copied: {src} -> {dst}")
return True
@@ -160,11 +192,11 @@ class Espressif32Platform(PlatformBase):
self._mcu_config_cache = {}
@property
def packages_dir(self) -> str:
def packages_dir(self) -> Path:
"""Get cached packages directory path."""
if self._packages_dir is None:
config = ProjectConfig.get_instance()
self._packages_dir = config.get("platformio", "packages_dir")
self._packages_dir = Path(config.get("platformio", "packages_dir"))
return self._packages_dir
def _check_tl_install_version(self) -> bool:
@@ -183,10 +215,10 @@ class Espressif32Platform(PlatformBase):
return True
# Check if tool is already installed
tl_install_path = os.path.join(self.packages_dir, tl_install_name)
package_json_path = os.path.join(tl_install_path, "package.json")
tl_install_path = self.packages_dir / tl_install_name
package_json_path = tl_install_path / "package.json"
if not os.path.exists(package_json_path):
if not package_json_path.exists():
logger.info(f"{tl_install_name} not installed, installing version {required_version}")
return self._install_tl_install(required_version)
@@ -270,16 +302,16 @@ class Espressif32Platform(PlatformBase):
Returns:
bool: True if installation successful, False otherwise
"""
tl_install_path = os.path.join(self.packages_dir, tl_install_name)
old_tl_install_path = os.path.join(self.packages_dir, "tl-install")
tl_install_path = Path(self.packages_dir) / tl_install_name
old_tl_install_path = Path(self.packages_dir) / "tl-install"
try:
old_tl_install_exists = os.path.exists(old_tl_install_path)
old_tl_install_exists = old_tl_install_path.exists()
if old_tl_install_exists:
# remove outdated tl-install
safe_remove_directory(old_tl_install_path)
if os.path.exists(tl_install_path):
if tl_install_path.exists():
logger.info(f"Removing old {tl_install_name} installation")
safe_remove_directory(tl_install_path)
@@ -288,10 +320,10 @@ class Espressif32Platform(PlatformBase):
self.packages[tl_install_name]["version"] = version
pm.install(version)
# Ensure backward compatibility by removing pio install status indicator
tl_piopm_path = os.path.join(tl_install_path, ".piopm")
tl_piopm_path = tl_install_path / ".piopm"
safe_remove_file(tl_piopm_path)
if os.path.exists(os.path.join(tl_install_path, "package.json")):
if (tl_install_path / "package.json").exists():
logger.info(f"{tl_install_name} successfully installed and verified")
self.packages[tl_install_name]["optional"] = True
@@ -319,40 +351,37 @@ class Espressif32Platform(PlatformBase):
Args:
tool_name: Name of the tool to clean up
"""
if not os.path.exists(self.packages_dir) or not os.path.isdir(self.packages_dir):
packages_path = Path(self.packages_dir)
if not packages_path.exists() or not packages_path.is_dir():
return
try:
# Remove directories with '@' in their name (e.g., tool-name@version, tool-name@src)
safe_remove_directory_pattern(self.packages_dir, f"{tool_name}@*")
safe_remove_directory_pattern(packages_path, f"{tool_name}@*")
# Remove directories with version suffixes (e.g., tool-name.12345)
safe_remove_directory_pattern(self.packages_dir, f"{tool_name}.*")
safe_remove_directory_pattern(packages_path, f"{tool_name}.*")
# Also check for any directory that starts with tool_name and contains '@'
for item in os.listdir(self.packages_dir):
if item.startswith(tool_name) and '@' in item:
item_path = os.path.join(self.packages_dir, item)
if os.path.isdir(item_path):
safe_remove_directory(item_path)
logger.debug(f"Removed versioned directory: {item_path}")
for item in packages_path.iterdir():
if item.name.startswith(tool_name) and '@' in item.name and item.is_dir():
safe_remove_directory(item)
logger.debug(f"Removed versioned directory: {item}")
except OSError as e:
logger.error(f"Error cleaning up versioned directories for {tool_name}: {e}")
except OSError:
logger.exception(f"Error cleaning up versioned directories for {tool_name}")
def _get_tool_paths(self, tool_name: str) -> Dict[str, str]:
"""Get centralized path calculation for tools with caching."""
if tool_name not in self._tools_cache:
tool_path = os.path.join(self.packages_dir, tool_name)
tool_path = Path(self.packages_dir) / tool_name
self._tools_cache[tool_name] = {
'tool_path': tool_path,
'package_path': os.path.join(tool_path, "package.json"),
'tools_json_path': os.path.join(tool_path, "tools.json"),
'piopm_path': os.path.join(tool_path, ".piopm"),
'idf_tools_path': os.path.join(
self.packages_dir, tl_install_name, "tools", "idf_tools.py"
)
'tool_path': str(tool_path),
'package_path': str(tool_path / "package.json"),
'tools_json_path': str(tool_path / "tools.json"),
'piopm_path': str(tool_path / ".piopm"),
'idf_tools_path': str(Path(self.packages_dir) / tl_install_name / "tools" / "idf_tools.py")
}
return self._tools_cache[tool_name]
@@ -360,14 +389,18 @@ class Espressif32Platform(PlatformBase):
"""Check the installation status of a tool."""
paths = self._get_tool_paths(tool_name)
return {
'has_idf_tools': os.path.exists(paths['idf_tools_path']),
'has_tools_json': os.path.exists(paths['tools_json_path']),
'has_piopm': os.path.exists(paths['piopm_path']),
'tool_exists': os.path.exists(paths['tool_path'])
'has_idf_tools': Path(paths['idf_tools_path']).exists(),
'has_tools_json': Path(paths['tools_json_path']).exists(),
'has_piopm': Path(paths['piopm_path']).exists(),
'tool_exists': Path(paths['tool_path']).exists()
}
def _run_idf_tools_install(self, tools_json_path: str, idf_tools_path: str) -> bool:
"""Execute idf_tools.py install command with timeout and error handling."""
"""
Execute idf_tools.py install command.
Note: No timeout is set to allow installations to complete on slow networks.
The tool-esp_install handles the retry logic.
"""
cmd = [
python_exe,
idf_tools_path,
@@ -379,11 +412,11 @@ class Espressif32Platform(PlatformBase):
]
try:
logger.info(f"Installing tools via idf_tools.py (this may take several minutes)...")
result = subprocess.run(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=SUBPROCESS_TIMEOUT,
check=False
)
@@ -394,9 +427,6 @@ class Espressif32Platform(PlatformBase):
logger.debug("idf_tools.py executed successfully")
return True
except subprocess.TimeoutExpired:
logger.error(f"Timeout in idf_tools.py after {SUBPROCESS_TIMEOUT}s")
return False
except (subprocess.SubprocessError, OSError) as e:
logger.error(f"Error in idf_tools.py: {e}")
return False
@@ -436,14 +466,8 @@ class Espressif32Platform(PlatformBase):
logger.error(f"Error reading package data for {tool_name}: {e}")
return False
def install_tool(self, tool_name: str, retry_count: int = 0) -> bool:
"""Install a tool with optimized retry mechanism."""
if retry_count >= RETRY_LIMIT:
logger.error(
f"Installation of {tool_name} failed after {RETRY_LIMIT} attempts"
)
return False
def install_tool(self, tool_name: str) -> bool:
"""Install a tool."""
self.packages[tool_name]["optional"] = False
paths = self._get_tool_paths(tool_name)
status = self._check_tool_status(tool_name)
@@ -455,7 +479,7 @@ class Espressif32Platform(PlatformBase):
# Case 2: Tool already installed, version check
if (status['has_idf_tools'] and status['has_piopm'] and
not status['has_tools_json']):
return self._handle_existing_tool(tool_name, paths, retry_count)
return self._handle_existing_tool(tool_name, paths)
logger.debug(f"Tool {tool_name} already configured")
return True
@@ -468,27 +492,20 @@ class Espressif32Platform(PlatformBase):
return False
# Copy tool files
tools_path_default = os.path.join(
os.path.expanduser("~"), ".platformio"
)
target_package_path = os.path.join(
tools_path_default, "tools", tool_name, "package.json"
)
target_package_path = Path(IDF_TOOLS_PATH) / "tools" / tool_name / "package.json"
if not safe_copy_file(paths['package_path'], target_package_path):
return False
safe_remove_directory(paths['tool_path'])
tl_path = f"file://{os.path.join(tools_path_default, 'tools', tool_name)}"
tl_path = f"file://{Path(IDF_TOOLS_PATH) / 'tools' / tool_name}"
pm.install(tl_path)
logger.info(f"Tool {tool_name} successfully installed")
return True
def _handle_existing_tool(
self, tool_name: str, paths: Dict[str, str], retry_count: int
) -> bool:
def _handle_existing_tool(self, tool_name: str, paths: Dict[str, str]) -> bool:
"""Handle already installed tools with version checking."""
if self._check_tool_version(tool_name):
# Version matches, use tool
@@ -503,7 +520,7 @@ class Espressif32Platform(PlatformBase):
# Remove the main tool directory (if it still exists after cleanup)
safe_remove_directory(paths['tool_path'])
return self.install_tool(tool_name, retry_count + 1)
return self.install_tool(tool_name)
def _configure_arduino_framework(self, frameworks: List[str]) -> None:
"""Configure Arduino framework dependencies."""
@@ -568,7 +585,7 @@ class Espressif32Platform(PlatformBase):
self.install_tool(toolchain)
# ULP toolchain if ULP directory exists
if mcu_config.get("ulp_toolchain") and os.path.isdir("ulp"):
if mcu_config.get("ulp_toolchain") and Path("ulp").is_dir():
for toolchain in mcu_config["ulp_toolchain"]:
self.install_tool(toolchain)
@@ -587,16 +604,14 @@ class Espressif32Platform(PlatformBase):
return
# Remove pio install marker to avoid issues when switching versions
old_tl_piopm_path = os.path.join(self.packages_dir, "tl-install", ".piopm")
if os.path.exists(old_tl_piopm_path):
old_tl_piopm_path = Path(self.packages_dir) / "tl-install" / ".piopm"
if old_tl_piopm_path.exists():
safe_remove_file(old_tl_piopm_path)
# Check if idf_tools.py is available
installer_path = os.path.join(
self.packages_dir, tl_install_name, "tools", "idf_tools.py"
)
installer_path = Path(self.packages_dir) / tl_install_name / "tools" / "idf_tools.py"
if os.path.exists(installer_path):
if installer_path.exists():
logger.debug(f"{tl_install_name} is available and ready")
self.packages[tl_install_name]["optional"] = True
else:
@@ -624,42 +639,42 @@ class Espressif32Platform(PlatformBase):
def _ensure_mklittlefs_version(self) -> None:
"""Ensure correct mklittlefs version is installed."""
piopm_path = os.path.join(self.packages_dir, "tool-mklittlefs", ".piopm")
piopm_path = Path(self.packages_dir) / "tool-mklittlefs" / ".piopm"
if os.path.exists(piopm_path):
if piopm_path.exists():
try:
with open(piopm_path, 'r', encoding='utf-8') as f:
package_data = json.load(f)
version = package_data.get('version', '')
if not version.startswith("3."):
os.remove(piopm_path)
safe_remove_file(piopm_path)
logger.info(f"Incompatible mklittlefs version {version} removed (required: 3.x)")
except (json.JSONDecodeError, KeyError) as e:
logger.error(f"Error reading mklittlefs package {e}")
except (json.JSONDecodeError, KeyError):
logger.exception("Error reading mklittlefs package metadata")
def _setup_mklittlefs_for_download(self) -> None:
"""Setup mklittlefs for download functionality with version 4.x."""
mklittlefs_dir = os.path.join(self.packages_dir, "tool-mklittlefs")
mklittlefs4_dir = os.path.join(
self.packages_dir, "tool-mklittlefs4"
)
mklittlefs_dir = Path(self.packages_dir) / "tool-mklittlefs"
mklittlefs4_dir = Path(self.packages_dir) / "tool-mklittlefs4"
# Ensure mklittlefs 3.x is installed
if not os.path.exists(mklittlefs_dir):
if not mklittlefs_dir.exists():
self.install_tool("tool-mklittlefs")
if os.path.exists(os.path.join(mklittlefs_dir, "tools.json")):
if (mklittlefs_dir / "tools.json").exists():
self.install_tool("tool-mklittlefs")
# Install mklittlefs 4.x
if not os.path.exists(mklittlefs4_dir):
if not mklittlefs4_dir.exists():
self.install_tool("tool-mklittlefs4")
if os.path.exists(os.path.join(mklittlefs4_dir, "tools.json")):
if (mklittlefs4_dir / "tools.json").exists():
self.install_tool("tool-mklittlefs4")
# Copy mklittlefs 4.x over 3.x
if os.path.exists(mklittlefs4_dir):
package_src = os.path.join(mklittlefs_dir, "package.json")
package_dst = os.path.join(mklittlefs4_dir, "package.json")
if mklittlefs4_dir.exists():
# Copy 3.x package.json into 4.x before mirroring 4.x -> 3.x,
# so 3.x dir ends up with 4.x binaries and 3.x metadata.
package_src = mklittlefs_dir / "package.json"
package_dst = mklittlefs4_dir / "package.json"
safe_copy_file(package_src, package_dst)
shutil.copytree(mklittlefs4_dir, mklittlefs_dir, dirs_exist_ok=True)
self.packages.pop("tool-mkfatfs", None)
@@ -870,7 +885,7 @@ class Espressif32Platform(PlatformBase):
ignore_conds = [
debug_config.load_cmds != ["load"],
not flash_images,
not all([os.path.isfile(item["path"]) for item in flash_images]),
not all([Path(item["path"]).is_file() for item in flash_images]),
]
if any(ignore_conds):