Cleanup: remove deprecated typing built-ins for extension logic

Deprecated since Python 3.9 but still used in many docs/examples.
This commit is contained in:
Campbell Barton
2024-10-17 19:11:40 +11:00
parent 51773a41cc
commit a66601ee95
12 changed files with 278 additions and 306 deletions

View File

@@ -97,7 +97,7 @@ def manifest_compatible_with_wheel_data_or_error(
repo_module, # `str`
pkg_id, # `str`
repo_directory, # `str`
wheel_list, # `List[Tuple[str, List[str]]]`
wheel_list, # `list[tuple[str, list[str]]]`
): # `Optional[str]`
from bl_pkg.bl_extension_utils import (
pkg_manifest_dict_is_valid_or_error,

View File

@@ -18,10 +18,7 @@ import sys
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
Union,
)
@@ -106,10 +103,10 @@ class subcmd_utils:
@staticmethod
def _expand_package_ids(
packages: List[str],
packages: list[str],
*,
use_local: bool,
) -> Union[List[Tuple[int, str]], str]:
) -> Union[list[tuple[int, str]], str]:
# Takes a terse lists of package names and expands to repo index and name list,
# returning an error string if any can't be resolved.
from . import repo_cache_store_ensure
@@ -173,11 +170,11 @@ class subcmd_utils:
return repos_and_packages
@staticmethod
def expand_package_ids_from_remote(packages: List[str]) -> Union[List[Tuple[int, str]], str]:
def expand_package_ids_from_remote(packages: list[str]) -> Union[list[tuple[int, str]], str]:
return subcmd_utils._expand_package_ids(packages, use_local=False)
@staticmethod
def expand_package_ids_from_local(packages: List[str]) -> Union[List[Tuple[int, str]], str]:
def expand_package_ids_from_local(packages: list[str]) -> Union[list[tuple[int, str]], str]:
return subcmd_utils._expand_package_ids(packages, use_local=True)
@@ -200,7 +197,7 @@ class subcmd_query:
item_local: Optional[PkgManifest_Normalized],
item_remote: Optional[PkgManifest_Normalized],
has_remote: bool,
item_warnings: List[str],
item_warnings: list[str],
) -> None:
# Both can't be None.
assert item_remote is not None or item_local is not None
@@ -267,7 +264,7 @@ class subcmd_query:
import addon_utils # type: ignore
# pylint: disable-next=protected-access
extensions_warnings: Dict[str, List[str]] = addon_utils._extensions_warnings_get()
extensions_warnings: dict[str, list[str]] = addon_utils._extensions_warnings_get()
assert isinstance(extensions_warnings, dict)
# Blocked and installed.
@@ -339,7 +336,7 @@ class subcmd_pkg:
def install(
*,
sync: bool,
packages: List[str],
packages: list[str],
enable_on_install: bool,
no_prefs: bool,
) -> bool:
@@ -375,7 +372,7 @@ class subcmd_pkg:
@staticmethod
def remove(
*,
packages: List[str],
packages: list[str],
no_prefs: bool,
) -> bool:
# Expand all package ID's.
@@ -920,7 +917,7 @@ def cli_extension_args_extra(subparsers: "argparse._SubParsersAction[argparse.Ar
cli_extension_args_repo_remove(subparsers)
def cli_extension_handler(args: List[str]) -> int:
def cli_extension_handler(args: list[str]) -> int:
from .cli import blender_ext
# Override the default valid tags with a file which Blender includes.

View File

@@ -312,7 +312,7 @@ def _extensions_repo_temp_files_make_stale(
def _extensions_repo_uninstall_stale_package_fallback(
repo_directory, # `str`
pkg_id_sequence, # `List[str]`
pkg_id_sequence, # `list[str]`
): # `-> None`
# If uninstall failed, make the package stale (by renaming it & queue to remove later).
import addon_utils
@@ -332,7 +332,7 @@ def _extensions_repo_uninstall_stale_package_fallback(
def _extensions_repo_install_stale_package_clear(
repo_directory, # `str`
pkg_id_sequence, # `List[str]`
pkg_id_sequence, # `list[str]`
): # `-> None`
# If install succeeds, ensure the package is not stale.
#
@@ -545,7 +545,7 @@ def pkg_manifest_params_compatible_or_error_for_this_system(
*,
blender_version_min, # `str`
blender_version_max, # `str`
platforms, # `List[str]`
platforms, # `list[str]`
): # `Optional[str]`
# Return true if the parameters are compatible with this system.
from .bl_extension_utils import (
@@ -609,8 +609,8 @@ def repo_cache_store_refresh_from_prefs(repo_cache_store, include_disabled=False
def _preferences_pkg_id_sequence_filter_enabled(
repo_item, # `RepoItem`
pkg_id_sequence, # `List[str]`
): # `-> List[str]`
pkg_id_sequence, # `list[str]`
): # `-> list[str]`
import addon_utils
result = []
@@ -632,10 +632,10 @@ def _preferences_pkg_id_sequence_filter_enabled(
def _preferences_ensure_disabled(
*,
repo_item, # `RepoItem`
pkg_id_sequence, # `List[str]`
pkg_id_sequence, # `list[str]`
default_set, # `bool`
error_fn, # `Callable[[Exception], None]`
): # `-> Dict[str, Tuple[boo, bool]]`
): # `-> dict[str, tuple[boo, bool]]`
import sys
import addon_utils
@@ -981,8 +981,8 @@ def pkg_wheel_filter(
repo_module, # `str`
pkg_id, # `str`
repo_directory, # `str`
wheels_rel, # `List[str]`
): # `-> Tuple[str, List[str]]`
wheels_rel, # `list[str]`
): # `-> tuple[str, list[str]]`
# Filter only the wheels for this platform.
wheels_rel = _extensions_wheel_filter_for_platform(wheels_rel)
if not wheels_rel:

View File

@@ -220,7 +220,7 @@ def addon_draw_item_expanded(
item_description, # `str`
item_maintainer, # `str`
item_version, # `str`
item_warnings, # `List[str]`
item_warnings, # `list[str]`
item_doc_url, # `str`
item_tracker_url, # `str`
):
@@ -303,7 +303,7 @@ def addons_panel_draw_missing_with_extension_impl(
*,
context, # `bpy.types.Context`
layout, # `bpy.types.UILayout`
missing_modules # `Set[str]`
missing_modules # `set[str]`
):
layout_header, layout_panel = layout.panel("builtin_addons", default_closed=True)
layout_header.label(text="Missing Built-in Add-ons", icon='ERROR')
@@ -398,7 +398,7 @@ def addons_panel_draw_missing_with_extension_impl(
def addons_panel_draw_missing_impl(
*,
layout, # `bpy.types.UILayout`
missing_modules, # `Set[str]`
missing_modules, # `set[str]`
):
layout_header, layout_panel = layout.panel("missing_script_files", default_closed=True)
layout_header.label(text="Missing Add-ons", icon='ERROR')
@@ -427,15 +427,15 @@ def addons_panel_draw_items(
context, # `bpy.types.Context`
*,
addon_modules, # `Iterable[ModuleType]`
used_addon_module_name_map, # `Dict[str, bpy.types.Addon]`
used_addon_module_name_map, # `dict[str, bpy.types.Addon]`
search_casefold, # `str`
addon_tags_exclude, # `Set[str]`
addon_tags_exclude, # `set[str]`
enabled_only, # `bool`
addon_extension_manifest_map, # `Dict[str, PkgManifest_Normalized]`
addon_extension_block_map, # `Dict[str, PkgBlock_Normalized]`
addon_extension_manifest_map, # `dict[str, PkgManifest_Normalized]`
addon_extension_block_map, # `dict[str, PkgBlock_Normalized]`
show_development, # `bool`
): # `-> Set[str]`
): # `-> set[str]`
# NOTE: this duplicates logic from `USERPREF_PT_addons` eventually this logic should be used instead.
# Don't de-duplicate the logic as this is a temporary state - as long as extensions remains experimental.
import addon_utils
@@ -629,7 +629,7 @@ def addons_panel_draw_impl(
panel,
context, # `bpy.types.Context`
search_casefold, # `str`
addon_tags_exclude, # `Set[str]`
addon_tags_exclude, # `set[str]`
enabled_only, # `bool`
*,
show_development, # `bool`
@@ -917,8 +917,8 @@ class ExtensionUI_FilterParams:
def extension_ui_visible(
self,
repo_index, # `int`
pkg_manifest_local, # `Dict[str, PkgManifest_Normalized]`
pkg_manifest_remote, # `Dict[str, PkgManifest_Normalized]`
pkg_manifest_local, # `dict[str, PkgManifest_Normalized]`
pkg_manifest_remote, # `dict[str, PkgManifest_Normalized]`
):
from .bl_extension_ops import (
pkg_info_check_exclude_filter,
@@ -1258,7 +1258,7 @@ def extension_draw_item(
repo_index, # `int`
repo_item, # `RepoItem`
operation_in_progress, # `bool`
extensions_warnings, # `Dict[str, List[str]]`
extensions_warnings, # `dict[str, list[str]]`
):
item = item_local or item_remote
is_installed = item_local is not None
@@ -2114,8 +2114,8 @@ class USERPREF_MT_extensions_active_repo_extra(Menu):
# Shared (Extension / Legacy Add-ons) Tags Logic
def tags_exclude_match(
item_tags, # `Tuple[str]`
exclude_tags, # `Set[str]`
item_tags, # `tuple[str]`
exclude_tags, # `set[str]`
):
if not item_tags:
# When an item has no tags then including it makes no sense

View File

@@ -68,13 +68,9 @@ from typing import (
Callable,
Generator,
IO,
List,
Optional,
Dict,
NamedTuple,
Sequence,
Set,
Tuple,
Union,
)
@@ -97,7 +93,7 @@ PKG_TEMP_PREFIX_AND_SUFFIX = (".", ".~temp~")
REPO_LOCAL_JSON = os.path.join(REPO_LOCAL_PRIVATE_DIR, PKG_REPO_LIST_FILENAME)
# An item we communicate back to Blender.
InfoItem = Tuple[str, Any]
InfoItem = tuple[str, Any]
InfoItemSeq = Sequence[InfoItem]
COMPLETE_ITEM = ('DONE', "")
@@ -369,7 +365,7 @@ def license_info_to_text(license_list: Sequence[str]) -> str:
# Public Stand-Alone Utilities
#
def pkg_theme_file_list(directory: str, pkg_idname: str) -> Tuple[str, List[str]]:
def pkg_theme_file_list(directory: str, pkg_idname: str) -> tuple[str, list[str]]:
theme_dir = os.path.join(directory, pkg_idname)
theme_files = [
filename for entry in os.scandir(theme_dir)
@@ -401,7 +397,7 @@ def platform_from_this_system() -> str:
return result
def _url_append_query(url: str, query: Dict[str, str]) -> str:
def _url_append_query(url: str, query: dict[str, str]) -> str:
import urllib
import urllib.parse
@@ -434,7 +430,7 @@ def _url_append_query(url: str, query: Dict[str, str]) -> str:
return new_url
def url_append_query_for_blender(url: str, blender_version: Tuple[int, int, int]) -> str:
def url_append_query_for_blender(url: str, blender_version: tuple[int, int, int]) -> str:
# `blender_version` is typically `bpy.app.version`.
# While this won't cause errors, it's redundant to add this information to file URL's.
@@ -448,7 +444,7 @@ def url_append_query_for_blender(url: str, blender_version: Tuple[int, int, int]
return _url_append_query(url, query)
def url_parse_for_blender(url: str) -> Tuple[str, Dict[str, str]]:
def url_parse_for_blender(url: str) -> tuple[str, dict[str, str]]:
# Split the URL into components:
# - The stripped: `scheme + netloc + path`
# - Known query values used by Blender.
@@ -622,7 +618,7 @@ def pkg_install_files(
*,
directory: str,
files: Sequence[str],
blender_version: Tuple[int, int, int],
blender_version: tuple[int, int, int],
use_idle: bool,
python_args: Sequence[str],
) -> Generator[InfoItemSeq, None, None]:
@@ -644,7 +640,7 @@ def pkg_install(
directory: str,
remote_url: str,
pkg_id_sequence: Sequence[str],
blender_version: Tuple[int, int, int],
blender_version: tuple[int, int, int],
online_user_agent: str,
access_token: str,
timeout: float,
@@ -715,7 +711,7 @@ def dummy_progress(
# Public (non-command-line-wrapping) functions
#
def json_from_filepath(filepath_json: str) -> Optional[Dict[str, Any]]:
def json_from_filepath(filepath_json: str) -> Optional[dict[str, Any]]:
if os.path.exists(filepath_json):
with open(filepath_json, "r", encoding="utf-8") as fh:
result = json.loads(fh.read())
@@ -724,7 +720,7 @@ def json_from_filepath(filepath_json: str) -> Optional[Dict[str, Any]]:
return None
def toml_from_filepath(filepath_json: str) -> Optional[Dict[str, Any]]:
def toml_from_filepath(filepath_json: str) -> Optional[dict[str, Any]]:
if os.path.exists(filepath_json):
with open(filepath_json, "r", encoding="utf-8") as fh:
return tomllib.loads(fh.read())
@@ -752,7 +748,7 @@ def pkg_make_obsolete_for_testing(local_dir: str, pkg_id: str) -> None:
def pkg_manifest_dict_is_valid_or_error(
data: Dict[str, Any],
data: dict[str, Any],
from_repo: bool,
strict: bool,
) -> Optional[str]:
@@ -768,7 +764,7 @@ def pkg_manifest_dict_is_valid_or_error(
def pkg_manifest_dict_from_archive_or_error(
filepath: str,
) -> Union[Dict[str, Any], str]:
) -> Union[dict[str, Any], str]:
from .cli.blender_ext import pkg_manifest_from_archive_and_validate
result = pkg_manifest_from_archive_and_validate(filepath, strict=False)
if isinstance(result, str):
@@ -792,7 +788,7 @@ def pkg_manifest_archive_url_abs_from_remote_url(remote_url: str, archive_url: s
return archive_url
def pkg_manifest_dict_apply_build_generated_table(manifest_dict: Dict[str, Any]) -> None:
def pkg_manifest_dict_apply_build_generated_table(manifest_dict: dict[str, Any]) -> None:
from .cli.blender_ext import pkg_manifest_dict_apply_build_generated_table as fn
fn(manifest_dict)
@@ -855,7 +851,7 @@ class CommandBatchItem:
self.has_fatal_error = False
self.has_error = False
self.has_warning = False
self.msg_log: List[Tuple[str, Any]] = []
self.msg_log: list[tuple[str, Any]] = []
self.msg_log_len_last = 0
self.msg_type = ""
self.msg_info = ""
@@ -866,7 +862,7 @@ class CommandBatchItem:
class CommandBatch_ExecNonBlockingResult(NamedTuple):
# A message list for each command, aligned to `CommandBatchItem._batch`.
messages: Tuple[List[Tuple[str, str]], ...]
messages: tuple[list[tuple[str, str]], ...]
# When true, the status of all commands is `CommandBatchItem.STATUS_COMPLETE`.
all_complete: bool
# When true, `calc_status_data` will return a different result.
@@ -964,7 +960,7 @@ class CommandBatch:
"""
Return the result of running multiple commands.
"""
command_output: Tuple[List[Tuple[str, str]], ...] = tuple([] for _ in range(len(self._batch)))
command_output: tuple[list[tuple[str, str]], ...] = tuple([] for _ in range(len(self._batch)))
if request_exit:
self._request_exit = True
@@ -1050,7 +1046,7 @@ class CommandBatch:
status_data_changed=status_data_changed,
)
def calc_status_string(self) -> List[str]:
def calc_status_string(self) -> list[str]:
return [
"{:s}: {:s}".format(cmd.msg_type, cmd.msg_info)
for cmd in self._batch if (cmd.msg_type or cmd.msg_info)
@@ -1076,7 +1072,7 @@ class CommandBatch:
def calc_status_text_icon_from_data(
status_data: CommandBatch_StatusFlag,
update_count: int,
) -> Tuple[str, str]:
) -> tuple[str, str]:
# Generate a nice UI string for a status-bar & splash screen (must be short).
#
# NOTE: this is (arguably) UI logic, it's just nice to have it here
@@ -1106,7 +1102,7 @@ class CommandBatch:
# Should never reach this line!
return "Internal error, unknown state!{:s}".format(fail_text), 'ERROR'
def calc_status_log_or_none(self) -> Optional[List[Tuple[str, str]]]:
def calc_status_log_or_none(self) -> Optional[list[tuple[str, str]]]:
"""
Return the log or None if there were no changes since the last call.
"""
@@ -1120,11 +1116,11 @@ class CommandBatch:
for ty, msg in (cmd.msg_log + ([(cmd.msg_type, cmd.msg_info)] if cmd.msg_type == 'PROGRESS' else []))
]
def calc_status_log_since_last_request_or_none(self) -> Optional[List[List[Tuple[str, str]]]]:
def calc_status_log_since_last_request_or_none(self) -> Optional[list[list[tuple[str, str]]]]:
"""
Return a list of new errors per command or None when none are found.
"""
result: List[List[Tuple[str, str]]] = [[] for _ in range(len(self._batch))]
result: list[list[tuple[str, str]]] = [[] for _ in range(len(self._batch))]
found = False
for cmd_index, cmd in enumerate(self._batch):
msg_log_len = len(cmd.msg_log)
@@ -1147,7 +1143,7 @@ class PkgBlock_Normalized(NamedTuple):
@staticmethod
def from_dict_with_error_fn(
block_dict: Dict[str, Any],
block_dict: dict[str, Any],
*,
# Only for useful error messages.
pkg_idname: str,
@@ -1183,9 +1179,9 @@ class PkgManifest_Normalized(NamedTuple):
# Optional.
website: str
permissions: Dict[str, str]
tags: Tuple[str]
wheels: Tuple[str]
permissions: dict[str, str]
tags: tuple[str]
wheels: tuple[str]
# Remote.
archive_size: int
@@ -1196,7 +1192,7 @@ class PkgManifest_Normalized(NamedTuple):
@staticmethod
def from_dict_with_error_fn(
manifest_dict: Dict[str, Any],
manifest_dict: dict[str, Any],
*,
# Only for useful error messages.
pkg_idname: str,
@@ -1223,7 +1219,7 @@ class PkgManifest_Normalized(NamedTuple):
# Optional.
field_website = manifest_dict.get("website", "")
field_permissions: Union[List[str], Dict[str, str]] = manifest_dict.get("permissions", {})
field_permissions: Union[list[str], dict[str, str]] = manifest_dict.get("permissions", {})
field_tags = manifest_dict.get("tags", [])
field_wheels = manifest_dict.get("wheels", [])
@@ -1318,7 +1314,7 @@ class PkgManifest_Normalized(NamedTuple):
def repository_id_with_error_fn(
item: Dict[str, Any],
item: dict[str, Any],
*,
repo_directory: str,
error_fn: Callable[[Exception], None],
@@ -1337,11 +1333,11 @@ def repository_id_with_error_fn(
# Values used to exclude incompatible packages when listing & installing.
class PkgManifest_FilterParams(NamedTuple):
platform: str
blender_version: Tuple[int, int, int]
blender_version: tuple[int, int, int]
def repository_filter_skip(
item: Dict[str, Any],
item: dict[str, Any],
filter_params: PkgManifest_FilterParams,
error_fn: Callable[[Exception], None],
) -> bool:
@@ -1361,15 +1357,15 @@ def pkg_manifest_params_compatible_or_error(
*,
blender_version_min: str,
blender_version_max: str,
platforms: List[str],
this_platform: Tuple[int, int, int],
this_blender_version: Tuple[int, int, int],
platforms: list[str],
this_platform: tuple[int, int, int],
this_blender_version: tuple[int, int, int],
error_fn: Callable[[Exception], None],
) -> Optional[str]:
from .cli.blender_ext import repository_filter_skip as fn
# Weak, create the minimum information for a manifest to be checked against.
item: Dict[str, Any] = {}
item: dict[str, Any] = {}
if blender_version_min:
item["blender_version_min"] = blender_version_min
if blender_version_max:
@@ -1394,11 +1390,11 @@ def pkg_manifest_params_compatible_or_error(
def repository_parse_blocklist(
data: List[Dict[str, Any]],
data: list[dict[str, Any]],
*,
repo_directory: str,
error_fn: Callable[[Exception], None],
) -> Dict[str, PkgBlock_Normalized]:
) -> dict[str, PkgBlock_Normalized]:
pkg_block_map = {}
for item in data:
@@ -1429,13 +1425,13 @@ def repository_parse_blocklist(
def repository_parse_data_filtered(
data: List[Dict[str, Any]],
data: list[dict[str, Any]],
*,
repo_directory: str,
filter_params: PkgManifest_FilterParams,
pkg_block_map: Dict[str, PkgBlock_Normalized],
pkg_block_map: dict[str, PkgBlock_Normalized],
error_fn: Callable[[Exception], None],
) -> Dict[str, PkgManifest_Normalized]:
) -> dict[str, PkgManifest_Normalized]:
pkg_manifest_map = {}
for item in data:
if not isinstance(item, dict):
@@ -1472,7 +1468,7 @@ def repository_parse_data_filtered(
class RepoRemoteData(NamedTuple):
version: str
# Converted from the `data` & `blocklist` fields.
pkg_manifest_map: Dict[str, PkgManifest_Normalized]
pkg_manifest_map: dict[str, PkgManifest_Normalized]
class _RepoDataSouce_ABC(metaclass=abc.ABCMeta):
@@ -1594,7 +1590,7 @@ class _RepoDataSouce_JSON(_RepoDataSouce_ABC):
data = None
mtime = file_mtime_or_none_with_error_fn(self._filepath, error_fn=error_fn) or 0
data_dict: Dict[str, Any] = {}
data_dict: dict[str, Any] = {}
if mtime != 0:
try:
data_dict = json_from_filepath(self._filepath) or {}
@@ -1665,7 +1661,7 @@ class _RepoDataSouce_TOML_FILES(_RepoDataSouce_ABC):
):
self._directory: str = directory
self._filter_params = filter_params
self._mtime_for_each_package: Optional[Dict[str, int]] = None
self._mtime_for_each_package: Optional[dict[str, int]] = None
self._data: Optional[RepoRemoteData] = None
def exists(self) -> bool:
@@ -1712,7 +1708,7 @@ class _RepoDataSouce_TOML_FILES(_RepoDataSouce_ABC):
error_fn=error_fn,
)
pkg_manifest_map: Dict[str, PkgManifest_Normalized] = {}
pkg_manifest_map: dict[str, PkgManifest_Normalized] = {}
for dirname in mtime_for_each_package.keys():
filepath_toml = os.path.join(self._directory, dirname, PKG_MANIFEST_FILENAME_TOML)
try:
@@ -1768,11 +1764,11 @@ class _RepoDataSouce_TOML_FILES(_RepoDataSouce_ABC):
*,
directory: str,
error_fn: Callable[[Exception], None],
) -> Dict[str, int]:
) -> dict[str, int]:
# Caller must check `self.exists()`.
assert os.path.isdir(directory)
mtime_for_each_package: Dict[str, int] = {}
mtime_for_each_package: dict[str, int] = {}
for entry in repository_iter_package_dirs(directory, error_fn=error_fn):
dirname = entry.name
@@ -1781,12 +1777,12 @@ class _RepoDataSouce_TOML_FILES(_RepoDataSouce_ABC):
return mtime_for_each_package
@ classmethod
@classmethod
def _mtime_for_each_package_changed(
cls,
*,
directory: str,
mtime_for_each_package: Dict[str, int],
mtime_for_each_package: dict[str, int],
error_fn: Callable[[Exception], None],
) -> bool:
"""
@@ -1843,8 +1839,8 @@ class _RepoCacheEntry:
self.remote_url = remote_url
# Manifest data per package loaded from the packages local JSON.
# TODO(@ideasman42): use `_RepoDataSouce_ABC` for `pkg_manifest_local`.
self._pkg_manifest_local: Optional[Dict[str, PkgManifest_Normalized]] = None
self._pkg_manifest_remote: Optional[Dict[str, PkgManifest_Normalized]] = None
self._pkg_manifest_local: Optional[dict[str, PkgManifest_Normalized]] = None
self._pkg_manifest_remote: Optional[dict[str, PkgManifest_Normalized]] = None
self._pkg_manifest_remote_data_source: _RepoDataSouce_ABC = (
_RepoDataSouce_JSON(directory, filter_params) if remote_url else
_RepoDataSouce_TOML_FILES(directory, filter_params)
@@ -1858,14 +1854,14 @@ class _RepoCacheEntry:
error_fn: Callable[[Exception], None],
check_files: bool = False,
ignore_missing: bool = False,
) -> Optional[Dict[str, PkgManifest_Normalized]]:
) -> Optional[dict[str, PkgManifest_Normalized]]:
data = self._pkg_manifest_remote_data_source.data(
cache_validate=check_files,
force=False,
error_fn=error_fn,
)
pkg_manifest_remote: Optional[Dict[str, PkgManifest_Normalized]] = None
pkg_manifest_remote: Optional[dict[str, PkgManifest_Normalized]] = None
if data is not None:
pkg_manifest_remote = data.pkg_manifest_map
@@ -1888,14 +1884,14 @@ class _RepoCacheEntry:
*,
error_fn: Callable[[Exception], None],
force: bool = False,
) -> Optional[Dict[str, PkgManifest_Normalized]]:
) -> Optional[dict[str, PkgManifest_Normalized]]:
data = self._pkg_manifest_remote_data_source.data(
cache_validate=True,
force=force,
error_fn=error_fn,
)
pkg_manifest_remote: Optional[Dict[str, PkgManifest_Normalized]] = None
pkg_manifest_remote: Optional[dict[str, PkgManifest_Normalized]] = None
if data is not None:
pkg_manifest_remote = data.pkg_manifest_map
@@ -1909,7 +1905,7 @@ class _RepoCacheEntry:
*,
error_fn: Callable[[Exception], None],
ignore_missing: bool = False,
) -> Optional[Dict[str, PkgManifest_Normalized]]:
) -> Optional[dict[str, PkgManifest_Normalized]]:
# Important for local-only repositories (where the directory name defines the ID).
has_remote = self.remote_url != ""
@@ -1971,7 +1967,7 @@ class _RepoCacheEntry:
*,
error_fn: Callable[[Exception], None],
ignore_missing: bool = False,
) -> Optional[Dict[str, PkgManifest_Normalized]]:
) -> Optional[dict[str, PkgManifest_Normalized]]:
if self._pkg_manifest_remote is None:
self._json_data_ensure(
ignore_missing=ignore_missing,
@@ -1990,8 +1986,8 @@ class RepoCacheStore:
"_is_init",
)
def __init__(self, blender_version: Tuple[int, int, int]) -> None:
self._repos: List[_RepoCacheEntry] = []
def __init__(self, blender_version: tuple[int, int, int]) -> None:
self._repos: list[_RepoCacheEntry] = []
self._filter_params = PkgManifest_FilterParams(
platform=platform_from_this_system(),
blender_version=blender_version,
@@ -2003,7 +1999,7 @@ class RepoCacheStore:
def refresh_from_repos(
self, *,
repos: List[Tuple[str, str]],
repos: list[tuple[str, str]],
force: bool = False,
) -> None:
"""
@@ -2028,7 +2024,7 @@ class RepoCacheStore:
*,
error_fn: Callable[[Exception], None],
force: bool = False,
) -> Optional[Dict[str, PkgManifest_Normalized]]:
) -> Optional[dict[str, PkgManifest_Normalized]]:
for repo_entry in self._repos:
if directory == repo_entry.directory:
# pylint: disable-next=protected-access
@@ -2041,7 +2037,7 @@ class RepoCacheStore:
*,
error_fn: Callable[[Exception], None],
ignore_missing: bool = False,
) -> Optional[Dict[str, PkgManifest_Normalized]]:
) -> Optional[dict[str, PkgManifest_Normalized]]:
for repo_entry in self._repos:
if directory == repo_entry.directory:
# Force refresh.
@@ -2058,8 +2054,8 @@ class RepoCacheStore:
error_fn: Callable[[Exception], None],
check_files: bool = False,
ignore_missing: bool = False,
directory_subset: Optional[Set[str]] = None,
) -> Generator[Optional[Dict[str, PkgManifest_Normalized]], None, None]:
directory_subset: Optional[set[str]] = None,
) -> Generator[Optional[dict[str, PkgManifest_Normalized]], None, None]:
for repo_entry in self._repos:
if directory_subset is not None:
if repo_entry.directory not in directory_subset:
@@ -2083,8 +2079,8 @@ class RepoCacheStore:
error_fn: Callable[[Exception], None],
check_files: bool = False,
ignore_missing: bool = False,
directory_subset: Optional[Set[str]] = None,
) -> Generator[Optional[Dict[str, PkgManifest_Normalized]], None, None]:
directory_subset: Optional[set[str]] = None,
) -> Generator[Optional[dict[str, PkgManifest_Normalized]], None, None]:
for repo_entry in self._repos:
if directory_subset is not None:
if repo_entry.directory not in directory_subset:
@@ -2136,7 +2132,7 @@ class RepoLock:
"""
assert len(cookie) <= _REPO_LOCK_SIZE_LIMIT, "Unreachable"
self._repo_directories = tuple(repo_directories)
self._repo_lock_files: List[Tuple[str, str]] = []
self._repo_lock_files: list[tuple[str, str]] = []
self._held = False
self._cookie = cookie
@@ -2167,7 +2163,7 @@ class RepoLock:
return "lock file could not be removed ({:s})".format(str(ex))
return None
def acquire(self) -> Dict[str, Optional[str]]:
def acquire(self) -> dict[str, Optional[str]]:
"""
Return directories and the lock status,
with None if locking succeeded.
@@ -2178,7 +2174,7 @@ class RepoLock:
raise Exception("acquire(): cookie doesn't exist! (when it should)")
# Assume all succeed.
result: Dict[str, Optional[str]] = {directory: None for directory in self._repo_directories}
result: dict[str, Optional[str]] = {directory: None for directory in self._repo_directories}
for directory in self._repo_directories:
local_private_dir = os.path.join(directory, REPO_LOCAL_PRIVATE_DIR)
@@ -2213,12 +2209,12 @@ class RepoLock:
self._held = True
return result
def release(self) -> Dict[str, Optional[str]]:
def release(self) -> dict[str, Optional[str]]:
# NOTE: lots of error checks here, mostly to give insights in the very unlikely case this fails.
if not self._held:
raise Exception("release(): called without a lock!")
result: Dict[str, Optional[str]] = {directory: None for directory in self._repo_directories}
result: dict[str, Optional[str]] = {directory: None for directory in self._repo_directories}
for directory, local_lock_file in self._repo_lock_files:
if not os.path.exists(local_lock_file):
result[directory] = "release(): lock missing when expected, continuing."
@@ -2252,7 +2248,7 @@ class RepoLockContext:
def __init__(self, *, repo_directories: Sequence[str], cookie: str):
self._repo_lock = RepoLock(repo_directories=repo_directories, cookie=cookie)
def __enter__(self) -> Dict[str, Optional[str]]:
def __enter__(self) -> dict[str, Optional[str]]:
return self._repo_lock.acquire()
def __exit__(self, _ty: Any, _value: Any, _traceback: Any) -> None:
@@ -2266,7 +2262,7 @@ class RepoLockContext:
def repo_lock_directory_query(
directory: str,
cookie: str,
) -> Optional[Tuple[bool, float, str]]:
) -> Optional[tuple[bool, float, str]]:
local_lock_file = os.path.join(directory, REPO_LOCAL_PRIVATE_DIR, REPO_LOCAL_PRIVATE_LOCK)
cookie_is_ours = False

