Cleanup: replace typing.Union/Option with logical OR for extensions
Also remove use of deprecated typing.Sequence.
This commit is contained in:
@@ -18,8 +18,6 @@ import sys
|
||||
|
||||
from typing import (
|
||||
Any,
|
||||
Optional,
|
||||
Union,
|
||||
)
|
||||
|
||||
from .bl_extension_utils import PkgManifest_Normalized
|
||||
@@ -106,7 +104,7 @@ class subcmd_utils:
|
||||
packages: list[str],
|
||||
*,
|
||||
use_local: bool,
|
||||
) -> Union[list[tuple[int, str]], str]:
|
||||
) -> list[tuple[int, str]] | str:
|
||||
# Takes a terse lists of package names and expands to repo index and name list,
|
||||
# returning an error string if any can't be resolved.
|
||||
from . import repo_cache_store_ensure
|
||||
@@ -170,11 +168,11 @@ class subcmd_utils:
|
||||
return repos_and_packages
|
||||
|
||||
@staticmethod
|
||||
def expand_package_ids_from_remote(packages: list[str]) -> Union[list[tuple[int, str]], str]:
|
||||
def expand_package_ids_from_remote(packages: list[str]) -> list[tuple[int, str]] | str:
|
||||
return subcmd_utils._expand_package_ids(packages, use_local=False)
|
||||
|
||||
@staticmethod
|
||||
def expand_package_ids_from_local(packages: list[str]) -> Union[list[tuple[int, str]], str]:
|
||||
def expand_package_ids_from_local(packages: list[str]) -> list[tuple[int, str]] | str:
|
||||
return subcmd_utils._expand_package_ids(packages, use_local=True)
|
||||
|
||||
|
||||
@@ -194,8 +192,8 @@ class subcmd_query:
|
||||
|
||||
def list_item(
|
||||
pkg_id: str,
|
||||
item_local: Optional[PkgManifest_Normalized],
|
||||
item_remote: Optional[PkgManifest_Normalized],
|
||||
item_local: PkgManifest_Normalized | None,
|
||||
item_remote: PkgManifest_Normalized | None,
|
||||
has_remote: bool,
|
||||
item_warnings: list[str],
|
||||
) -> None:
|
||||
|
||||
@@ -546,7 +546,7 @@ def pkg_manifest_params_compatible_or_error_for_this_system(
|
||||
blender_version_min, # `str`
|
||||
blender_version_max, # `str`
|
||||
platforms, # `list[str]`
|
||||
): # `Optional[str]`
|
||||
): # `str | None`
|
||||
# Return true if the parameters are compatible with this system.
|
||||
from .bl_extension_utils import (
|
||||
pkg_manifest_params_compatible_or_error,
|
||||
@@ -3106,7 +3106,7 @@ class EXTENSIONS_OT_package_install(Operator, _ExtCmdMixIn):
|
||||
*,
|
||||
context, # `bpy.types.Context`
|
||||
op_notify, # `OperatorNonBlockingSyncHelper`
|
||||
remote_url, # `Optional[str]`
|
||||
remote_url, # `str | None`
|
||||
repo_from_url_name, # `str`
|
||||
url, # `str`
|
||||
):
|
||||
@@ -3132,7 +3132,7 @@ class EXTENSIONS_OT_package_install(Operator, _ExtCmdMixIn):
|
||||
self,
|
||||
*,
|
||||
context, # `bpy.types.Context`
|
||||
remote_url, # `Optional[str]`
|
||||
remote_url, # `str | None`
|
||||
repo_from_url_name, # `str`
|
||||
url, # `str`
|
||||
):
|
||||
|
||||
@@ -1247,12 +1247,12 @@ def extension_draw_item(
|
||||
layout,
|
||||
*,
|
||||
pkg_id, # `str`
|
||||
item_local, # `Optional[PkgManifest_Normalized]`
|
||||
item_remote, # `Optional[PkgManifest_Normalized]`
|
||||
item_local, # `PkgManifest_Normalized | None`
|
||||
item_remote, # `PkgManifest_Normalized | None`
|
||||
is_enabled, # `bool`
|
||||
is_outdated, # `bool`
|
||||
show, # `bool`.
|
||||
mark, # `Optional[bool]`.
|
||||
mark, # `bool | None`.
|
||||
|
||||
# General vars.
|
||||
repo_index, # `int`
|
||||
|
||||
@@ -65,13 +65,13 @@ import tomllib
|
||||
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Generator,
|
||||
IO,
|
||||
Optional,
|
||||
NamedTuple,
|
||||
)
|
||||
from collections.abc import (
|
||||
Callable,
|
||||
Sequence,
|
||||
Union,
|
||||
)
|
||||
|
||||
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
|
||||
@@ -161,7 +161,7 @@ else:
|
||||
return True
|
||||
|
||||
|
||||
def file_mtime_or_none(filepath: str) -> Optional[int]:
|
||||
def file_mtime_or_none(filepath: str) -> int | None:
|
||||
try:
|
||||
# For some reason `mypy` thinks this is a float.
|
||||
return int(os.stat(filepath)[stat.ST_MTIME])
|
||||
@@ -173,7 +173,7 @@ def file_mtime_or_none_with_error_fn(
|
||||
filepath: str,
|
||||
*,
|
||||
error_fn: Callable[[Exception], None],
|
||||
) -> Optional[int]:
|
||||
) -> int | None:
|
||||
try:
|
||||
# For some reason `mypy` thinks this is a float.
|
||||
return int(os.stat(filepath)[stat.ST_MTIME])
|
||||
@@ -196,7 +196,7 @@ def rmtree_with_fallback_or_error(
|
||||
*,
|
||||
remove_file: bool = True,
|
||||
remove_link: bool = True,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
from .cli.blender_ext import rmtree_with_fallback_or_error as fn
|
||||
result = fn(
|
||||
path,
|
||||
@@ -300,7 +300,7 @@ def command_output_from_json_0(
|
||||
#
|
||||
|
||||
# pylint: disable-next=useless-return
|
||||
def repositories_validate_or_errors(repos: Sequence[str]) -> Optional[InfoItemSeq]:
|
||||
def repositories_validate_or_errors(repos: Sequence[str]) -> InfoItemSeq | None:
|
||||
_ = repos
|
||||
return None
|
||||
|
||||
@@ -711,7 +711,7 @@ def dummy_progress(
|
||||
# Public (non-command-line-wrapping) functions
|
||||
#
|
||||
|
||||
def json_from_filepath(filepath_json: str) -> Optional[dict[str, Any]]:
|
||||
def json_from_filepath(filepath_json: str) -> dict[str, Any] | None:
|
||||
if os.path.exists(filepath_json):
|
||||
with open(filepath_json, "r", encoding="utf-8") as fh:
|
||||
result = json.loads(fh.read())
|
||||
@@ -720,7 +720,7 @@ def json_from_filepath(filepath_json: str) -> Optional[dict[str, Any]]:
|
||||
return None
|
||||
|
||||
|
||||
def toml_from_filepath(filepath_json: str) -> Optional[dict[str, Any]]:
|
||||
def toml_from_filepath(filepath_json: str) -> dict[str, Any] | None:
|
||||
if os.path.exists(filepath_json):
|
||||
with open(filepath_json, "r", encoding="utf-8") as fh:
|
||||
return tomllib.loads(fh.read())
|
||||
@@ -751,7 +751,7 @@ def pkg_manifest_dict_is_valid_or_error(
|
||||
data: dict[str, Any],
|
||||
from_repo: bool,
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
# Exception! In in general `cli` shouldn't be considered a Python module,
|
||||
# it's validation function is handy to reuse.
|
||||
from .cli.blender_ext import pkg_manifest_from_dict_and_validate
|
||||
@@ -764,7 +764,7 @@ def pkg_manifest_dict_is_valid_or_error(
|
||||
|
||||
def pkg_manifest_dict_from_archive_or_error(
|
||||
filepath: str,
|
||||
) -> Union[dict[str, Any], str]:
|
||||
) -> dict[str, Any] | str:
|
||||
from .cli.blender_ext import pkg_manifest_from_archive_and_validate
|
||||
result = pkg_manifest_from_archive_and_validate(filepath, strict=False)
|
||||
if isinstance(result, str):
|
||||
@@ -846,7 +846,7 @@ class CommandBatchItem:
|
||||
|
||||
def __init__(self, fn_with_args: InfoItemCallable):
|
||||
self.fn_with_args = fn_with_args
|
||||
self.fn_iter: Optional[Generator[InfoItemSeq, bool, None]] = None
|
||||
self.fn_iter: Generator[InfoItemSeq, bool, None] | None = None
|
||||
self.status = CommandBatchItem.STATUS_NOT_YET_STARTED
|
||||
self.has_fatal_error = False
|
||||
self.has_error = False
|
||||
@@ -909,7 +909,7 @@ class CommandBatch:
|
||||
for cmd in self._batch:
|
||||
assert cmd.fn_iter is None
|
||||
cmd.fn_iter = cmd.invoke()
|
||||
request_exit: Optional[bool] = None
|
||||
request_exit: bool | None = None
|
||||
while True:
|
||||
try:
|
||||
# Request `request_exit` starts off as None, then it's a boolean.
|
||||
@@ -983,7 +983,7 @@ class CommandBatch:
|
||||
complete_count += 1
|
||||
continue
|
||||
|
||||
send_arg: Optional[bool] = self._request_exit
|
||||
send_arg: bool | None = self._request_exit
|
||||
|
||||
# First time initialization.
|
||||
if cmd.fn_iter is None:
|
||||
@@ -1102,7 +1102,7 @@ class CommandBatch:
|
||||
# Should never reach this line!
|
||||
return "Internal error, unknown state!{:s}".format(fail_text), 'ERROR'
|
||||
|
||||
def calc_status_log_or_none(self) -> Optional[list[tuple[str, str]]]:
|
||||
def calc_status_log_or_none(self) -> list[tuple[str, str]] | None:
|
||||
"""
|
||||
Return the log or None if there were no changes since the last call.
|
||||
"""
|
||||
@@ -1116,7 +1116,7 @@ class CommandBatch:
|
||||
for ty, msg in (cmd.msg_log + ([(cmd.msg_type, cmd.msg_info)] if cmd.msg_type == 'PROGRESS' else []))
|
||||
]
|
||||
|
||||
def calc_status_log_since_last_request_or_none(self) -> Optional[list[list[tuple[str, str]]]]:
|
||||
def calc_status_log_since_last_request_or_none(self) -> list[list[tuple[str, str]]] | None:
|
||||
"""
|
||||
Return a list of new errors per command or None when none are found.
|
||||
"""
|
||||
@@ -1148,7 +1148,7 @@ class PkgBlock_Normalized(NamedTuple):
|
||||
# Only for useful error messages.
|
||||
pkg_idname: str,
|
||||
error_fn: Callable[[Exception], None],
|
||||
) -> Optional["PkgBlock_Normalized"]:
|
||||
) -> "PkgBlock_Normalized | None": # NOTE: quotes can be removed from typing in Py3.12+.
|
||||
|
||||
try:
|
||||
reason = block_dict["reason"]
|
||||
@@ -1188,7 +1188,7 @@ class PkgManifest_Normalized(NamedTuple):
|
||||
archive_url: str
|
||||
|
||||
# Taken from the `blocklist`.
|
||||
block: Optional[PkgBlock_Normalized]
|
||||
block: PkgBlock_Normalized | None
|
||||
|
||||
@staticmethod
|
||||
def from_dict_with_error_fn(
|
||||
@@ -1196,9 +1196,9 @@ class PkgManifest_Normalized(NamedTuple):
|
||||
*,
|
||||
# Only for useful error messages.
|
||||
pkg_idname: str,
|
||||
pkg_block: Optional[PkgBlock_Normalized],
|
||||
pkg_block: PkgBlock_Normalized | None,
|
||||
error_fn: Callable[[Exception], None],
|
||||
) -> Optional["PkgManifest_Normalized"]:
|
||||
) -> "PkgManifest_Normalized | None":
|
||||
# NOTE: it is expected there are no errors here for typical usage.
|
||||
# Any errors here will return none with a terse message which is not intended to
|
||||
# be helpful for debugging, besides letting users/developers know there is a problem.
|
||||
@@ -1219,7 +1219,7 @@ class PkgManifest_Normalized(NamedTuple):
|
||||
|
||||
# Optional.
|
||||
field_website = manifest_dict.get("website", "")
|
||||
field_permissions: Union[list[str], dict[str, str]] = manifest_dict.get("permissions", {})
|
||||
field_permissions: list[str] | dict[str, str] = manifest_dict.get("permissions", {})
|
||||
field_tags = manifest_dict.get("tags", [])
|
||||
field_wheels = manifest_dict.get("wheels", [])
|
||||
|
||||
@@ -1318,7 +1318,7 @@ def repository_id_with_error_fn(
|
||||
*,
|
||||
repo_directory: str,
|
||||
error_fn: Callable[[Exception], None],
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
if not (pkg_idname := item.get("id", "")):
|
||||
error_fn(ValueError("{:s}: \"id\" missing".format(repo_directory)))
|
||||
return None
|
||||
@@ -1361,7 +1361,7 @@ def pkg_manifest_params_compatible_or_error(
|
||||
this_platform: tuple[int, int, int],
|
||||
this_blender_version: tuple[int, int, int],
|
||||
error_fn: Callable[[Exception], None],
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
from .cli.blender_ext import repository_filter_skip as fn
|
||||
|
||||
# Weak, create the minimum information for a manifest to be checked against.
|
||||
@@ -1501,7 +1501,7 @@ class _RepoDataSouce_ABC(metaclass=abc.ABCMeta):
|
||||
raise Exception("Caller must define")
|
||||
|
||||
@abc.abstractmethod
|
||||
def cache_data(self) -> Optional[RepoRemoteData]:
|
||||
def cache_data(self) -> RepoRemoteData | None:
|
||||
raise Exception("Caller must define")
|
||||
|
||||
# Should not be called directly use `data(..)` which supports cache.
|
||||
@@ -1510,7 +1510,7 @@ class _RepoDataSouce_ABC(metaclass=abc.ABCMeta):
|
||||
self,
|
||||
*,
|
||||
error_fn: Callable[[Exception], None],
|
||||
) -> Optional[RepoRemoteData]:
|
||||
) -> RepoRemoteData | None:
|
||||
raise Exception("Caller must define")
|
||||
|
||||
def data(
|
||||
@@ -1519,7 +1519,7 @@ class _RepoDataSouce_ABC(metaclass=abc.ABCMeta):
|
||||
cache_validate: bool,
|
||||
force: bool,
|
||||
error_fn: Callable[[Exception], None],
|
||||
) -> Optional[RepoRemoteData]:
|
||||
) -> RepoRemoteData | None:
|
||||
if not self.exists():
|
||||
self.cache_clear()
|
||||
return None
|
||||
@@ -1554,7 +1554,7 @@ class _RepoDataSouce_JSON(_RepoDataSouce_ABC):
|
||||
self._filepath: str = filepath
|
||||
self._mtime: int = 0
|
||||
self._filter_params: PkgManifest_FilterParams = filter_params
|
||||
self._data: Optional[RepoRemoteData] = None
|
||||
self._data: RepoRemoteData | None = None
|
||||
|
||||
def exists(self) -> bool:
|
||||
try:
|
||||
@@ -1577,14 +1577,14 @@ class _RepoDataSouce_JSON(_RepoDataSouce_ABC):
|
||||
self._data = None
|
||||
self._mtime = 0
|
||||
|
||||
def cache_data(self) -> Optional[RepoRemoteData]:
|
||||
def cache_data(self) -> RepoRemoteData | None:
|
||||
return self._data
|
||||
|
||||
def _data_load(
|
||||
self,
|
||||
*,
|
||||
error_fn: Callable[[Exception], None],
|
||||
) -> Optional[RepoRemoteData]:
|
||||
) -> RepoRemoteData | None:
|
||||
assert self.exists()
|
||||
|
||||
data = None
|
||||
@@ -1661,8 +1661,8 @@ class _RepoDataSouce_TOML_FILES(_RepoDataSouce_ABC):
|
||||
):
|
||||
self._directory: str = directory
|
||||
self._filter_params = filter_params
|
||||
self._mtime_for_each_package: Optional[dict[str, int]] = None
|
||||
self._data: Optional[RepoRemoteData] = None
|
||||
self._mtime_for_each_package: dict[str, int] | None = None
|
||||
self._data: RepoRemoteData | None = None
|
||||
|
||||
def exists(self) -> bool:
|
||||
try:
|
||||
@@ -1693,14 +1693,14 @@ class _RepoDataSouce_TOML_FILES(_RepoDataSouce_ABC):
|
||||
self._data = None
|
||||
self._mtime_for_each_package = None
|
||||
|
||||
def cache_data(self) -> Optional[RepoRemoteData]:
|
||||
def cache_data(self) -> RepoRemoteData | None:
|
||||
return self._data
|
||||
|
||||
def _data_load(
|
||||
self,
|
||||
*,
|
||||
error_fn: Callable[[Exception], None],
|
||||
) -> Optional[RepoRemoteData]:
|
||||
) -> RepoRemoteData | None:
|
||||
assert self.exists()
|
||||
|
||||
mtime_for_each_package = self._mtime_for_each_package_create(
|
||||
@@ -1839,8 +1839,8 @@ class _RepoCacheEntry:
|
||||
self.remote_url = remote_url
|
||||
# Manifest data per package loaded from the packages local JSON.
|
||||
# TODO(@ideasman42): use `_RepoDataSouce_ABC` for `pkg_manifest_local`.
|
||||
self._pkg_manifest_local: Optional[dict[str, PkgManifest_Normalized]] = None
|
||||
self._pkg_manifest_remote: Optional[dict[str, PkgManifest_Normalized]] = None
|
||||
self._pkg_manifest_local: dict[str, PkgManifest_Normalized] | None = None
|
||||
self._pkg_manifest_remote: dict[str, PkgManifest_Normalized] | None = None
|
||||
self._pkg_manifest_remote_data_source: _RepoDataSouce_ABC = (
|
||||
_RepoDataSouce_JSON(directory, filter_params) if remote_url else
|
||||
_RepoDataSouce_TOML_FILES(directory, filter_params)
|
||||
@@ -1854,14 +1854,14 @@ class _RepoCacheEntry:
|
||||
error_fn: Callable[[Exception], None],
|
||||
check_files: bool = False,
|
||||
ignore_missing: bool = False,
|
||||
) -> Optional[dict[str, PkgManifest_Normalized]]:
|
||||
) -> dict[str, PkgManifest_Normalized] | None:
|
||||
data = self._pkg_manifest_remote_data_source.data(
|
||||
cache_validate=check_files,
|
||||
force=False,
|
||||
error_fn=error_fn,
|
||||
)
|
||||
|
||||
pkg_manifest_remote: Optional[dict[str, PkgManifest_Normalized]] = None
|
||||
pkg_manifest_remote: dict[str, PkgManifest_Normalized] | None = None
|
||||
if data is not None:
|
||||
pkg_manifest_remote = data.pkg_manifest_map
|
||||
|
||||
@@ -1884,14 +1884,14 @@ class _RepoCacheEntry:
|
||||
*,
|
||||
error_fn: Callable[[Exception], None],
|
||||
force: bool = False,
|
||||
) -> Optional[dict[str, PkgManifest_Normalized]]:
|
||||
) -> dict[str, PkgManifest_Normalized] | None:
|
||||
data = self._pkg_manifest_remote_data_source.data(
|
||||
cache_validate=True,
|
||||
force=force,
|
||||
error_fn=error_fn,
|
||||
)
|
||||
|
||||
pkg_manifest_remote: Optional[dict[str, PkgManifest_Normalized]] = None
|
||||
pkg_manifest_remote: dict[str, PkgManifest_Normalized] | None = None
|
||||
if data is not None:
|
||||
pkg_manifest_remote = data.pkg_manifest_map
|
||||
|
||||
@@ -1905,7 +1905,7 @@ class _RepoCacheEntry:
|
||||
*,
|
||||
error_fn: Callable[[Exception], None],
|
||||
ignore_missing: bool = False,
|
||||
) -> Optional[dict[str, PkgManifest_Normalized]]:
|
||||
) -> dict[str, PkgManifest_Normalized] | None:
|
||||
# Important for local-only repositories (where the directory name defines the ID).
|
||||
has_remote = self.remote_url != ""
|
||||
|
||||
@@ -1967,7 +1967,7 @@ class _RepoCacheEntry:
|
||||
*,
|
||||
error_fn: Callable[[Exception], None],
|
||||
ignore_missing: bool = False,
|
||||
) -> Optional[dict[str, PkgManifest_Normalized]]:
|
||||
) -> dict[str, PkgManifest_Normalized] | None:
|
||||
if self._pkg_manifest_remote is None:
|
||||
self._json_data_ensure(
|
||||
ignore_missing=ignore_missing,
|
||||
@@ -2024,7 +2024,7 @@ class RepoCacheStore:
|
||||
*,
|
||||
error_fn: Callable[[Exception], None],
|
||||
force: bool = False,
|
||||
) -> Optional[dict[str, PkgManifest_Normalized]]:
|
||||
) -> dict[str, PkgManifest_Normalized] | None:
|
||||
for repo_entry in self._repos:
|
||||
if directory == repo_entry.directory:
|
||||
# pylint: disable-next=protected-access
|
||||
@@ -2037,7 +2037,7 @@ class RepoCacheStore:
|
||||
*,
|
||||
error_fn: Callable[[Exception], None],
|
||||
ignore_missing: bool = False,
|
||||
) -> Optional[dict[str, PkgManifest_Normalized]]:
|
||||
) -> dict[str, PkgManifest_Normalized] | None:
|
||||
for repo_entry in self._repos:
|
||||
if directory == repo_entry.directory:
|
||||
# Force refresh.
|
||||
@@ -2054,8 +2054,8 @@ class RepoCacheStore:
|
||||
error_fn: Callable[[Exception], None],
|
||||
check_files: bool = False,
|
||||
ignore_missing: bool = False,
|
||||
directory_subset: Optional[set[str]] = None,
|
||||
) -> Generator[Optional[dict[str, PkgManifest_Normalized]], None, None]:
|
||||
directory_subset: set[str] | None = None,
|
||||
) -> Generator[dict[str, PkgManifest_Normalized] | None, None, None]:
|
||||
for repo_entry in self._repos:
|
||||
if directory_subset is not None:
|
||||
if repo_entry.directory not in directory_subset:
|
||||
@@ -2079,8 +2079,8 @@ class RepoCacheStore:
|
||||
error_fn: Callable[[Exception], None],
|
||||
check_files: bool = False,
|
||||
ignore_missing: bool = False,
|
||||
directory_subset: Optional[set[str]] = None,
|
||||
) -> Generator[Optional[dict[str, PkgManifest_Normalized]], None, None]:
|
||||
directory_subset: set[str] | None = None,
|
||||
) -> Generator[dict[str, PkgManifest_Normalized] | None, None, None]:
|
||||
for repo_entry in self._repos:
|
||||
if directory_subset is not None:
|
||||
if repo_entry.directory not in directory_subset:
|
||||
@@ -2142,7 +2142,7 @@ class RepoLock:
|
||||
sys.stderr.write("{:s}: freed without releasing lock!".format(type(self).__name__))
|
||||
|
||||
@staticmethod
|
||||
def _is_locked_with_stale_cookie_removal(local_lock_file: str, cookie: str) -> Optional[str]:
|
||||
def _is_locked_with_stale_cookie_removal(local_lock_file: str, cookie: str) -> str | None:
|
||||
if os.path.exists(local_lock_file):
|
||||
try:
|
||||
with open(local_lock_file, "r", encoding="utf8") as fh:
|
||||
@@ -2163,7 +2163,7 @@ class RepoLock:
|
||||
return "lock file could not be removed ({:s})".format(str(ex))
|
||||
return None
|
||||
|
||||
def acquire(self) -> dict[str, Optional[str]]:
|
||||
def acquire(self) -> dict[str, str | None]:
|
||||
"""
|
||||
Return directories and the lock status,
|
||||
with None if locking succeeded.
|
||||
@@ -2174,7 +2174,7 @@ class RepoLock:
|
||||
raise Exception("acquire(): cookie doesn't exist! (when it should)")
|
||||
|
||||
# Assume all succeed.
|
||||
result: dict[str, Optional[str]] = {directory: None for directory in self._repo_directories}
|
||||
result: dict[str, str | None] = {directory: None for directory in self._repo_directories}
|
||||
for directory in self._repo_directories:
|
||||
local_private_dir = os.path.join(directory, REPO_LOCAL_PRIVATE_DIR)
|
||||
|
||||
@@ -2209,12 +2209,12 @@ class RepoLock:
|
||||
self._held = True
|
||||
return result
|
||||
|
||||
def release(self) -> dict[str, Optional[str]]:
|
||||
def release(self) -> dict[str, str | None]:
|
||||
# NOTE: lots of error checks here, mostly to give insights in the very unlikely case this fails.
|
||||
if not self._held:
|
||||
raise Exception("release(): called without a lock!")
|
||||
|
||||
result: dict[str, Optional[str]] = {directory: None for directory in self._repo_directories}
|
||||
result: dict[str, str | None] = {directory: None for directory in self._repo_directories}
|
||||
for directory, local_lock_file in self._repo_lock_files:
|
||||
if not os.path.exists(local_lock_file):
|
||||
result[directory] = "release(): lock missing when expected, continuing."
|
||||
@@ -2248,7 +2248,7 @@ class RepoLockContext:
|
||||
def __init__(self, *, repo_directories: Sequence[str], cookie: str):
|
||||
self._repo_lock = RepoLock(repo_directories=repo_directories, cookie=cookie)
|
||||
|
||||
def __enter__(self) -> dict[str, Optional[str]]:
|
||||
def __enter__(self) -> dict[str, str | None]:
|
||||
return self._repo_lock.acquire()
|
||||
|
||||
def __exit__(self, _ty: Any, _value: Any, _traceback: Any) -> None:
|
||||
@@ -2262,7 +2262,7 @@ class RepoLockContext:
|
||||
def repo_lock_directory_query(
|
||||
directory: str,
|
||||
cookie: str,
|
||||
) -> Optional[tuple[bool, float, str]]:
|
||||
) -> tuple[bool, float, str] | None:
|
||||
local_lock_file = os.path.join(directory, REPO_LOCAL_PRIVATE_DIR, REPO_LOCAL_PRIVATE_LOCK)
|
||||
|
||||
cookie_is_ours = False
|
||||
@@ -2292,7 +2292,7 @@ def repo_lock_directory_query(
|
||||
|
||||
def repo_lock_directory_force_unlock(
|
||||
directory: str,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
local_lock_file = os.path.join(directory, REPO_LOCAL_PRIVATE_DIR, REPO_LOCAL_PRIVATE_LOCK)
|
||||
try:
|
||||
os.remove(local_lock_file)
|
||||
|
||||
@@ -29,11 +29,11 @@ from typing import (
|
||||
Any,
|
||||
Generator,
|
||||
IO,
|
||||
Optional,
|
||||
Sequence,
|
||||
Callable,
|
||||
NamedTuple,
|
||||
Union,
|
||||
)
|
||||
from collections.abc import (
|
||||
Callable,
|
||||
Sequence,
|
||||
)
|
||||
|
||||
ArgsSubparseFn = Callable[["argparse._SubParsersAction[argparse.ArgumentParser]"], None]
|
||||
@@ -51,8 +51,8 @@ def signal_handler_sigint(_sig: int, _frame: Any) -> None:
|
||||
|
||||
|
||||
# A primitive type that can be communicated via message passing.
|
||||
PrimType = Union[int, str]
|
||||
PrimTypeOrSeq = Union[PrimType, Sequence[PrimType]]
|
||||
PrimType = int | str
|
||||
PrimTypeOrSeq = PrimType | Sequence[PrimType]
|
||||
|
||||
MessageFn = Callable[[str, PrimTypeOrSeq], bool]
|
||||
|
||||
@@ -280,7 +280,7 @@ def size_as_fmt_string(num: float, *, precision: int = 1) -> str:
|
||||
return "{:.{:d}f}{:s}".format(num, precision, unit)
|
||||
|
||||
|
||||
def read_with_timeout(fh: IO[bytes], size: int, *, timeout_in_seconds: float) -> Optional[bytes]:
|
||||
def read_with_timeout(fh: IO[bytes], size: int, *, timeout_in_seconds: float) -> bytes | None:
|
||||
# TODO: implement timeout (TimeoutError).
|
||||
_ = timeout_in_seconds
|
||||
return fh.read(size)
|
||||
@@ -328,8 +328,8 @@ class PkgRepoData(NamedTuple):
|
||||
|
||||
class PkgManifest_Build(NamedTuple):
|
||||
"""Package Build Information (for the "build" sub-command)."""
|
||||
paths: Optional[list[str]]
|
||||
paths_exclude_pattern: Optional[list[str]]
|
||||
paths: list[str] | None
|
||||
paths_exclude_pattern: list[str] | None
|
||||
|
||||
@staticmethod
|
||||
def _from_dict_impl(
|
||||
@@ -337,7 +337,7 @@ class PkgManifest_Build(NamedTuple):
|
||||
*,
|
||||
extra_paths: Sequence[str],
|
||||
all_errors: bool,
|
||||
) -> Union["PkgManifest_Build", list[str]]:
|
||||
) -> "PkgManifest_Build | list[str]": # NOTE: quotes can be removed from typing in Py3.12+.
|
||||
# TODO: generalize the type checks, see: `pkg_manifest_is_valid_or_error_impl`.
|
||||
error_list = []
|
||||
if value := manifest_build_dict.get("paths"):
|
||||
@@ -379,7 +379,7 @@ class PkgManifest_Build(NamedTuple):
|
||||
def from_dict_all_errors(
|
||||
manifest_build_dict: dict[str, Any],
|
||||
extra_paths: Sequence[str],
|
||||
) -> Union["PkgManifest_Build", list[str]]:
|
||||
) -> "PkgManifest_Build | list[str]": # NOTE: quotes can be removed from typing in Py3.12+.
|
||||
return PkgManifest_Build._from_dict_impl(
|
||||
manifest_build_dict,
|
||||
extra_paths=extra_paths,
|
||||
@@ -400,13 +400,13 @@ class PkgManifest(NamedTuple):
|
||||
blender_version_min: str
|
||||
|
||||
# Optional (set all defaults).
|
||||
blender_version_max: Optional[str] = None
|
||||
website: Optional[str] = None
|
||||
copyright: Optional[list[str]] = None
|
||||
permissions: Optional[list[str]] = None
|
||||
tags: Optional[list[str]] = None
|
||||
platforms: Optional[list[str]] = None
|
||||
wheels: Optional[list[str]] = None
|
||||
blender_version_max: str | None = None
|
||||
website: str | None = None
|
||||
copyright: list[str] | None = None
|
||||
permissions: list[str] | None = None
|
||||
tags: list[str] | None = None
|
||||
platforms: list[str] | None = None
|
||||
wheels: list[str] | None = None
|
||||
|
||||
|
||||
class PkgManifest_Archive(NamedTuple):
|
||||
@@ -457,7 +457,7 @@ def path_from_url(path: str) -> str:
|
||||
return result
|
||||
|
||||
|
||||
def random_acii_lines(*, seed: Union[int, str], width: int) -> Generator[str, None, None]:
|
||||
def random_acii_lines(*, seed: int | str, width: int) -> Generator[str, None, None]:
|
||||
"""
|
||||
Generate random ASCII text [A-Za-z0-9].
|
||||
Intended not to compress well, it's possible to simulate downloading a large package.
|
||||
@@ -481,7 +481,7 @@ def sha256_from_file_or_error(
|
||||
filepath: str,
|
||||
block_size: int = 1 << 20,
|
||||
hash_prefix: bool = False,
|
||||
) -> Union[tuple[int, str], str]:
|
||||
) -> tuple[int, str] | str:
|
||||
"""
|
||||
Returns an arbitrary sized unique ASCII string based on the file contents.
|
||||
(exact hashing method may change).
|
||||
@@ -549,7 +549,7 @@ def rmtree_with_fallback_or_error(
|
||||
*,
|
||||
remove_file: bool = True,
|
||||
remove_link: bool = True,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
"""
|
||||
Remove a directory, with optional fallbacks to removing files & links.
|
||||
Use this when a directory is expected, but there is the possibility
|
||||
@@ -618,7 +618,7 @@ def rmtree_with_fallback_or_error_pseudo_atomic(
|
||||
temp_prefix_and_suffix: tuple[str, str],
|
||||
remove_file: bool = True,
|
||||
remove_link: bool = True,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
|
||||
# It's possible the directory doesn't exist, only attempt a rename if it does.
|
||||
try:
|
||||
@@ -733,7 +733,7 @@ def pkg_manifest_from_dict_and_validate_impl(
|
||||
from_repo: bool,
|
||||
all_errors: bool,
|
||||
strict: bool,
|
||||
) -> Union[PkgManifest, list[str]]:
|
||||
) -> PkgManifest | list[str]:
|
||||
error_list = []
|
||||
# Validate the dictionary.
|
||||
if all_errors:
|
||||
@@ -767,7 +767,7 @@ def pkg_manifest_from_dict_and_validate(
|
||||
data: dict[Any, Any],
|
||||
from_repo: bool,
|
||||
strict: bool,
|
||||
) -> Union[PkgManifest, str]:
|
||||
) -> PkgManifest | str:
|
||||
manifest = pkg_manifest_from_dict_and_validate_impl(data, from_repo=from_repo, all_errors=False, strict=strict)
|
||||
if isinstance(manifest, list):
|
||||
return manifest[0]
|
||||
@@ -778,7 +778,7 @@ def pkg_manifest_from_dict_and_validate_all_errros(
|
||||
data: dict[Any, Any],
|
||||
from_repo: bool,
|
||||
strict: bool,
|
||||
) -> Union[PkgManifest, list[str]]:
|
||||
) -> PkgManifest | list[str]:
|
||||
"""
|
||||
Validate the manifest and return all errors.
|
||||
"""
|
||||
@@ -788,7 +788,7 @@ def pkg_manifest_from_dict_and_validate_all_errros(
|
||||
def pkg_manifest_archive_from_dict_and_validate(
|
||||
data: dict[Any, Any],
|
||||
strict: bool,
|
||||
) -> Union[PkgManifest_Archive, str]:
|
||||
) -> PkgManifest_Archive | str:
|
||||
manifest = pkg_manifest_from_dict_and_validate(data, from_repo=True, strict=strict)
|
||||
if isinstance(manifest, str):
|
||||
return manifest
|
||||
@@ -807,7 +807,7 @@ def pkg_manifest_archive_from_dict_and_validate(
|
||||
def pkg_manifest_from_toml_and_validate_all_errors(
|
||||
filepath: str,
|
||||
strict: bool,
|
||||
) -> Union[PkgManifest, list[str]]:
|
||||
) -> PkgManifest | list[str]:
|
||||
"""
|
||||
This function is responsible for not letting invalid manifest from creating packages with ID names
|
||||
or versions that would not properly install.
|
||||
@@ -825,7 +825,7 @@ def pkg_manifest_from_toml_and_validate_all_errors(
|
||||
|
||||
def pkg_zipfile_detect_subdir_or_none(
|
||||
zip_fh: zipfile.ZipFile,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
if PKG_MANIFEST_FILENAME_TOML in zip_fh.NameToInfo:
|
||||
return ""
|
||||
# Support one directory containing the expected TOML.
|
||||
@@ -858,7 +858,7 @@ def pkg_manifest_from_zipfile_and_validate_impl(
|
||||
archive_subdir: str,
|
||||
all_errors: bool,
|
||||
strict: bool,
|
||||
) -> Union[PkgManifest, list[str]]:
|
||||
) -> PkgManifest | list[str]:
|
||||
"""
|
||||
Validate the manifest and return all errors.
|
||||
"""
|
||||
@@ -894,7 +894,7 @@ def pkg_manifest_from_zipfile_and_validate(
|
||||
zip_fh: zipfile.ZipFile,
|
||||
archive_subdir: str,
|
||||
strict: bool,
|
||||
) -> Union[PkgManifest, str]:
|
||||
) -> PkgManifest | str:
|
||||
manifest = pkg_manifest_from_zipfile_and_validate_impl(
|
||||
zip_fh,
|
||||
archive_subdir,
|
||||
@@ -910,7 +910,7 @@ def pkg_manifest_from_zipfile_and_validate_all_errors(
|
||||
zip_fh: zipfile.ZipFile,
|
||||
archive_subdir: str,
|
||||
strict: bool,
|
||||
) -> Union[PkgManifest, list[str]]:
|
||||
) -> PkgManifest | list[str]:
|
||||
return pkg_manifest_from_zipfile_and_validate_impl(
|
||||
zip_fh,
|
||||
archive_subdir,
|
||||
@@ -922,7 +922,7 @@ def pkg_manifest_from_zipfile_and_validate_all_errors(
|
||||
def pkg_manifest_from_archive_and_validate(
|
||||
filepath: str,
|
||||
strict: bool,
|
||||
) -> Union[PkgManifest, str]:
|
||||
) -> PkgManifest | str:
|
||||
try:
|
||||
# pylint: disable-next=consider-using-with
|
||||
zip_fh_context = zipfile.ZipFile(filepath, mode="r")
|
||||
@@ -937,7 +937,7 @@ def pkg_manifest_from_archive_and_validate(
|
||||
|
||||
def pkg_server_repo_config_from_toml_and_validate(
|
||||
filepath: str,
|
||||
) -> Union[PkgServerRepoConfig, str]:
|
||||
) -> PkgServerRepoConfig | str:
|
||||
|
||||
if isinstance(result := toml_from_filepath_or_error(filepath), str):
|
||||
return result
|
||||
@@ -1027,7 +1027,7 @@ def remote_url_params_strip(url: str) -> str:
|
||||
return new_url
|
||||
|
||||
|
||||
def remote_url_validate_or_error(url: str) -> Optional[str]:
|
||||
def remote_url_validate_or_error(url: str) -> str | None:
|
||||
if url_has_known_prefix(url):
|
||||
return None
|
||||
return "remote URL doesn't begin with a known prefix: {:s}".format(" ".join(URL_KNOWN_PREFIX))
|
||||
@@ -1286,7 +1286,7 @@ class PathPatternMatch:
|
||||
def url_retrieve_to_data_iter(
|
||||
url: str,
|
||||
*,
|
||||
data: Optional[Any] = None,
|
||||
data: Any | None = None,
|
||||
headers: dict[str, str],
|
||||
chunk_size: int,
|
||||
timeout_in_seconds: float,
|
||||
@@ -1355,7 +1355,7 @@ def url_retrieve_to_filepath_iter(
|
||||
filepath: str,
|
||||
*,
|
||||
headers: dict[str, str],
|
||||
data: Optional[Any] = None,
|
||||
data: Any | None = None,
|
||||
chunk_size: int,
|
||||
timeout_in_seconds: float,
|
||||
) -> Generator[tuple[int, int, Any], None, None]:
|
||||
@@ -1444,7 +1444,7 @@ def url_retrieve_to_filepath_iter_or_filesystem(
|
||||
|
||||
|
||||
def url_retrieve_exception_is_connectivity(
|
||||
ex: Union[Exception, KeyboardInterrupt],
|
||||
ex: Exception | KeyboardInterrupt,
|
||||
) -> bool:
|
||||
if isinstance(ex, FileNotFoundError):
|
||||
return True
|
||||
@@ -1458,7 +1458,7 @@ def url_retrieve_exception_is_connectivity(
|
||||
|
||||
|
||||
def url_retrieve_exception_as_message(
|
||||
ex: Union[Exception, KeyboardInterrupt],
|
||||
ex: Exception | KeyboardInterrupt,
|
||||
*,
|
||||
prefix: str,
|
||||
url: str,
|
||||
@@ -1482,7 +1482,7 @@ def url_retrieve_exception_as_message(
|
||||
return "{:s}: unexpected error ({:s}) reading {!r}!".format(prefix, str(ex), url_strip)
|
||||
|
||||
|
||||
def pkg_idname_is_valid_or_error(pkg_idname: str) -> Optional[str]:
|
||||
def pkg_idname_is_valid_or_error(pkg_idname: str) -> str | None:
|
||||
if not pkg_idname.isidentifier():
|
||||
return "Not a valid identifier"
|
||||
if "__" in pkg_idname:
|
||||
@@ -1494,7 +1494,7 @@ def pkg_idname_is_valid_or_error(pkg_idname: str) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
def pkg_manifest_validate_terse_description_or_error(value: str) -> Optional[str]:
|
||||
def pkg_manifest_validate_terse_description_or_error(value: str) -> str | None:
|
||||
# Could be an argument.
|
||||
length_limit = TERSE_DESCRIPTION_MAX_LENGTH
|
||||
if (length_limit != -1) and (len(value) > length_limit):
|
||||
@@ -1519,7 +1519,7 @@ def pkg_manifest_validate_terse_description_or_error(value: str) -> Optional[str
|
||||
|
||||
def pkg_manifest_tags_load_valid_map_from_python(
|
||||
valid_tags_filepath: str,
|
||||
) -> Union[str, dict[str, set[str]]]:
|
||||
) -> str | dict[str, set[str]]:
|
||||
try:
|
||||
data = execfile(valid_tags_filepath)
|
||||
except Exception as ex:
|
||||
@@ -1542,7 +1542,7 @@ def pkg_manifest_tags_load_valid_map_from_python(
|
||||
|
||||
def pkg_manifest_tags_load_valid_map_from_json(
|
||||
valid_tags_filepath: str,
|
||||
) -> Union[str, dict[str, set[str]]]:
|
||||
) -> str | dict[str, set[str]]:
|
||||
try:
|
||||
with open(valid_tags_filepath, "rb") as fh:
|
||||
data = json.load(fh)
|
||||
@@ -1569,7 +1569,7 @@ def pkg_manifest_tags_load_valid_map_from_json(
|
||||
|
||||
def pkg_manifest_tags_load_valid_map(
|
||||
valid_tags_filepath: str,
|
||||
) -> Union[str, dict[str, set[str]]]:
|
||||
) -> str | dict[str, set[str]]:
|
||||
# Allow Python data (Blender stores this internally).
|
||||
if valid_tags_filepath.endswith(".py"):
|
||||
return pkg_manifest_tags_load_valid_map_from_python(valid_tags_filepath)
|
||||
@@ -1580,7 +1580,7 @@ def pkg_manifest_tags_valid_or_error(
|
||||
valid_tags_data: dict[str, Any],
|
||||
manifest_type: str,
|
||||
manifest_tags: list[str],
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
valid_tags = valid_tags_data[manifest_type]
|
||||
for tag in manifest_tags:
|
||||
if tag not in valid_tags:
|
||||
@@ -1607,7 +1607,7 @@ def pkg_manifest_tags_valid_or_error(
|
||||
def pkg_manifest_validate_field_nop(
|
||||
value: Any,
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
_ = strict, value
|
||||
return None
|
||||
|
||||
@@ -1615,7 +1615,7 @@ def pkg_manifest_validate_field_nop(
|
||||
def pkg_manifest_validate_field_any_non_empty_string(
|
||||
value: str,
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
_ = strict
|
||||
if not value.strip():
|
||||
return "A non-empty string expected"
|
||||
@@ -1625,7 +1625,7 @@ def pkg_manifest_validate_field_any_non_empty_string(
|
||||
def pkg_manifest_validate_field_any_non_empty_string_stripped_no_control_chars(
|
||||
value: str,
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
_ = strict
|
||||
value_strip = value.strip()
|
||||
if not value_strip:
|
||||
@@ -1637,7 +1637,7 @@ def pkg_manifest_validate_field_any_non_empty_string_stripped_no_control_chars(
|
||||
return None
|
||||
|
||||
|
||||
def pkg_manifest_validate_field_any_list_of_non_empty_strings(value: list[Any], strict: bool) -> Optional[str]:
|
||||
def pkg_manifest_validate_field_any_list_of_non_empty_strings(value: list[Any], strict: bool) -> str | None:
|
||||
_ = strict
|
||||
for i, tag in enumerate(value):
|
||||
if not isinstance(tag, str):
|
||||
@@ -1650,7 +1650,7 @@ def pkg_manifest_validate_field_any_list_of_non_empty_strings(value: list[Any],
|
||||
def pkg_manifest_validate_field_any_non_empty_list_of_non_empty_strings(
|
||||
value: list[Any],
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
if not value:
|
||||
return "list may not be empty"
|
||||
|
||||
@@ -1660,7 +1660,7 @@ def pkg_manifest_validate_field_any_non_empty_list_of_non_empty_strings(
|
||||
def pkg_manifest_validate_field_any_version(
|
||||
value: str,
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
_ = strict
|
||||
if not RE_MANIFEST_SEMVER.match(value):
|
||||
return "to be a semantic-version, found {!r}".format(value)
|
||||
@@ -1670,7 +1670,7 @@ def pkg_manifest_validate_field_any_version(
|
||||
def pkg_manifest_validate_field_any_version_primitive(
|
||||
value: str,
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
_ = strict
|
||||
# Parse simple `1.2.3`, `1.2` & `1` numbers.
|
||||
for number in value.split("."):
|
||||
@@ -1682,7 +1682,7 @@ def pkg_manifest_validate_field_any_version_primitive(
|
||||
def pkg_manifest_validate_field_any_version_primitive_or_empty(
|
||||
value: str,
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
if value:
|
||||
return pkg_manifest_validate_field_any_version_primitive(value, strict)
|
||||
return None
|
||||
@@ -1691,12 +1691,12 @@ def pkg_manifest_validate_field_any_version_primitive_or_empty(
|
||||
# Manifest Validation (Specific Callbacks)
|
||||
|
||||
|
||||
def pkg_manifest_validate_field_idname(value: str, strict: bool) -> Optional[str]:
|
||||
def pkg_manifest_validate_field_idname(value: str, strict: bool) -> str | None:
|
||||
_ = strict
|
||||
return pkg_idname_is_valid_or_error(value)
|
||||
|
||||
|
||||
def pkg_manifest_validate_field_type(value: str, strict: bool) -> Optional[str]:
|
||||
def pkg_manifest_validate_field_type(value: str, strict: bool) -> str | None:
|
||||
_ = strict
|
||||
# NOTE: add "keymap" in the future.
|
||||
value_expected = {"add-on", "theme"}
|
||||
@@ -1708,7 +1708,7 @@ def pkg_manifest_validate_field_type(value: str, strict: bool) -> Optional[str]:
|
||||
def pkg_manifest_validate_field_blender_version(
|
||||
value: str,
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
if (error := pkg_manifest_validate_field_any_version_primitive(value, strict)) is not None:
|
||||
return error
|
||||
|
||||
@@ -1724,14 +1724,14 @@ def pkg_manifest_validate_field_blender_version(
|
||||
def pkg_manifest_validate_field_blender_version_or_empty(
|
||||
value: str,
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
if value:
|
||||
return pkg_manifest_validate_field_blender_version(value, strict)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def pkg_manifest_validate_field_tagline(value: str, strict: bool) -> Optional[str]:
|
||||
def pkg_manifest_validate_field_tagline(value: str, strict: bool) -> str | None:
|
||||
if strict:
|
||||
return pkg_manifest_validate_terse_description_or_error(value)
|
||||
else:
|
||||
@@ -1744,7 +1744,7 @@ def pkg_manifest_validate_field_tagline(value: str, strict: bool) -> Optional[st
|
||||
def pkg_manifest_validate_field_copyright(
|
||||
value: list[str],
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
if strict:
|
||||
for i, copyrignt_text in enumerate(value):
|
||||
if not isinstance(copyrignt_text, str):
|
||||
@@ -1769,14 +1769,14 @@ def pkg_manifest_validate_field_copyright(
|
||||
|
||||
|
||||
def pkg_manifest_validate_field_permissions(
|
||||
value: Union[
|
||||
value: (
|
||||
# `dict[str, str]` is expected but at this point it's only guaranteed to be a dict.
|
||||
dict[Any, Any],
|
||||
dict[Any, Any] |
|
||||
# Kept for old files.
|
||||
list[Any],
|
||||
],
|
||||
list[Any]
|
||||
),
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
|
||||
keys_valid = {
|
||||
"files",
|
||||
@@ -1824,7 +1824,7 @@ def pkg_manifest_validate_field_permissions(
|
||||
return None
|
||||
|
||||
|
||||
def pkg_manifest_validate_field_build_path_list(value: list[Any], strict: bool) -> Optional[str]:
|
||||
def pkg_manifest_validate_field_build_path_list(value: list[Any], strict: bool) -> str | None:
|
||||
_ = strict
|
||||
value_duplicate_check: set[str] = set()
|
||||
|
||||
@@ -1868,7 +1868,7 @@ def pkg_manifest_validate_field_build_path_list(value: list[Any], strict: bool)
|
||||
def pkg_manifest_validate_field_wheels(
|
||||
value: list[Any],
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
if (error := pkg_manifest_validate_field_any_list_of_non_empty_strings(value, strict)) is not None:
|
||||
return error
|
||||
# Enforce naming spec:
|
||||
@@ -1902,7 +1902,7 @@ def pkg_manifest_validate_field_wheels(
|
||||
def pkg_manifest_validate_field_archive_size(
|
||||
value: int,
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
_ = strict
|
||||
if value <= 0:
|
||||
return "to be a positive integer, found {!r}".format(value)
|
||||
@@ -1912,7 +1912,7 @@ def pkg_manifest_validate_field_archive_size(
|
||||
def pkg_manifest_validate_field_archive_hash(
|
||||
value: str,
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
_ = strict
|
||||
import string
|
||||
# Expect: `sha256:{HASH}`.
|
||||
@@ -1932,7 +1932,11 @@ def pkg_manifest_validate_field_archive_hash(
|
||||
# Keep in sync with `PkgManifest`.
|
||||
# key, type, check_fn.
|
||||
pkg_manifest_known_keys_and_types: tuple[
|
||||
tuple[str, Union[type, tuple[type, ...]], Callable[[Any, bool], Optional[str]]],
|
||||
tuple[
|
||||
str,
|
||||
type | tuple[type, ...],
|
||||
Callable[[Any, bool], str | None],
|
||||
],
|
||||
...,
|
||||
] = (
|
||||
("id", str, pkg_manifest_validate_field_idname),
|
||||
@@ -1958,7 +1962,7 @@ pkg_manifest_known_keys_and_types: tuple[
|
||||
|
||||
# Keep in sync with `PkgManifest_Archive`.
|
||||
pkg_manifest_known_keys_and_types_from_repo: tuple[
|
||||
tuple[str, type, Callable[[Any, bool], Optional[str]]],
|
||||
tuple[str, type, Callable[[Any, bool], str | None]],
|
||||
...,
|
||||
] = (
|
||||
("archive_size", int, pkg_manifest_validate_field_archive_size),
|
||||
@@ -1976,7 +1980,7 @@ def pkg_manifest_is_valid_or_error_impl(
|
||||
from_repo: bool,
|
||||
all_errors: bool,
|
||||
strict: bool,
|
||||
) -> Optional[list[str]]:
|
||||
) -> list[str] | None:
|
||||
if not isinstance(data, dict):
|
||||
return ["Expected value to be a dict, not a {!r}".format(type(data))]
|
||||
|
||||
@@ -1986,7 +1990,7 @@ def pkg_manifest_is_valid_or_error_impl(
|
||||
|
||||
error_list = []
|
||||
|
||||
value_extract: dict[str, Optional[object]] = {}
|
||||
value_extract: dict[str, object | None] = {}
|
||||
for known_types in (
|
||||
(pkg_manifest_known_keys_and_types, pkg_manifest_known_keys_and_types_from_repo) if from_repo else
|
||||
(pkg_manifest_known_keys_and_types, )
|
||||
@@ -2047,7 +2051,7 @@ def pkg_manifest_is_valid_or_error(
|
||||
*,
|
||||
from_repo: bool,
|
||||
strict: bool,
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
error_list = pkg_manifest_is_valid_or_error_impl(
|
||||
data,
|
||||
from_repo=from_repo,
|
||||
@@ -2064,7 +2068,7 @@ def pkg_manifest_is_valid_or_error_all(
|
||||
*,
|
||||
from_repo: bool,
|
||||
strict: bool,
|
||||
) -> Optional[list[str]]:
|
||||
) -> list[str] | None:
|
||||
return pkg_manifest_is_valid_or_error_impl(
|
||||
data,
|
||||
from_repo=from_repo,
|
||||
@@ -2259,7 +2263,7 @@ def repository_filter_skip(
|
||||
filter_blender_version: tuple[int, int, int],
|
||||
filter_platform: str,
|
||||
# When `skip_message_fn` is set, returning true must call the `skip_message_fn` function.
|
||||
skip_message_fn: Optional[Callable[[str], None]],
|
||||
skip_message_fn: Callable[[str], None] | None,
|
||||
error_fn: Callable[[Exception], None],
|
||||
) -> bool:
|
||||
if (platforms := item.get("platforms")) is not None:
|
||||
@@ -2322,7 +2326,7 @@ def repository_filter_skip(
|
||||
return False
|
||||
|
||||
|
||||
def blender_version_parse_or_error(version: str) -> Union[tuple[int, int, int], str]:
|
||||
def blender_version_parse_or_error(version: str) -> tuple[int, int, int] | str:
|
||||
try:
|
||||
version_tuple: tuple[int, ...] = tuple(int(x) for x in version.split("."))
|
||||
except Exception as ex:
|
||||
@@ -2338,7 +2342,7 @@ def blender_version_parse_or_error(version: str) -> Union[tuple[int, int, int],
|
||||
)
|
||||
|
||||
|
||||
def blender_version_parse_any_or_error(version: Any) -> Union[tuple[int, int, int], str]:
|
||||
def blender_version_parse_any_or_error(version: Any) -> tuple[int, int, int] | str:
|
||||
if not isinstance(version, str):
|
||||
return "blender version should be a string, found a: {:s}".format(str(type(version)))
|
||||
|
||||
@@ -2363,7 +2367,7 @@ def url_request_headers_create(*, accept_json: bool, user_agent: str, access_tok
|
||||
return headers
|
||||
|
||||
|
||||
def repo_json_is_valid_or_error(filepath: str) -> Optional[str]:
|
||||
def repo_json_is_valid_or_error(filepath: str) -> str | None:
|
||||
if not os.path.exists(filepath):
|
||||
return "File missing: " + filepath
|
||||
|
||||
@@ -2413,7 +2417,7 @@ def repo_json_is_valid_or_error(filepath: str) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
def pkg_manifest_toml_is_valid_or_error(filepath: str, strict: bool) -> tuple[Optional[str], dict[str, Any]]:
|
||||
def pkg_manifest_toml_is_valid_or_error(filepath: str, strict: bool) -> tuple[str | None, dict[str, Any]]:
|
||||
if not os.path.exists(filepath):
|
||||
return "File missing: " + filepath, {}
|
||||
|
||||
@@ -2429,7 +2433,7 @@ def pkg_manifest_toml_is_valid_or_error(filepath: str, strict: bool) -> tuple[Op
|
||||
return None, result
|
||||
|
||||
|
||||
def pkg_manifest_detect_duplicates(pkg_items: list[PkgManifest]) -> Optional[str]:
|
||||
def pkg_manifest_detect_duplicates(pkg_items: list[PkgManifest]) -> str | None:
|
||||
"""
|
||||
When a repository includes multiple packages with the same ID, ensure they don't conflict.
|
||||
|
||||
@@ -2444,7 +2448,7 @@ def pkg_manifest_detect_duplicates(pkg_items: list[PkgManifest]) -> Optional[str
|
||||
dummy_verion_min = 0, 0, 0
|
||||
dummy_verion_max = 1000, 0, 0
|
||||
|
||||
def parse_version_or_default(version: Optional[str], default: tuple[int, int, int]) -> tuple[int, int, int]:
|
||||
def parse_version_or_default(version: str | None, default: tuple[int, int, int]) -> tuple[int, int, int]:
|
||||
if version is None:
|
||||
return default
|
||||
if isinstance(version_parsed := blender_version_parse_or_error(version), str):
|
||||
@@ -2530,7 +2534,7 @@ def pkg_manifest_detect_duplicates(pkg_items: list[PkgManifest]) -> Optional[str
|
||||
return None
|
||||
|
||||
|
||||
def toml_from_bytes_or_error(data: bytes) -> Union[dict[str, Any], str]:
|
||||
def toml_from_bytes_or_error(data: bytes) -> dict[str, Any] | str:
|
||||
try:
|
||||
result = tomllib.loads(data.decode('utf-8'))
|
||||
assert isinstance(result, dict)
|
||||
@@ -2539,7 +2543,7 @@ def toml_from_bytes_or_error(data: bytes) -> Union[dict[str, Any], str]:
|
||||
return str(ex)
|
||||
|
||||
|
||||
def toml_from_filepath_or_error(filepath: str) -> Union[dict[str, Any], str]:
|
||||
def toml_from_filepath_or_error(filepath: str) -> dict[str, Any] | str:
|
||||
try:
|
||||
with open(filepath, "rb") as fh:
|
||||
data = fh.read()
|
||||
@@ -2560,7 +2564,7 @@ def repo_local_private_dir_ensure(
|
||||
*,
|
||||
local_dir: str,
|
||||
error_fn: Callable[[Exception], None],
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
"""
|
||||
Ensure the repos hidden directory exists.
|
||||
"""
|
||||
@@ -2580,7 +2584,7 @@ def repo_local_private_dir_ensure_with_subdir(
|
||||
local_dir: str,
|
||||
subdir: str,
|
||||
error_fn: Callable[[Exception], None],
|
||||
) -> Optional[str]:
|
||||
) -> str | None:
|
||||
"""
|
||||
Return a local directory used to cache package downloads.
|
||||
"""
|
||||
@@ -2706,7 +2710,7 @@ def repo_sync_from_remote(
|
||||
return True
|
||||
|
||||
|
||||
def repo_pkginfo_from_local_as_dict_or_error(*, local_dir: str) -> Union[dict[str, Any], str]:
|
||||
def repo_pkginfo_from_local_as_dict_or_error(*, local_dir: str) -> dict[str, Any] | str:
|
||||
"""
|
||||
Load package cache.
|
||||
"""
|
||||
@@ -2726,7 +2730,7 @@ def repo_pkginfo_from_local_as_dict_or_error(*, local_dir: str) -> Union[dict[st
|
||||
return result
|
||||
|
||||
|
||||
def pkg_repo_data_from_json_or_error(json_data: dict[str, Any]) -> Union[PkgRepoData, str]:
|
||||
def pkg_repo_data_from_json_or_error(json_data: dict[str, Any]) -> PkgRepoData | str:
|
||||
if not isinstance((version := json_data.get("version", "v1")), str):
|
||||
return "expected \"version\" to be a string"
|
||||
|
||||
@@ -2750,7 +2754,7 @@ def pkg_repo_data_from_json_or_error(json_data: dict[str, Any]) -> Union[PkgRepo
|
||||
return result_new
|
||||
|
||||
|
||||
def repo_pkginfo_from_local_or_none(*, local_dir: str) -> Union[PkgRepoData, str]:
|
||||
def repo_pkginfo_from_local_or_none(*, local_dir: str) -> PkgRepoData | str:
|
||||
if isinstance((result := repo_pkginfo_from_local_as_dict_or_error(local_dir=local_dir)), str):
|
||||
return result
|
||||
return pkg_repo_data_from_json_or_error(result)
|
||||
@@ -3606,7 +3610,7 @@ class subcmd_client:
|
||||
local_dir: str,
|
||||
filepath_archive: str,
|
||||
blender_version_tuple: tuple[int, int, int],
|
||||
manifest_compare: Optional[PkgManifest],
|
||||
manifest_compare: PkgManifest | None,
|
||||
temp_prefix_and_suffix: tuple[str, str],
|
||||
) -> bool:
|
||||
# NOTE: Don't use `FATAL_ERROR` because other packages will attempt to install.
|
||||
@@ -4197,7 +4201,7 @@ class subcmd_author:
|
||||
|
||||
del manifest_build_data, manifest_data
|
||||
|
||||
build_paths_exclude_pattern: Optional[PathPatternMatch] = None
|
||||
build_paths_exclude_pattern: PathPatternMatch | None = None
|
||||
if manifest_build.paths_exclude_pattern is not None:
|
||||
build_paths_exclude_pattern = PathPatternMatch(manifest_build.paths_exclude_pattern)
|
||||
|
||||
@@ -4327,7 +4331,7 @@ class subcmd_author:
|
||||
with contextlib.closing(zip_fh_context) as zip_fh:
|
||||
for filepath_abs, filepath_rel in build_paths_for_platform:
|
||||
|
||||
zip_data_override: Optional[bytes] = None
|
||||
zip_data_override: bytes | None = None
|
||||
if platform and (filepath_rel == PKG_MANIFEST_FILENAME_TOML):
|
||||
zip_data_override = b"".join((
|
||||
b"\n",
|
||||
@@ -5024,8 +5028,8 @@ def argparse_create_dummy_progress(subparsers: "argparse._SubParsersAction[argpa
|
||||
|
||||
def argparse_create(
|
||||
args_internal: bool = True,
|
||||
args_extra_subcommands_fn: Optional[ArgsSubparseFn] = None,
|
||||
prog: Optional[str] = None,
|
||||
args_extra_subcommands_fn: ArgsSubparseFn | None = None,
|
||||
prog: str | None = None,
|
||||
) -> argparse.ArgumentParser:
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
@@ -5126,10 +5130,10 @@ def msglog_from_args(args: argparse.Namespace) -> MessageLogger:
|
||||
# Main Function
|
||||
|
||||
def main(
|
||||
argv: Optional[list[str]] = None,
|
||||
argv: list[str] | None = None,
|
||||
args_internal: bool = True,
|
||||
args_extra_subcommands_fn: Optional[ArgsSubparseFn] = None,
|
||||
prog: Optional[str] = None,
|
||||
args_extra_subcommands_fn: ArgsSubparseFn | None = None,
|
||||
prog: str | None = None,
|
||||
) -> int:
|
||||
# NOTE: only manipulate Python's run-time such as encoding & SIGINT when running stand-alone.
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
from typing import (
|
||||
from collections.abc import (
|
||||
Callable,
|
||||
)
|
||||
|
||||
|
||||
@@ -26,6 +26,8 @@ import tempfile
|
||||
|
||||
from typing import (
|
||||
Any,
|
||||
)
|
||||
from collections.abc import (
|
||||
Sequence,
|
||||
)
|
||||
|
||||
|
||||
@@ -20,14 +20,14 @@ import zipfile
|
||||
|
||||
from typing import (
|
||||
Any,
|
||||
Sequence,
|
||||
NamedTuple,
|
||||
Optional,
|
||||
Union,
|
||||
)
|
||||
from collections.abc import (
|
||||
Sequence,
|
||||
)
|
||||
|
||||
# A tree of files.
|
||||
FileTree = dict[str, Union["FileTree", bytes]]
|
||||
FileTree = dict[str, "FileTree | bytes"]
|
||||
|
||||
JSON_OutputElem = tuple[str, Any]
|
||||
|
||||
@@ -283,7 +283,7 @@ def command_output(
|
||||
def command_output_from_json_0(
|
||||
args: Sequence[str],
|
||||
*,
|
||||
exclude_types: Optional[set[str]] = None,
|
||||
exclude_types: set[str] | None = None,
|
||||
expected_returncode: int = 0,
|
||||
) -> Sequence[JSON_OutputElem]:
|
||||
result = []
|
||||
|
||||
@@ -32,7 +32,8 @@ import unittest
|
||||
from typing import (
|
||||
Any,
|
||||
NamedTuple,
|
||||
Optional,
|
||||
)
|
||||
from collections.abc import (
|
||||
Sequence,
|
||||
)
|
||||
|
||||
@@ -170,12 +171,12 @@ def create_package(
|
||||
pkg_idname: str,
|
||||
|
||||
# Optional.
|
||||
wheel_params: Optional[WheelModuleParams] = None,
|
||||
platforms: Optional[tuple[str, ...]] = None,
|
||||
blender_version_min: Optional[str] = None,
|
||||
blender_version_max: Optional[str] = None,
|
||||
python_script: Optional[str] = None,
|
||||
file_contents: Optional[dict[str, bytes]] = None,
|
||||
wheel_params: WheelModuleParams | None = None,
|
||||
platforms: tuple[str, ...] | None = None,
|
||||
blender_version_min: str | None = None,
|
||||
blender_version_max: str | None = None,
|
||||
python_script: str | None = None,
|
||||
file_contents: dict[str, bytes] | None = None,
|
||||
) -> None:
|
||||
pkg_name = pkg_idname.replace("_", " ").title()
|
||||
|
||||
@@ -407,15 +408,15 @@ class TestWithTempBlenderUser_MixIn(unittest.TestCase):
|
||||
self,
|
||||
*,
|
||||
pkg_idname: str,
|
||||
wheel_params: Optional[WheelModuleParams] = None,
|
||||
wheel_params: WheelModuleParams | None = None,
|
||||
|
||||
# Optional.
|
||||
pkg_filename: Optional[str] = None,
|
||||
platforms: Optional[tuple[str, ...]] = None,
|
||||
blender_version_min: Optional[str] = None,
|
||||
blender_version_max: Optional[str] = None,
|
||||
python_script: Optional[str] = None,
|
||||
file_contents: Optional[dict[str, bytes]] = None,
|
||||
pkg_filename: str | None = None,
|
||||
platforms: tuple[str, ...] | None = None,
|
||||
blender_version_min: str | None = None,
|
||||
blender_version_max: str | None = None,
|
||||
python_script: str | None = None,
|
||||
file_contents: dict[str, bytes] | None = None,
|
||||
) -> None:
|
||||
if pkg_filename is None:
|
||||
pkg_filename = pkg_idname
|
||||
|
||||
@@ -10,8 +10,9 @@ import os
|
||||
|
||||
from typing import (
|
||||
Any,
|
||||
)
|
||||
from collections.abc import (
|
||||
Sequence,
|
||||
Union,
|
||||
)
|
||||
|
||||
|
||||
@@ -37,7 +38,7 @@ class TestPathMatch_MixIn:
|
||||
def match_paths(
|
||||
self,
|
||||
expected_paths: list[tuple[bool, str]],
|
||||
path_pattern: Union[Sequence[str], PathPatternMatch], # type: ignore
|
||||
path_pattern: Sequence[str] | PathPatternMatch, # type: ignore
|
||||
) -> list[tuple[bool, str]]:
|
||||
result = []
|
||||
if not isinstance(path_pattern, PathPatternMatch):
|
||||
@@ -53,7 +54,7 @@ class TestPathMatch_MixIn:
|
||||
def match_paths_for_cmp(
|
||||
self,
|
||||
expected_paths: list[tuple[bool, str]],
|
||||
path_pattern: Union[Sequence[str], PathPatternMatch], # type: ignore
|
||||
path_pattern: Sequence[str] | PathPatternMatch, # type: ignore
|
||||
) -> tuple[
|
||||
list[tuple[bool, str]],
|
||||
list[tuple[bool, str]],
|
||||
|
||||
@@ -19,9 +19,8 @@ import shutil
|
||||
import sys
|
||||
import zipfile
|
||||
|
||||
from typing import (
|
||||
from collections.abc import (
|
||||
Callable,
|
||||
Optional,
|
||||
)
|
||||
|
||||
WheelSource = tuple[
|
||||
@@ -106,7 +105,7 @@ def _wheels_from_dir(dirpath: str) -> tuple[
|
||||
return result, paths_unused_list
|
||||
|
||||
|
||||
def _wheel_info_dir_from_zip(filepath_wheel: str) -> Optional[tuple[str, list[str]]]:
|
||||
def _wheel_info_dir_from_zip(filepath_wheel: str) -> tuple[str, list[str]] | None:
|
||||
"""
|
||||
Return:
|
||||
- The "*-info" directory name which contains meta-data.
|
||||
@@ -141,7 +140,7 @@ def _wheel_info_dir_from_zip(filepath_wheel: str) -> Optional[tuple[str, list[st
|
||||
return dir_info, toplevel_paths_list
|
||||
|
||||
|
||||
def _rmtree_safe(dir_remove: str, expected_root: str) -> Optional[Exception]:
|
||||
def _rmtree_safe(dir_remove: str, expected_root: str) -> Exception | None:
|
||||
if not dir_remove.startswith(expected_root):
|
||||
raise Exception("Expected prefix not found")
|
||||
|
||||
@@ -165,7 +164,7 @@ def _rmtree_safe(dir_remove: str, expected_root: str) -> Optional[Exception]:
|
||||
return ex_result
|
||||
|
||||
|
||||
def _remove_safe(file_remove: str) -> Optional[Exception]:
|
||||
def _remove_safe(file_remove: str) -> Exception | None:
|
||||
ex_result = None
|
||||
|
||||
try:
|
||||
@@ -389,7 +388,7 @@ def apply_action(
|
||||
if debug:
|
||||
print("removing wheel:", filepath_rel)
|
||||
|
||||
ex: Optional[Exception] = None
|
||||
ex: Exception | None = None
|
||||
if os.path.isdir(filepath_abs):
|
||||
ex = _rmtree_safe(filepath_abs, local_dir)
|
||||
# For symbolic-links, use remove as a fallback.
|
||||
|
||||
Reference in New Issue
Block a user