Extensions: resolve unhandled exceptions when wheel extraction fails

Setting up wheels wasn't handling errors if extraction failed.

Addresses the error in #132924.
This commit is contained in:
Campbell Barton
2025-02-07 20:25:08 +11:00
parent fb3fe458bf
commit 198d07e240

View File

@@ -22,6 +22,7 @@ import zipfile
from collections.abc import (
Callable,
Iterator,
)
WheelSource = tuple[
@@ -180,6 +181,9 @@ def _zipfile_extractall_safe(
zip_fh: zipfile.ZipFile,
path: str,
path_restrict: str,
*,
error_fn: Callable[[Exception], None],
remove_error_fn: Callable[[str, Exception], None],
) -> None:
"""
A version of ``ZipFile.extractall`` that wont write to paths outside ``path_restrict``.
@@ -204,20 +208,69 @@ def _zipfile_extractall_safe(
path_restrict_with_slash = path_restrict + sep
assert len(path) >= len(path_restrict_with_slash)
if not path.startswith(path_restrict_with_slash):
raise Exception("Expected the restricted directory to start with ")
# This is an internal error if it ever happens.
raise Exception("Expected the restricted directory to start with \"{:s}\"".format(path_restrict_with_slash))
has_error = False
member_index = 0
# Use an iterator to avoid duplicating the checks (for the cleanup pass).
def zip_iter_filtered(*, verbose: bool) -> Iterator[tuple[zipfile.ZipInfo, str, str]]:
for member in zip_fh.infolist():
filename_orig = member.filename
filename_next = path_prefix + filename_orig
# This isn't likely to happen so accept a noisy print here.
# If this ends up happening more often, it could be suppressed.
# (although this hints at bigger problems because we might be excluding necessary files).
if os.path.normpath(filename_next).startswith(".." + sep):
if verbose:
print("Skipping path:", filename_next, "that escapes:", path_restrict)
continue
yield member, filename_orig, filename_next
for member, filename_orig, filename_next in zip_iter_filtered(verbose=True):
# Increment before extracting, so a potential cleanup will a file that failed to extract.
member_index += 1
member.filename = filename_next
# Extraction can fail for many reasons, see: #132924.
try:
zip_fh.extract(member, path_restrict)
except Exception as ex:
error_fn(ex)
filepath_native = path_restrict + sep + filename_next.replace("/", sep)
print("Failed to extract path:", filepath_native, "error", str(ex))
remove_error_fn(filepath_native, ex)
has_error = True
for member in zip_fh.infolist():
filename_orig = member.filename
member.filename = path_prefix + filename_orig
# This isn't likely to happen so accept a noisy print here.
# If this ends up happening more often, it could be suppressed.
# (although this hints at bigger problems because we might be excluding necessary files).
if os.path.normpath(member.filename).startswith(".." + sep):
print("Skipping path:", member.filename, "that escapes:", path_restrict)
continue
zip_fh.extract(member, path_restrict)
member.filename = filename_orig
if has_error:
break
# If the zip-file failed to extract, remove all files that were extracted.
# This is done so failure to extract a file never results in a partially-working
# state which can cause confusing situations for users.
if has_error:
# NOTE: this currently leaves empty directories which is not ideal.
# It's possible to calculate directories created by this extraction but more involved.
member_cleanup_len = member_index + 1
member_index = 0
for member, filename_orig, filename_next in zip_iter_filtered(verbose=False):
member_index += 1
if member_index >= member_cleanup_len:
break
filepath_native = path_restrict + sep + filename_next.replace("/", sep)
try:
os.unlink(filepath_native)
except Exception as ex:
remove_error_fn(filepath_native, ex)
WHEEL_VERSION_RE = re.compile(r"(\d+)?(?:\.(\d+))?(?:\.(\d+))")
@@ -435,4 +488,6 @@ def apply_action(
zip_fh,
local_dir_site_packages,
local_dir,
error_fn=error_fn,
remove_error_fn=remove_error_fn,
)