View File

@@ -27,14 +27,10 @@ import zipfile
from typing import (
Any,
Dict,
Generator,
IO,
Optional,
Sequence,
List,
Set,
Tuple,
Callable,
NamedTuple,
Union,
@@ -198,11 +194,11 @@ del _ArgsDefaultOverride
# pylint: disable-next=redefined-builtin
def print(*args: Any, **kw: Dict[str, Any]) -> None:
def print(*args: Any, **kw: dict[str, Any]) -> None:
raise Exception("Illegal print(*({!r}), **{{{!r}}})".format(args, kw))
# # Useful for testing.
# def print(*args: Any, **kw: Dict[str, Any]):
# def print(*args: Any, **kw: dict[str, Any]):
# __builtins__["print"](*args, **kw, file=open('/tmp/output.txt', 'a'))
@@ -267,7 +263,7 @@ def force_exit_ok_enable() -> None:
# -----------------------------------------------------------------------------
# Generic Functions
def execfile(filepath: str) -> Dict[str, Any]:
def execfile(filepath: str) -> dict[str, Any]:
global_namespace = {"__file__": filepath, "__name__": "__main__"}
with open(filepath, "rb") as fh:
# pylint: disable-next=exec-used
@@ -326,22 +322,22 @@ class CleanupPathsContext:
class PkgRepoData(NamedTuple):
version: str
blocklist: List[Dict[str, Any]]
data: List[Dict[str, Any]]
blocklist: list[dict[str, Any]]
data: list[dict[str, Any]]
class PkgManifest_Build(NamedTuple):
"""Package Build Information (for the "build" sub-command)."""
paths: Optional[List[str]]
paths_exclude_pattern: Optional[List[str]]
paths: Optional[list[str]]
paths_exclude_pattern: Optional[list[str]]
@staticmethod
def _from_dict_impl(
manifest_build_dict: Dict[str, Any],
manifest_build_dict: dict[str, Any],
*,
extra_paths: Sequence[str],
all_errors: bool,
) -> Union["PkgManifest_Build", List[str]]:
) -> Union["PkgManifest_Build", list[str]]:
# TODO: generalize the type checks, see: `pkg_manifest_is_valid_or_error_impl`.
error_list = []
if value := manifest_build_dict.get("paths"):
@@ -381,9 +377,9 @@ class PkgManifest_Build(NamedTuple):
@staticmethod
def from_dict_all_errors(
manifest_build_dict: Dict[str, Any],
manifest_build_dict: dict[str, Any],
extra_paths: Sequence[str],
) -> Union["PkgManifest_Build", List[str]]:
) -> Union["PkgManifest_Build", list[str]]:
return PkgManifest_Build._from_dict_impl(
manifest_build_dict,
extra_paths=extra_paths,
@@ -400,17 +396,17 @@ class PkgManifest(NamedTuple):
version: str
type: str
maintainer: str
license: List[str]
license: list[str]
blender_version_min: str
# Optional (set all defaults).
blender_version_max: Optional[str] = None
website: Optional[str] = None
copyright: Optional[List[str]] = None
permissions: Optional[List[str]] = None
tags: Optional[List[str]] = None
platforms: Optional[List[str]] = None
wheels: Optional[List[str]] = None
copyright: Optional[list[str]] = None
permissions: Optional[list[str]] = None
tags: Optional[list[str]] = None
platforms: Optional[list[str]] = None
wheels: Optional[list[str]] = None
class PkgManifest_Archive(NamedTuple):
@@ -425,7 +421,7 @@ class PkgManifest_Archive(NamedTuple):
class PkgServerRepoConfig(NamedTuple):
"""Server configuration (for generating repositories)."""
schema_version: str
blocklist: List[Dict[str, Any]]
blocklist: list[dict[str, Any]]
# -----------------------------------------------------------------------------
@@ -485,7 +481,7 @@ def sha256_from_file_or_error(
filepath: str,
block_size: int = 1 << 20,
hash_prefix: bool = False,
) -> Union[Tuple[int, str], str]:
) -> Union[tuple[int, str], str]:
"""
Returns an arbitrary sized unique ASCII string based on the file contents.
(exact hashing method may change).
@@ -518,7 +514,7 @@ def scandir_recursive_impl(
path: str,
*,
filter_fn: Callable[[str, bool], bool],
) -> Generator[Tuple[str, str], None, None]:
) -> Generator[tuple[str, str], None, None]:
"""Recursively yield DirEntry objects for given directory."""
for entry in os.scandir(path):
if entry.is_symlink():
@@ -544,7 +540,7 @@ def scandir_recursive_impl(
def scandir_recursive(
path: str,
filter_fn: Callable[[str, bool], bool],
) -> Generator[Tuple[str, str], None, None]:
) -> Generator[tuple[str, str], None, None]:
yield from scandir_recursive_impl(path, path, filter_fn=filter_fn)
@@ -619,7 +615,7 @@ def rmtree_with_fallback_or_error(
def rmtree_with_fallback_or_error_pseudo_atomic(
path: str,
*,
temp_prefix_and_suffix: Tuple[str, str],
temp_prefix_and_suffix: tuple[str, str],
remove_file: bool = True,
remove_link: bool = True,
) -> Optional[str]:
@@ -686,7 +682,7 @@ def rmtree_with_fallback_or_error_pseudo_atomic(
def build_paths_expand_iter(
path: str,
path_list: Sequence[str],
) -> Generator[Tuple[str, str], None, None]:
) -> Generator[tuple[str, str], None, None]:
"""
Expand paths from a path list which always uses "/" slashes.
"""
@@ -732,12 +728,12 @@ def filepath_skip_compress(filepath: str) -> bool:
def pkg_manifest_from_dict_and_validate_impl(
data: Dict[Any, Any],
data: dict[Any, Any],
*,
from_repo: bool,
all_errors: bool,
strict: bool,
) -> Union[PkgManifest, List[str]]:
) -> Union[PkgManifest, list[str]]:
error_list = []
# Validate the dictionary.
if all_errors:
@@ -750,7 +746,7 @@ def pkg_manifest_from_dict_and_validate_impl(
if error_list:
return error_list
values: List[str] = []
values: list[str] = []
for key in PkgManifest._fields:
val = data.get(key, ...)
if val is ...:
@@ -760,7 +756,7 @@ def pkg_manifest_from_dict_and_validate_impl(
assert val is not ...
values.append(val)
kw_args: Dict[str, Any] = dict(zip(PkgManifest._fields, values, strict=True))
kw_args: dict[str, Any] = dict(zip(PkgManifest._fields, values, strict=True))
manifest = PkgManifest(**kw_args)
# There could be other validation, leave these as-is.
@@ -768,7 +764,7 @@ def pkg_manifest_from_dict_and_validate_impl(
def pkg_manifest_from_dict_and_validate(
data: Dict[Any, Any],
data: dict[Any, Any],
from_repo: bool,
strict: bool,
) -> Union[PkgManifest, str]:
@@ -779,10 +775,10 @@ def pkg_manifest_from_dict_and_validate(
def pkg_manifest_from_dict_and_validate_all_errros(
data: Dict[Any, Any],
data: dict[Any, Any],
from_repo: bool,
strict: bool,
) -> Union[PkgManifest, List[str]]:
) -> Union[PkgManifest, list[str]]:
"""
Validate the manifest and return all errors.
"""
@@ -790,7 +786,7 @@ def pkg_manifest_from_dict_and_validate_all_errros(
def pkg_manifest_archive_from_dict_and_validate(
data: Dict[Any, Any],
data: dict[Any, Any],
strict: bool,
) -> Union[PkgManifest_Archive, str]:
manifest = pkg_manifest_from_dict_and_validate(data, from_repo=True, strict=strict)
@@ -811,7 +807,7 @@ def pkg_manifest_archive_from_dict_and_validate(
def pkg_manifest_from_toml_and_validate_all_errors(
filepath: str,
strict: bool,
) -> Union[PkgManifest, List[str]]:
) -> Union[PkgManifest, list[str]]:
"""
This function is responsible for not letting invalid manifest from creating packages with ID names
or versions that would not properly install.
@@ -862,7 +858,7 @@ def pkg_manifest_from_zipfile_and_validate_impl(
archive_subdir: str,
all_errors: bool,
strict: bool,
) -> Union[PkgManifest, List[str]]:
) -> Union[PkgManifest, list[str]]:
"""
Validate the manifest and return all errors.
"""
@@ -914,7 +910,7 @@ def pkg_manifest_from_zipfile_and_validate_all_errors(
zip_fh: zipfile.ZipFile,
archive_subdir: str,
strict: bool,
) -> Union[PkgManifest, List[str]]:
) -> Union[PkgManifest, list[str]]:
return pkg_manifest_from_zipfile_and_validate_impl(
zip_fh,
archive_subdir,
@@ -1119,8 +1115,8 @@ class PathPatternMatch:
"_regex_list",
)
def __init__(self, path_patterns: List[str]):
self._regex_list: List[Tuple[bool, re.Pattern[str]]] = PathPatternMatch._pattern_match_as_regex(path_patterns)
def __init__(self, path_patterns: list[str]):
self._regex_list: list[tuple[bool, re.Pattern[str]]] = PathPatternMatch._pattern_match_as_regex(path_patterns)
def test_path(self, path: str) -> bool:
assert not path.startswith("/")
@@ -1255,9 +1251,9 @@ class PathPatternMatch:
return pattern
@staticmethod
def _pattern_match_as_regex(path_patterns: Sequence[str]) -> List[Tuple[bool, re.Pattern[str]]]:
def _pattern_match_as_regex(path_patterns: Sequence[str]) -> list[tuple[bool, re.Pattern[str]]]:
# First group negative-positive expressions.
pattern_groups: List[Tuple[bool, List[str]]] = []
pattern_groups: list[tuple[bool, list[str]]] = []
for pattern in path_patterns:
if pattern.startswith("!"):
pattern = pattern.lstrip("!")
@@ -1276,7 +1272,7 @@ class PathPatternMatch:
else:
pattern_groups.append((negate, [pattern_regex]))
result: List[Tuple[bool, re.Pattern[str]]] = []
result: list[tuple[bool, re.Pattern[str]]] = []
for negate, pattern_list in pattern_groups:
result.append((negate, re.compile("(?:{:s})".format("|".join(pattern_list)), re.MULTILINE)))
# print(result)
@@ -1291,10 +1287,10 @@ def url_retrieve_to_data_iter(
url: str,
*,
data: Optional[Any] = None,
headers: Dict[str, str],
headers: dict[str, str],
chunk_size: int,
timeout_in_seconds: float,
) -> Generator[Tuple[bytes, int, Any], None, None]:
) -> Generator[tuple[bytes, int, Any], None, None]:
"""
Retrieve a URL into a temporary location on disk.
@@ -1358,11 +1354,11 @@ def url_retrieve_to_filepath_iter(
url: str,
filepath: str,
*,
headers: Dict[str, str],
headers: dict[str, str],
data: Optional[Any] = None,
chunk_size: int,
timeout_in_seconds: float,
) -> Generator[Tuple[int, int, Any], None, None]:
) -> Generator[tuple[int, int, Any], None, None]:
# Handle temporary file setup.
with open(filepath, 'wb') as fh_output:
for block, size, response_headers in url_retrieve_to_data_iter(
@@ -1382,7 +1378,7 @@ def filepath_retrieve_to_filepath_iter(
*,
chunk_size: int,
timeout_in_seconds: float,
) -> Generator[Tuple[int, int], None, None]:
) -> Generator[tuple[int, int], None, None]:
# TODO: `timeout_in_seconds`.
# Handle temporary file setup.
_ = timeout_in_seconds
@@ -1396,7 +1392,7 @@ def filepath_retrieve_to_filepath_iter(
def url_retrieve_to_data_iter_or_filesystem(
url: str,
headers: Dict[str, str],
headers: dict[str, str],
chunk_size: int,
timeout_in_seconds: float,
) -> Generator[bytes, None, None]:
@@ -1421,10 +1417,10 @@ def url_retrieve_to_data_iter_or_filesystem(
def url_retrieve_to_filepath_iter_or_filesystem(
url: str,
filepath: str,
headers: Dict[str, str],
headers: dict[str, str],
chunk_size: int,
timeout_in_seconds: float,
) -> Generator[Tuple[int, int], None, None]:
) -> Generator[tuple[int, int], None, None]:
"""
Callers should catch: ``(Exception, KeyboardInterrupt)`` and convert them to message using:
``url_retrieve_exception_as_message``.
@@ -1523,7 +1519,7 @@ def pkg_manifest_validate_terse_description_or_error(value: str) -> Optional[str
def pkg_manifest_tags_load_valid_map_from_python(
valid_tags_filepath: str,
) -> Union[str, Dict[str, Set[str]]]:
) -> Union[str, dict[str, set[str]]]:
try:
data = execfile(valid_tags_filepath)
except Exception as ex:
@@ -1546,7 +1542,7 @@ def pkg_manifest_tags_load_valid_map_from_python(
def pkg_manifest_tags_load_valid_map_from_json(
valid_tags_filepath: str,
) -> Union[str, Dict[str, Set[str]]]:
) -> Union[str, dict[str, set[str]]]:
try:
with open(valid_tags_filepath, "rb") as fh:
data = json.load(fh)
@@ -1573,7 +1569,7 @@ def pkg_manifest_tags_load_valid_map_from_json(
def pkg_manifest_tags_load_valid_map(
valid_tags_filepath: str,
) -> Union[str, Dict[str, Set[str]]]:
) -> Union[str, dict[str, set[str]]]:
# Allow Python data (Blender stores this internally).
if valid_tags_filepath.endswith(".py"):
return pkg_manifest_tags_load_valid_map_from_python(valid_tags_filepath)
@@ -1581,9 +1577,9 @@ def pkg_manifest_tags_load_valid_map(
def pkg_manifest_tags_valid_or_error(
valid_tags_data: Dict[str, Any],
valid_tags_data: dict[str, Any],
manifest_type: str,
manifest_tags: List[str],
manifest_tags: list[str],
) -> Optional[str]:
valid_tags = valid_tags_data[manifest_type]
for tag in manifest_tags:
@@ -1641,7 +1637,7 @@ def pkg_manifest_validate_field_any_non_empty_string_stripped_no_control_chars(
return None
def pkg_manifest_validate_field_any_list_of_non_empty_strings(value: List[Any], strict: bool) -> Optional[str]:
def pkg_manifest_validate_field_any_list_of_non_empty_strings(value: list[Any], strict: bool) -> Optional[str]:
_ = strict
for i, tag in enumerate(value):
if not isinstance(tag, str):
@@ -1652,7 +1648,7 @@ def pkg_manifest_validate_field_any_list_of_non_empty_strings(value: List[Any],
def pkg_manifest_validate_field_any_non_empty_list_of_non_empty_strings(
value: List[Any],
value: list[Any],
strict: bool,
) -> Optional[str]:
if not value:
@@ -1746,7 +1742,7 @@ def pkg_manifest_validate_field_tagline(value: str, strict: bool) -> Optional[st
def pkg_manifest_validate_field_copyright(
value: List[str],
value: list[str],
strict: bool,
) -> Optional[str]:
if strict:
@@ -1774,10 +1770,10 @@ def pkg_manifest_validate_field_copyright(
def pkg_manifest_validate_field_permissions(
value: Union[
# `Dict[str, str]` is expected but at this point it's only guaranteed to be a dict.
Dict[Any, Any],
# `dict[str, str]` is expected but at this point it's only guaranteed to be a dict.
dict[Any, Any],
# Kept for old files.
List[Any],
list[Any],
],
strict: bool,
) -> Optional[str]:
@@ -1828,9 +1824,9 @@ def pkg_manifest_validate_field_permissions(
return None
def pkg_manifest_validate_field_build_path_list(value: List[Any], strict: bool) -> Optional[str]:
def pkg_manifest_validate_field_build_path_list(value: list[Any], strict: bool) -> Optional[str]:
_ = strict
value_duplicate_check: Set[str] = set()
value_duplicate_check: set[str] = set()
for item in value:
if not isinstance(item, str):
@@ -1870,7 +1866,7 @@ def pkg_manifest_validate_field_build_path_list(value: List[Any], strict: bool)
def pkg_manifest_validate_field_wheels(
value: List[Any],
value: list[Any],
strict: bool,
) -> Optional[str]:
if (error := pkg_manifest_validate_field_any_list_of_non_empty_strings(value, strict)) is not None:
@@ -1935,8 +1931,8 @@ def pkg_manifest_validate_field_archive_hash(
# Keep in sync with `PkgManifest`.
# key, type, check_fn.
pkg_manifest_known_keys_and_types: Tuple[
Tuple[str, Union[type, Tuple[type, ...]], Callable[[Any, bool], Optional[str]]],
pkg_manifest_known_keys_and_types: tuple[
tuple[str, Union[type, tuple[type, ...]], Callable[[Any, bool], Optional[str]]],
...,
] = (
("id", str, pkg_manifest_validate_field_idname),
@@ -1961,8 +1957,8 @@ pkg_manifest_known_keys_and_types: Tuple[
)
# Keep in sync with `PkgManifest_Archive`.
pkg_manifest_known_keys_and_types_from_repo: Tuple[
Tuple[str, type, Callable[[Any, bool], Optional[str]]],
pkg_manifest_known_keys_and_types_from_repo: tuple[
tuple[str, type, Callable[[Any, bool], Optional[str]]],
...,
] = (
("archive_size", int, pkg_manifest_validate_field_archive_size),
@@ -1975,12 +1971,12 @@ pkg_manifest_known_keys_and_types_from_repo: Tuple[
# Manifest Validation
def pkg_manifest_is_valid_or_error_impl(
data: Dict[str, Any],
data: dict[str, Any],
*,
from_repo: bool,
all_errors: bool,
strict: bool,
) -> Optional[List[str]]:
) -> Optional[list[str]]:
if not isinstance(data, dict):
return ["Expected value to be a dict, not a {!r}".format(type(data))]
@@ -1990,7 +1986,7 @@ def pkg_manifest_is_valid_or_error_impl(
error_list = []
value_extract: Dict[str, Optional[object]] = {}
value_extract: dict[str, Optional[object]] = {}
for known_types in (
(pkg_manifest_known_keys_and_types, pkg_manifest_known_keys_and_types_from_repo) if from_repo else
(pkg_manifest_known_keys_and_types, )
@@ -2047,7 +2043,7 @@ def pkg_manifest_is_valid_or_error_impl(
def pkg_manifest_is_valid_or_error(
data: Dict[str, Any],
data: dict[str, Any],
*,
from_repo: bool,
strict: bool,
@@ -2064,11 +2060,11 @@ def pkg_manifest_is_valid_or_error(
def pkg_manifest_is_valid_or_error_all(
data: Dict[str, Any],
data: dict[str, Any],
*,
from_repo: bool,
strict: bool,
) -> Optional[List[str]]:
) -> Optional[list[str]]:
return pkg_manifest_is_valid_or_error_impl(
data,
from_repo=from_repo,
@@ -2080,7 +2076,7 @@ def pkg_manifest_is_valid_or_error_all(
# -----------------------------------------------------------------------------
# Manifest Utilities
def pkg_manifest_dict_apply_build_generated_table(manifest_dict: Dict[str, Any]) -> None:
def pkg_manifest_dict_apply_build_generated_table(manifest_dict: dict[str, Any]) -> None:
# Swap in values from `[build.generated]` if it exists:
if (build_generated := manifest_dict.get("build", {}).get("generated")) is None:
return
@@ -2192,14 +2188,14 @@ def blender_platform_compatible_with_wheel_platform_from_filepath(platform: str,
def paths_filter_wheels_by_platform(
wheels: List[str],
wheels: list[str],
platform: str,
) -> List[str]:
) -> list[str]:
"""
All paths are wheels with filenames that follow the wheel spec.
Return wheels which are compatible with the ``platform``.
"""
wheels_result: List[str] = []
wheels_result: list[str] = []
for wheel_filepath in wheels:
if blender_platform_compatible_with_wheel_platform_from_filepath(platform, wheel_filepath):
@@ -2209,14 +2205,14 @@ def paths_filter_wheels_by_platform(
def build_paths_filter_wheels_by_platform(
build_paths: List[Tuple[str, str]],
build_paths: list[tuple[str, str]],
platform: str,
) -> List[Tuple[str, str]]:
) -> list[tuple[str, str]]:
"""
All paths are wheels with filenames that follow the wheel spec.
Return wheels which are compatible with the ``platform``.
"""
build_paths_for_platform: List[Tuple[str, str]] = []
build_paths_for_platform: list[tuple[str, str]] = []
for item in build_paths:
if blender_platform_compatible_with_wheel_platform_from_filepath(platform, item[1]):
@@ -2226,10 +2222,10 @@ def build_paths_filter_wheels_by_platform(
def build_paths_filter_by_platform(
build_paths: List[Tuple[str, str]],
wheel_range: Tuple[int, int],
platforms: Tuple[str, ...],
) -> Generator[Tuple[List[Tuple[str, str]], str], None, None]:
build_paths: list[tuple[str, str]],
wheel_range: tuple[int, int],
platforms: tuple[str, ...],
) -> Generator[tuple[list[tuple[str, str]], str], None, None]:
if not platforms:
yield (build_paths, "")
return
@@ -2258,9 +2254,9 @@ def build_paths_filter_by_platform(
def repository_filter_skip(
item: Dict[str, Any],
item: dict[str, Any],
*,
filter_blender_version: Tuple[int, int, int],
filter_blender_version: tuple[int, int, int],
filter_platform: str,
# When `skip_message_fn` is set, returning true must call the `skip_message_fn` function.
skip_message_fn: Optional[Callable[[str], None]],
@@ -2326,9 +2322,9 @@ def repository_filter_skip(
return False
def blender_version_parse_or_error(version: str) -> Union[Tuple[int, int, int], str]:
def blender_version_parse_or_error(version: str) -> Union[tuple[int, int, int], str]:
try:
version_tuple: Tuple[int, ...] = tuple(int(x) for x in version.split("."))
version_tuple: tuple[int, ...] = tuple(int(x) for x in version.split("."))
except Exception as ex:
return "unable to parse blender version: {:s}, {:s}".format(version, str(ex))
@@ -2342,7 +2338,7 @@ def blender_version_parse_or_error(version: str) -> Union[Tuple[int, int, int],
)
def blender_version_parse_any_or_error(version: Any) -> Union[Tuple[int, int, int], str]:
def blender_version_parse_any_or_error(version: Any) -> Union[tuple[int, int, int], str]:
if not isinstance(version, str):
return "blender version should be a string, found a: {:s}".format(str(type(version)))
@@ -2351,7 +2347,7 @@ def blender_version_parse_any_or_error(version: Any) -> Union[Tuple[int, int, in
return result
def url_request_headers_create(*, accept_json: bool, user_agent: str, access_token: str) -> Dict[str, str]:
def url_request_headers_create(*, accept_json: bool, user_agent: str, access_token: str) -> dict[str, str]:
headers = {}
if accept_json:
# Default for JSON requests this allows top-level URL's to be used.
@@ -2417,7 +2413,7 @@ def repo_json_is_valid_or_error(filepath: str) -> Optional[str]:
return None
def pkg_manifest_toml_is_valid_or_error(filepath: str, strict: bool) -> Tuple[Optional[str], Dict[str, Any]]:
def pkg_manifest_toml_is_valid_or_error(filepath: str, strict: bool) -> tuple[Optional[str], dict[str, Any]]:
if not os.path.exists(filepath):
return "File missing: " + filepath, {}
@@ -2433,7 +2429,7 @@ def pkg_manifest_toml_is_valid_or_error(filepath: str, strict: bool) -> Tuple[Op
return None, result
def pkg_manifest_detect_duplicates(pkg_items: List[PkgManifest]) -> Optional[str]:
def pkg_manifest_detect_duplicates(pkg_items: list[PkgManifest]) -> Optional[str]:
"""
When a repository includes multiple packages with the same ID, ensure they don't conflict.
@@ -2448,7 +2444,7 @@ def pkg_manifest_detect_duplicates(pkg_items: List[PkgManifest]) -> Optional[str
dummy_verion_min = 0, 0, 0
dummy_verion_max = 1000, 0, 0
def parse_version_or_default(version: Optional[str], default: Tuple[int, int, int]) -> Tuple[int, int, int]:
def parse_version_or_default(version: Optional[str], default: tuple[int, int, int]) -> tuple[int, int, int]:
if version is None:
return default
if isinstance(version_parsed := blender_version_parse_or_error(version), str):
@@ -2457,7 +2453,7 @@ def pkg_manifest_detect_duplicates(pkg_items: List[PkgManifest]) -> Optional[str
return default
return version_parsed
def version_range_as_str(version_min: Tuple[int, int, int], version_max: Tuple[int, int, int]) -> str:
def version_range_as_str(version_min: tuple[int, int, int], version_max: tuple[int, int, int]) -> str:
dummy_min = version_min == dummy_verion_min
dummy_max = version_max == dummy_verion_max
if dummy_min and dummy_max:
@@ -2473,7 +2469,7 @@ def pkg_manifest_detect_duplicates(pkg_items: List[PkgManifest]) -> Optional[str
for platform in (manifest.platforms or ())
)))
manifest_per_platform: Dict[str, List[PkgManifest]] = {platform: [] for platform in platforms_all}
manifest_per_platform: dict[str, list[PkgManifest]] = {platform: [] for platform in platforms_all}
if platforms_all:
for manifest in pkg_items:
# No platforms means all platforms.
@@ -2490,7 +2486,7 @@ def pkg_manifest_detect_duplicates(pkg_items: List[PkgManifest]) -> Optional[str
if len(pkg_items_platform) == 1:
continue
version_ranges: List[Tuple[Tuple[int, int, int], Tuple[int, int, int]]] = []
version_ranges: list[tuple[tuple[int, int, int], tuple[int, int, int]]] = []
for manifest in pkg_items_platform:
version_ranges.append((
parse_version_or_default(manifest.blender_version_min, dummy_verion_min),
@@ -2534,7 +2530,7 @@ def pkg_manifest_detect_duplicates(pkg_items: List[PkgManifest]) -> Optional[str
return None
def toml_from_bytes_or_error(data: bytes) -> Union[Dict[str, Any], str]:
def toml_from_bytes_or_error(data: bytes) -> Union[dict[str, Any], str]:
try:
result = tomllib.loads(data.decode('utf-8'))
assert isinstance(result, dict)
@@ -2543,7 +2539,7 @@ def toml_from_bytes_or_error(data: bytes) -> Union[Dict[str, Any], str]:
return str(ex)
def toml_from_filepath_or_error(filepath: str) -> Union[Dict[str, Any], str]:
def toml_from_filepath_or_error(filepath: str) -> Union[dict[str, Any], str]:
try:
with open(filepath, "rb") as fh:
data = fh.read()
@@ -2710,7 +2706,7 @@ def repo_sync_from_remote(
return True
def repo_pkginfo_from_local_as_dict_or_error(*, local_dir: str) -> Union[Dict[str, Any], str]:
def repo_pkginfo_from_local_as_dict_or_error(*, local_dir: str) -> Union[dict[str, Any], str]:
"""
Load package cache.
"""
@@ -2730,7 +2726,7 @@ def repo_pkginfo_from_local_as_dict_or_error(*, local_dir: str) -> Union[Dict[st
return result
def pkg_repo_data_from_json_or_error(json_data: Dict[str, Any]) -> Union[PkgRepoData, str]:
def pkg_repo_data_from_json_or_error(json_data: dict[str, Any]) -> Union[PkgRepoData, str]:
if not isinstance((version := json_data.get("version", "v1")), str):
return "expected \"version\" to be a string"
@@ -2794,7 +2790,7 @@ def arg_handle_str_as_package_names(value: str) -> Sequence[str]:
return result
def arg_handle_str_as_temp_prefix_and_suffix(value: str) -> Tuple[str, str]:
def arg_handle_str_as_temp_prefix_and_suffix(value: str) -> tuple[str, str]:
if (value.count("/") != 1) and (len(value) > 1):
raise argparse.ArgumentTypeError("Must contain a \"/\" character with a prefix and/or suffix")
a, b = value.split("/", 1)
@@ -3225,7 +3221,7 @@ class subcmd_server:
msglog: MessageLogger,
*,
repo_dir: str,
repo_data: List[Dict[str, Any]],
repo_data: list[dict[str, Any]],
html_template_filepath: str,
) -> bool:
import html
@@ -3240,7 +3236,7 @@ class subcmd_server:
fh = io.StringIO()
# Group extensions by their type.
repo_data_by_type: Dict[str, List[Dict[str, Any]]] = {}
repo_data_by_type: dict[str, list[dict[str, Any]]] = {}
for manifest_dict in repo_data:
manifest_type = manifest_dict["type"]
@@ -3403,8 +3399,8 @@ class subcmd_server:
return False
assert repo_config is None or isinstance(repo_config, PkgServerRepoConfig)
repo_data_idname_map: Dict[str, List[PkgManifest]] = {}
repo_data: List[Dict[str, Any]] = []
repo_data_idname_map: dict[str, list[PkgManifest]] = {}
repo_data: list[dict[str, Any]] = []
# Write package meta-data into each directory.
repo_gen_dict = {
@@ -3560,7 +3556,7 @@ class subcmd_client:
return False
del result_dict
items: List[Dict[str, Any]] = repo_gen_dict.data
items: list[dict[str, Any]] = repo_gen_dict.data
items.sort(key=lambda elem: elem.get("id", ""))
request_exit = False
@@ -3609,9 +3605,9 @@ class subcmd_client:
*,
local_dir: str,
filepath_archive: str,
blender_version_tuple: Tuple[int, int, int],
blender_version_tuple: tuple[int, int, int],
manifest_compare: Optional[PkgManifest],
temp_prefix_and_suffix: Tuple[str, str],
temp_prefix_and_suffix: tuple[str, str],
) -> bool:
# NOTE: Don't use `FATAL_ERROR` because other packages will attempt to install.
@@ -3619,7 +3615,7 @@ class subcmd_client:
# Used for installing from local cache as well as installing a local package from a file.
# Remove `filepath_local_pkg_temp` if this block exits.
directories_to_clean: List[str] = []
directories_to_clean: list[str] = []
with CleanupPathsContext(files=(), directories=directories_to_clean):
try:
# pylint: disable-next=consider-using-with
@@ -3732,7 +3728,7 @@ class subcmd_client:
local_dir: str,
package_files: Sequence[str],
blender_version: str,
temp_prefix_and_suffix: Tuple[str, str],
temp_prefix_and_suffix: tuple[str, str],
) -> bool:
if not os.path.exists(local_dir):
msglog.fatal_error("destination directory \"{:s}\" does not exist".format(local_dir))
@@ -3744,7 +3740,7 @@ class subcmd_client:
assert isinstance(blender_version_tuple, tuple)
# This is a simple file extraction, the main difference is that it validates the manifest before installing.
directories_to_clean: List[str] = []
directories_to_clean: list[str] = []
with CleanupPathsContext(files=(), directories=directories_to_clean):
for filepath_archive in package_files:
if not subcmd_client._install_package_from_file_impl(
@@ -3773,7 +3769,7 @@ class subcmd_client:
blender_version: str,
access_token: str,
timeout_in_seconds: float,
temp_prefix_and_suffix: Tuple[str, str],
temp_prefix_and_suffix: tuple[str, str],
) -> bool:
# Validate arguments.
@@ -3817,7 +3813,7 @@ class subcmd_client:
]
# Narrow down:
json_data_pkg_info_map: Dict[str, List[Dict[str, Any]]] = {pkg_idname: [] for pkg_idname in packages}
json_data_pkg_info_map: dict[str, list[dict[str, Any]]] = {pkg_idname: [] for pkg_idname in packages}
for pkg_info in json_data_pkg_info:
json_data_pkg_info_map[pkg_info["id"]].append(pkg_info)
@@ -3832,7 +3828,7 @@ class subcmd_client:
platform_this = platform_from_this_system()
has_fatal_error = False
packages_info: List[PkgManifest_Archive] = []
packages_info: list[PkgManifest_Archive] = []
for pkg_idname, pkg_info_list in json_data_pkg_info_map.items():
if not pkg_info_list:
msglog.fatal_error("Package \"{:s}\", not found".format(pkg_idname))
@@ -3886,7 +3882,7 @@ class subcmd_client:
request_exit = False
# Ensure all cache is cleared (when `local_cache` is disabled) no matter the cause of exiting.
files_to_clean: List[str] = []
files_to_clean: list[str] = []
with CleanupPathsContext(files=files_to_clean, directories=()):
for manifest_archive in packages_info:
pkg_idname = manifest_archive.manifest.id
@@ -4013,7 +4009,7 @@ class subcmd_client:
local_dir: str,
user_dir: str,
packages: Sequence[str],
temp_prefix_and_suffix: Tuple[str, str],
temp_prefix_and_suffix: tuple[str, str],
) -> bool:
if not os.path.isdir(local_dir):
msglog.fatal_error("Missing local \"{:s}\"".format(local_dir))
@@ -4059,7 +4055,7 @@ class subcmd_client:
if has_fatal_error:
return False
files_to_clean: List[str] = []
files_to_clean: list[str] = []
with CleanupPathsContext(files=files_to_clean, directories=()):
for pkg_idname in packages_valid:
filepath_local_pkg = os.path.join(local_dir, pkg_idname)
@@ -4205,7 +4201,7 @@ class subcmd_author:
if manifest_build.paths_exclude_pattern is not None:
build_paths_exclude_pattern = PathPatternMatch(manifest_build.paths_exclude_pattern)
build_paths: List[Tuple[str, str]] = []
build_paths: list[tuple[str, str]] = []
# Manifest & wheels.
if build_paths_extra:
@@ -4223,7 +4219,7 @@ class subcmd_author:
return filepath_rel
# Use lowercase to prevent duplicates on MS-Windows.
build_paths_extra_canonical: Set[str] = set(
build_paths_extra_canonical: set[str] = set(
filepath_canonical_from_relative(f).lower()
for f in build_paths_extra
)
@@ -5130,7 +5126,7 @@ def msglog_from_args(args: argparse.Namespace) -> MessageLogger:
# Main Function
def main(
argv: Optional[List[str]] = None,
argv: Optional[list[str]] = None,
args_internal: bool = True,
args_extra_subcommands_fn: Optional[ArgsSubparseFn] = None,
prog: Optional[str] = None,

View File

@@ -17,14 +17,11 @@ import tempfile
from typing import (
Callable,
Dict,
List,
Tuple,
)
def _contents_to_filesystem(
contents: Dict[str, bytes],
contents: dict[str, bytes],
directory: str,
) -> None:
swap_slash = os.sep == "\\"
@@ -41,7 +38,7 @@ def _contents_to_filesystem(
fh.write(value)
def search_impl(directory: str, fn: Callable[[os.DirEntry[str]], bool], result: List[str]) -> None:
def search_impl(directory: str, fn: Callable[[os.DirEntry[str]], bool], result: list[str]) -> None:
for entry in os.scandir(directory):
if entry.is_dir():
search_impl(entry.path, fn, result)
@@ -49,8 +46,8 @@ def search_impl(directory: str, fn: Callable[[os.DirEntry[str]], bool], result:
result.append(entry.path)
def search(directory: str, fn: Callable[[os.DirEntry[str]], bool]) -> List[str]:
result: List[str] = []
def search(directory: str, fn: Callable[[os.DirEntry[str]], bool]) -> list[str]:
result: list[str] = []
search_impl(directory, fn, result)
return result
@@ -59,8 +56,8 @@ def generate_from_file_data(
*,
module_name: str,
version: str,
package_contents: Dict[str, bytes],
) -> Tuple[str, bytes]:
package_contents: dict[str, bytes],
) -> tuple[str, bytes]:
"""
:arg package_contents:
The package contents.
@@ -70,7 +67,7 @@ def generate_from_file_data(
Return filename & data.
"""
setup_contents: Dict[str, bytes] = {
setup_contents: dict[str, bytes] = {
"setup.py": """
from setuptools import setup
@@ -132,7 +129,7 @@ def generate_from_source(
module_name: str,
version: str,
source: str,
) -> Tuple[str, bytes]:
) -> tuple[str, bytes]:
"""
Return filename & data.
"""

View File

@@ -27,7 +27,6 @@ import tempfile
from typing import (
Any,
Sequence,
Tuple,
)
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
@@ -70,7 +69,7 @@ if TEMP_DIR_REMOTE and not os.path.isdir(TEMP_DIR_REMOTE):
# -----------------------------------------------------------------------------
# Generic Functions
def command_output_from_json_0(args: Sequence[str]) -> Sequence[Tuple[str, Any]]:
def command_output_from_json_0(args: Sequence[str]) -> Sequence[tuple[str, Any]]:
result = []
for json_bytes in subprocess.check_output(
[*CMD, *args, "--output-type=JSON_0"],

View File

@@ -21,19 +21,15 @@ import zipfile
from typing import (
Any,
Sequence,
Dict,
List,
NamedTuple,
Optional,
Set,
Tuple,
Union,
)
# A tree of files.
FileTree = Dict[str, Union["FileTree", bytes]]
FileTree = dict[str, Union["FileTree", bytes]]
JSON_OutputElem = Tuple[str, Any]
JSON_OutputElem = tuple[str, Any]
# For more useful output that isn't clipped.
# pylint: disable-next=protected-access
@@ -124,7 +120,7 @@ def rmdir_contents(directory: str) -> None:
os.unlink(filepath)
def manifest_dict_from_archive(filepath: str) -> Dict[str, Any]:
def manifest_dict_from_archive(filepath: str) -> dict[str, Any]:
with zipfile.ZipFile(filepath, mode="r") as zip_fh:
manifest_data = zip_fh.read(PKG_MANIFEST_FILENAME_TOML)
manifest_dict = tomllib.loads(manifest_data.decode("utf-8"))
@@ -154,9 +150,9 @@ def my_create_package(
dirpath: str,
filename: str,
*,
metadata: Dict[str, Any],
metadata: dict[str, Any],
files: FileTree,
build_args_extra: Tuple[str, ...],
build_args_extra: tuple[str, ...],
) -> Sequence[JSON_OutputElem]:
"""
Create a package using the command line interface.
@@ -255,14 +251,14 @@ def my_generate_repo(
def command_output_filter_include(
output_json: Sequence[JSON_OutputElem],
include_types: Set[str],
include_types: set[str],
) -> Sequence[JSON_OutputElem]:
return [(a, b) for a, b in output_json if a in include_types]
def command_output_filter_exclude(
output_json: Sequence[JSON_OutputElem],
exclude_types: Set[str],
exclude_types: set[str],
) -> Sequence[JSON_OutputElem]:
return [(a, b) for a, b in output_json if a not in exclude_types]
@@ -287,7 +283,7 @@ def command_output(
def command_output_from_json_0(
args: Sequence[str],
*,
exclude_types: Optional[Set[str]] = None,
exclude_types: Optional[set[str]] = None,
expected_returncode: int = 0,
) -> Sequence[JSON_OutputElem]:
result = []
@@ -391,7 +387,7 @@ class TestCLI_Build(unittest.TestCase):
include_types={'STATUS'},
)
packages: List[Tuple[str, List[JSON_OutputElem]]] = [("", [])]
packages: list[tuple[str, list[JSON_OutputElem]]] = [("", [])]
for _, message in output_json:
if message.startswith("building: "):
assert not packages[-1][0]

View File

@@ -31,11 +31,9 @@ import unittest
from typing import (
Any,
Dict,
NamedTuple,
Optional,
Sequence,
Tuple,
)
@@ -55,7 +53,7 @@ if BLENDER_BIN is None:
raise Exception("BLENDER_BIN: environment variable not defined")
BLENDER_VERSION_STR = subprocess.check_output([BLENDER_BIN, "--version"]).split()[1].decode('ascii')
BLENDER_VERSION: Tuple[int, int, int] = tuple(int(x) for x in BLENDER_VERSION_STR.split(".")) # type: ignore
BLENDER_VERSION: tuple[int, int, int] = tuple(int(x) for x in BLENDER_VERSION_STR.split(".")) # type: ignore
assert len(BLENDER_VERSION) == 3
@@ -71,7 +69,7 @@ import python_wheel_generate # noqa: E402
# Don't import as module, instead load the class.
def execfile(filepath: str, *, name: str = "__main__") -> Dict[str, Any]:
def execfile(filepath: str, *, name: str = "__main__") -> dict[str, Any]:
global_namespace = {"__file__": filepath, "__name__": name}
with open(filepath, encoding="utf-8") as fh:
# pylint: disable-next=exec-used
@@ -105,7 +103,7 @@ USE_PAUSE_BEFORE_EXIT = False
# Generate different version numbers as strings, used for automatically creating versions
# which are known to be compatible or incompatible with the current version.
def blender_version_relative(version_offset: Tuple[int, int, int]) -> str:
def blender_version_relative(version_offset: tuple[int, int, int]) -> str:
version_new = (
BLENDER_VERSION[0] + version_offset[0],
BLENDER_VERSION[1] + version_offset[1],
@@ -149,7 +147,7 @@ def pause_until_keyboard_interrupt() -> None:
def contents_to_filesystem(
contents: Dict[str, bytes],
contents: dict[str, bytes],
directory: str,
) -> None:
swap_slash = os.sep == "\\"
@@ -173,11 +171,11 @@ def create_package(
# Optional.
wheel_params: Optional[WheelModuleParams] = None,
platforms: Optional[Tuple[str, ...]] = None,
platforms: Optional[tuple[str, ...]] = None,
blender_version_min: Optional[str] = None,
blender_version_max: Optional[str] = None,
python_script: Optional[str] = None,
file_contents: Optional[Dict[str, bytes]] = None,
file_contents: Optional[dict[str, bytes]] = None,
) -> None:
pkg_name = pkg_idname.replace("_", " ").title()
@@ -236,7 +234,7 @@ def create_package(
def run_blender(
args: Sequence[str],
force_script_and_pause: bool = False,
) -> Tuple[int, str, str]:
) -> tuple[int, str, str]:
"""
:arg force_script_and_pause:
When true, write out a shell script and wait,
@@ -244,7 +242,7 @@ def run_blender(
are removed once the test finished.
"""
assert BLENDER_BIN is not None
cmd: Tuple[str, ...] = (
cmd: tuple[str, ...] = (
BLENDER_BIN,
# Needed while extensions is experimental.
*BLENDER_ENABLE_EXTENSION_ARGS,
@@ -340,7 +338,7 @@ def run_blender_no_errors(
def run_blender_extensions(
args: Sequence[str],
force_script_and_pause: bool = False,
) -> Tuple[int, str, str]:
) -> tuple[int, str, str]:
return run_blender(("--command", "extension", *args,), force_script_and_pause=force_script_and_pause)
@@ -360,7 +358,7 @@ TEMP_DIR_LOCAL = ""
# Instead, have a test-local temporary directly which is removed when the test finishes.
TEMP_DIR_TMPDIR = ""
user_dirs: Tuple[str, ...] = (
user_dirs: tuple[str, ...] = (
"config",
"datafiles",
"extensions",
@@ -413,11 +411,11 @@ class TestWithTempBlenderUser_MixIn(unittest.TestCase):
# Optional.
pkg_filename: Optional[str] = None,
platforms: Optional[Tuple[str, ...]] = None,
platforms: Optional[tuple[str, ...]] = None,
blender_version_min: Optional[str] = None,
blender_version_max: Optional[str] = None,
python_script: Optional[str] = None,
file_contents: Optional[Dict[str, bytes]] = None,
file_contents: Optional[dict[str, bytes]] = None,
) -> None:
if pkg_filename is None:
pkg_filename = pkg_idname

View File

@@ -10,10 +10,7 @@ import os
from typing import (
Any,
Dict,
List,
Sequence,
Tuple,
Union,
)
@@ -23,7 +20,7 @@ BASE_DIR = os.path.normpath(os.path.join(CURRENT_DIR, ".."))
# Don't import as module, instead load the class.
def execfile(filepath: str, *, name: str = "__main__") -> Dict[str, Any]:
def execfile(filepath: str, *, name: str = "__main__") -> dict[str, Any]:
global_namespace = {"__file__": filepath, "__name__": name}
with open(filepath, encoding="utf-8") as fh:
# pylint: disable-next=exec-used
@@ -39,9 +36,9 @@ class TestPathMatch_MixIn:
def match_paths(
self,
expected_paths: List[Tuple[bool, str]],
expected_paths: list[tuple[bool, str]],
path_pattern: Union[Sequence[str], PathPatternMatch], # type: ignore
) -> List[Tuple[bool, str]]:
) -> list[tuple[bool, str]]:
result = []
if not isinstance(path_pattern, PathPatternMatch):
path_pattern = PathPatternMatch(path_pattern)
@@ -55,11 +52,11 @@ class TestPathMatch_MixIn:
def match_paths_for_cmp(
self,
expected_paths: List[Tuple[bool, str]],
expected_paths: list[tuple[bool, str]],
path_pattern: Union[Sequence[str], PathPatternMatch], # type: ignore
) -> Tuple[
List[Tuple[bool, str]],
List[Tuple[bool, str]],
) -> tuple[
list[tuple[bool, str]],
list[tuple[bool, str]],
]:
return self.match_paths(expected_paths, path_pattern), expected_paths

View File

@@ -21,38 +21,34 @@ import zipfile
from typing import (
Callable,
Dict,
List,
Optional,
Set,
Tuple,
)
WheelSource = Tuple[
WheelSource = tuple[
# Key - doesn't matter what this is... it's just a handle.
str,
# A list of absolute wheel file-paths.
List[str],
list[str],
]
def _read_records_csv(filepath: str) -> List[List[str]]:
def _read_records_csv(filepath: str) -> list[list[str]]:
import csv
with open(filepath, encoding="utf8", errors="surrogateescape") as fh:
return list(csv.reader(fh.read().splitlines()))
def _wheels_from_dir(dirpath: str) -> Tuple[
def _wheels_from_dir(dirpath: str) -> tuple[
# The key is:
# wheel_id
# The values are:
# Top level directories.
Dict[str, List[str]],
dict[str, list[str]],
# Unknown paths.
List[str],
list[str],
]:
result: Dict[str, List[str]] = {}
paths_unused: Set[str] = set()
result: dict[str, list[str]] = {}
paths_unused: set[str] = set()
if not os.path.exists(dirpath):
return result, list(paths_unused)
@@ -73,7 +69,7 @@ def _wheels_from_dir(dirpath: str) -> Tuple[
record_rows = _read_records_csv(filepath_record)
# Build top-level paths.
toplevel_paths_set: Set[str] = set()
toplevel_paths_set: set[str] = set()
for row in record_rows:
if not row:
continue
@@ -110,14 +106,14 @@ def _wheels_from_dir(dirpath: str) -> Tuple[
return result, paths_unused_list
def _wheel_info_dir_from_zip(filepath_wheel: str) -> Optional[Tuple[str, List[str]]]:
def _wheel_info_dir_from_zip(filepath_wheel: str) -> Optional[tuple[str, list[str]]]:
"""
Return:
- The "*-info" directory name which contains meta-data.
- The top-level path list (excluding "..").
"""
dir_info = ""
toplevel_paths: Set[str] = set()
toplevel_paths: set[str] = set()
with zipfile.ZipFile(filepath_wheel, mode="r") as zip_fh:
# This file will always exist.
@@ -228,7 +224,7 @@ WHEEL_VERSION_RE = re.compile(r"(\d+)?(?:\.(\d+))?(?:\.(\d+))")
def wheel_version_from_filename_for_cmp(
filename: str,
) -> Tuple[int, int, int, str]:
) -> tuple[int, int, int, str]:
"""
Extract the version number for comparison.
Note that this only handled the first 3 numbers,
@@ -256,13 +252,13 @@ def wheel_version_from_filename_for_cmp(
def wheel_list_deduplicate_as_skip_set(
wheel_list: List[WheelSource],
) -> Set[str]:
wheel_list: list[WheelSource],
) -> set[str]:
"""
Return all wheel paths to skip.
"""
wheels_to_skip: Set[str] = set()
all_wheels: Set[str] = {
wheels_to_skip: set[str] = set()
all_wheels: set[str] = {
filepath
for _, wheels in wheel_list
for filepath in wheels
@@ -273,7 +269,7 @@ def wheel_list_deduplicate_as_skip_set(
# Keep a map from the base name to the "best" wheel,
# the other wheels get added to `wheels_to_skip` to be ignored.
all_wheels_by_base: Dict[str, str] = {}
all_wheels_by_base: dict[str, str] = {}
for wheel in all_wheels:
wheel_filename = os.path.basename(wheel)
@@ -315,7 +311,7 @@ def apply_action(
*,
local_dir: str,
local_dir_site_packages: str,
wheel_list: List[WheelSource],
wheel_list: list[WheelSource],
remove_error_fn: Callable[[str, Exception], None],
debug: bool,
) -> None:
@@ -337,10 +333,10 @@ def apply_action(
wheels_installed, _paths_unknown = _wheels_from_dir(local_dir_site_packages)
# Wheels and their top level directories (which would be installed).
wheels_packages: Dict[str, List[str]] = {}
wheels_packages: dict[str, list[str]] = {}
# Map the wheel ID to path.
wheels_dir_info_to_filepath_map: Dict[str, str] = {}
wheels_dir_info_to_filepath_map: dict[str, str] = {}
# NOTE(@ideasman42): the wheels skip-set only de-duplicates at the level of the base-name of the wheels filename.
# So the wheel file-paths: