Extensions: basic validation of remote repository data

Prevent non-compliant data in remote repositories from causing errors
in Blender's interface.

Move from a dictionary to a named-tuple which uses normalized values.
This commit is contained in:
Campbell Barton
2024-06-12 15:07:58 +10:00
parent 820fc124d4
commit c99a40b286
5 changed files with 220 additions and 70 deletions

View File

@@ -171,7 +171,7 @@ def repo_stats_calc_outdated_for_repo_directory(repo_directory):
if item_remote is None:
continue
if item_remote["version"] != item_local["version"]:
if item_remote.version != item_local.version:
package_count += 1
return package_count
@@ -537,7 +537,7 @@ def theme_preset_draw(menu, context):
repo_item = repos_all[i]
directory = repo_item.directory
for pkg_idname, value in pkg_manifest_local.items():
if value["type"] != "theme":
if value.type != "theme":
continue
theme_dir, theme_files = pkg_theme_file_list(directory, pkg_idname)

View File

@@ -16,9 +16,10 @@ import argparse
import os
import sys
from .bl_extension_utils import PkgManifest_Normalized
from typing import (
Any,
Dict,
List,
Optional,
Tuple,
@@ -189,19 +190,19 @@ class subcmd_query:
def list_item(
pkg_id: str,
item_remote: Optional[Dict[str, Any]],
item_local: Optional[Dict[str, Any]],
item_remote: Optional[PkgManifest_Normalized],
item_local: Optional[PkgManifest_Normalized],
) -> None:
# Both can't be None.
assert item_remote is not None or item_local is not None
if item_remote is not None:
item_version = item_remote["version"]
item_version = item_remote.version
if item_local is None:
item_local_version = None
is_outdated = False
else:
item_local_version = item_local["version"]
item_local_version = item_local.version
is_outdated = item_local_version != item_version
if item_local is not None:
@@ -218,15 +219,15 @@ class subcmd_query:
else:
# All local-only packages are installed.
status_info = " [{:s}]".format(colorize("installed", "green"))
assert isinstance(item_local, dict)
assert isinstance(item_local, PkgManifest_Normalized)
item = item_local
print(
" {:s}{:s}: \"{:s}\", {:s}".format(
colorize(pkg_id, "bold"),
status_info,
item["name"],
colorize(item.get("tagline", "<no tagline>"), "faint"),
item.name,
colorize(item.tagline or "<no tagline>", "faint"),
))
if sync:

View File

@@ -172,7 +172,7 @@ def extension_url_find_repo_index_and_pkg_id(url):
if not remote_url:
continue
for pkg_id, item_remote in pkg_manifest_remote.items():
archive_url = item_remote["archive_url"]
archive_url = item_remote.archive_url
archive_url_basename = archive_url.rpartition("/")[2]
# First compare the filenames, if this matches, check the full URL.
if url_basename != archive_url_basename:
@@ -222,7 +222,7 @@ def pkg_info_check_exclude_filter_ex(name, tagline, search_lower):
def pkg_info_check_exclude_filter(item, search_lower):
return pkg_info_check_exclude_filter_ex(item["name"], item["tagline"], search_lower)
return pkg_info_check_exclude_filter_ex(item.name, item.tagline, search_lower)
def extension_theme_enable_filepath(filepath):
@@ -443,14 +443,14 @@ def _preferences_install_post_enable_on_install(
print("Package should have been installed but not found:", pkg_id)
return
if item_local["type"] == "add-on":
if item_local.type == "add-on":
# Check if the add-on will have been enabled from re-installing.
if pkg_id in pkg_id_sequence_upgrade:
continue
addon_module_name = "bl_ext.{:s}.{:s}".format(repo_item.module, pkg_id)
addon_utils.enable(addon_module_name, default_set=True, handle_error=handle_error)
elif item_local["type"] == "theme":
elif item_local.type == "theme":
if has_theme:
continue
extension_theme_enable(directory, pkg_id)
@@ -616,7 +616,7 @@ def _pkg_marked_by_repo(pkg_manifest_all):
item = pkg_manifest.get(pkg_id)
if item is None:
continue
if filter_by_type and (filter_by_type != item["type"]):
if filter_by_type and (filter_by_type != item.type):
continue
if search_lower and not pkg_info_check_exclude_filter(item, search_lower):
continue
@@ -710,10 +710,8 @@ def _extensions_repo_sync_wheels(repo_cache_store):
repo_directory = repo.directory
for pkg_id, item_local in pkg_manifest_local.items():
pkg_dirpath = os.path.join(repo_directory, pkg_id)
wheels_rel = item_local.get("wheels", None)
if wheels_rel is None:
continue
if not isinstance(wheels_rel, list):
wheels_rel = item_local.wheels
if not wheels_rel:
continue
# Filter only the wheels for this platform.
@@ -1407,7 +1405,7 @@ class EXTENSIONS_OT_package_upgrade_all(Operator, _ExtCmdMixIn):
# Not installed.
continue
if item_remote["version"] != item_local["version"]:
if item_remote.version != item_local.version:
packages_to_upgrade[repo_index].append(pkg_id)
package_count += 1
@@ -1545,7 +1543,9 @@ class EXTENSIONS_OT_package_install_marked(Operator, _ExtCmdMixIn):
pkg_manifest_remote = pkg_manifest_remote_all[repo_index]
pkg_id_sequence_addon_only = [
pkg_id for pkg_id in pkg_id_sequence if pkg_manifest_remote[pkg_id]["type"] == "add-on"]
pkg_id for pkg_id in pkg_id_sequence
if pkg_manifest_remote[pkg_id].type == "add-on"
]
if pkg_id_sequence_addon_only:
self._repo_map_packages_addon_only.append((repo_item.directory, pkg_id_sequence_addon_only))
@@ -2268,17 +2268,17 @@ class EXTENSIONS_OT_package_install(Operator, _ExtCmdMixIn):
_repo_index, repo_name, pkg_id, item_remote = self._drop_variables
layout.label(text="Do you want to install the following {:s}?".format(item_remote["type"]))
layout.label(text="Do you want to install the following {:s}?".format(item_remote.type))
col = layout.column(align=True)
col.label(text="Name: {:s}".format(item_remote["name"]))
col.label(text="Name: {:s}".format(item_remote.name))
col.label(text="Repository: {:s}".format(repo_name))
col.label(text="Size: {:s}".format(size_as_fmt_string(item_remote["archive_size"], precision=0)))
col.label(text="Size: {:s}".format(size_as_fmt_string(item_remote.archive_size, precision=0)))
del col
layout.separator()
layout.prop(self, "enable_on_install", text=rna_prop_enable_on_install_type_map[item_remote["type"]])
layout.prop(self, "enable_on_install", text=rna_prop_enable_on_install_type_map[item_remote.type])
@staticmethod
def _do_legacy_replace(pkg_id, pkg_manifest_local):

View File

@@ -48,26 +48,6 @@ def sizes_as_percentage_string(size_partial: int, size_final: int) -> str:
return "{:-6.2f}%".format(percent * 100)
def license_info_to_text(license_list):
# See: https://spdx.org/licenses/
# - Note that we could include all, for now only common, GPL compatible licenses.
# - Note that many of the human descriptions are not especially more humanly readable
# than the short versions, so it's questionable if we should attempt to add all of these.
_spdx_id_to_text = {
"GPL-2.0-only": "GNU General Public License v2.0 only",
"GPL-2.0-or-later": "GNU General Public License v2.0 or later",
"GPL-3.0-only": "GNU General Public License v3.0 only",
"GPL-3.0-or-later": "GNU General Public License v3.0 or later",
}
result = []
for item in license_list:
if item.startswith("SPDX:"):
item = item[5:]
item = _spdx_id_to_text.get(item, item)
result.append(item)
return ", ".join(result)
def pkg_repo_and_id_from_theme_path(repos_all, filepath):
import os
if not filepath:
@@ -642,7 +622,7 @@ def extensions_panel_draw_impl(
is_system_repo = repos_all[repo_index].source == 'SYSTEM'
for pkg_id, item_remote in pkg_manifest_remote.items():
if filter_by_type and (filter_by_type != item_remote["type"]):
if filter_by_type and (filter_by_type != item_remote.type):
continue
if search_lower and (not pkg_info_check_exclude_filter(item_remote, search_lower)):
continue
@@ -654,7 +634,7 @@ def extensions_panel_draw_impl(
continue
if extension_tags:
if tags := item_remote.get("tags"):
if tags := item_remote.tags:
if not any(True for t in tags if extension_tags.get(t, True)):
continue
else:
@@ -663,7 +643,7 @@ def extensions_panel_draw_impl(
is_addon = False
is_theme = False
match item_remote["type"]:
match item_remote.type:
case "add-on":
is_addon = True
case "theme":
@@ -689,12 +669,12 @@ def extensions_panel_draw_impl(
if enabled_only and (not is_enabled):
continue
item_version = item_remote["version"]
item_version = item_remote.version
if item_local is None:
item_local_version = None
is_outdated = False
else:
item_local_version = item_local["version"]
item_local_version = item_local.version
is_outdated = item_local_version != item_version
if updates_only:
@@ -763,7 +743,7 @@ def extensions_panel_draw_impl(
sub = row.row()
sub.active = is_enabled
sub.label(text=item_remote["name"], translate=False)
sub.label(text=item_remote.name, translate=False)
del sub
row_right = row.row()
@@ -797,9 +777,9 @@ def extensions_panel_draw_impl(
col_b = split.column()
# The full tagline may be multiple lines (not yet supported by Blender's UI).
col_a.label(text="{:s}.".format(item_remote["tagline"]), translate=False)
col_a.label(text="{:s}.".format(item_remote.tagline), translate=False)
if value := item_remote.get("website"):
if value := item_remote.website:
# Use half size button, for legacy add-ons there are two, here there is one
# however one large button looks silly, so use a half size still.
col_a.split(factor=0.5).operator(
@@ -837,16 +817,14 @@ def extensions_panel_draw_impl(
# WARNING: while this is documented to be a dict, old packages may contain a list of strings.
# As it happens dictionary keys & list values both iterate over string,
# however we will want to show the dictionary values eventually.
if (value := item_remote.get("permissions")):
if (value := item_remote.permissions):
col_b.label(text=", ".join([iface_(x.title()) for x in value]), translate=False)
else:
col_b.label(text="No permissions specified")
del value
# Remove the maintainers email while it's not private, showing prominently
# could cause maintainers to get direct emails instead of issue tracking systems.
col_a.label(text="Maintainer")
col_b.label(text=item_remote["maintainer"].split("<", 1)[0].rstrip(), translate=False)
col_b.label(text=item_remote.maintainer, translate=False)
col_a.label(text="Version")
if is_outdated:
@@ -859,10 +837,10 @@ def extensions_panel_draw_impl(
if has_remote:
col_a.label(text="Size")
col_b.label(text=size_as_fmt_string(item_remote["archive_size"]), translate=False)
col_b.label(text=size_as_fmt_string(item_remote.archive_size), translate=False)
col_a.label(text="License")
col_b.label(text=license_info_to_text(item_remote["license"]), translate=False)
col_b.label(text=item_remote.license, translate=False)
if len(repos_all) > 1:
col_a.label(text="Repository")
@@ -1137,9 +1115,9 @@ def tags_current(wm):
if pkg_manifest_remote is None:
continue
for item_remote in pkg_manifest_remote.values():
if filter_by_type != item_remote["type"]:
if filter_by_type != item_remote.type:
continue
if pkg_tags := item_remote.get("tags"):
if pkg_tags := item_remote.tags:
tags.update(pkg_tags)
if filter_by_type == "add-on":

View File

@@ -310,6 +310,26 @@ def repository_iter_package_dirs(
yield entry
def license_info_to_text(license_list: Sequence[str]) -> str:
# See: https://spdx.org/licenses/
# - Note that we could include all, for now only common, GPL compatible licenses.
# - Note that many of the human descriptions are not especially more humanly readable
# than the short versions, so it's questionable if we should attempt to add all of these.
_spdx_id_to_text = {
"GPL-2.0-only": "GNU General Public License v2.0 only",
"GPL-2.0-or-later": "GNU General Public License v2.0 or later",
"GPL-3.0-only": "GNU General Public License v3.0 only",
"GPL-3.0-or-later": "GNU General Public License v3.0 or later",
}
result = []
for item in license_list:
if item.startswith("SPDX:"):
item = item[5:]
item = _spdx_id_to_text.get(item, item)
result.append(item)
return ", ".join(result)
# -----------------------------------------------------------------------------
# Public Stand-Alone Utilities
#
@@ -997,6 +1017,146 @@ class CommandBatch:
# Internal Repo Data Source
#
# See similar named tuple: `bl_pkg.cli.blender_ext.PkgManifest`.
# This type is loaded from an external source and had it's valued parsed into a known "normalized" state.
# Some transformation is performed to the purpose of displaying in the UI although this type isn't specifically for UI.
class PkgManifest_Normalized(NamedTuple):
# Intentionally excluded:
# - `id`: The caller must know the ID and is typically stored as part of a dictionary
# where the `id` is the key and `PkgManifest_Normalized` is the value.
# - `schema_version`: any versioning should be handled as part of normalization.
# - `blender_version_max`: this is used to exclude packages, part of filtering before inclusion.
name: str
tagline: str
version: str
type: str
maintainer: str
license: str
# Optional.
website: str
permissions: Dict[str, str]
tags: Tuple[str]
wheels: Tuple[str]
# Remote.
archive_size: int
archive_url: str
@staticmethod
def from_dict_with_error_fn(
manifest_dict: Dict[str, Any],
*,
# Only for useful error messages.
pkg_idname: str,
error_fn: Callable[[Exception], None],
) -> Optional["PkgManifest_Normalized"]:
# NOTE: it is expected there are no errors here for typical usage.
# Any errors here will return none with a terse message which is not intended to
# be helpful for debugging, besides letting users/developers know there is a problem.
#
# This is done because it's expected the data from repositories is valid and
# anyone developing packages runs the "validate" function before publishing for others to use.
#
# Checks here are mainly to prevent corrupt/invalid repositories or TOML files
# from breaking Blender's internal functionality.
try:
name = manifest_dict["name"]
tagline = manifest_dict["tagline"]
version = manifest_dict["version"]
type = manifest_dict["type"]
maintainer = manifest_dict["maintainer"]
license = manifest_dict["license"]
# Optional.
website = manifest_dict.get("website", "")
permissions: Union[List[str], Dict[str, str]] = manifest_dict.get("permissions", {})
tags = manifest_dict.get("tags", [])
wheels = manifest_dict.get("wheels", [])
# Remote only (not found in TOML files).
archive_size = manifest_dict.get("archive_size", 0)
archive_url = manifest_dict.get("archive_url", "")
except KeyError as ex:
error_fn(KeyError("{:s}: missing key {:s}".format(pkg_idname, str(ex))))
return None
# This is an old (now unsupported) format, convert into a dictionary.
if isinstance(permissions, list):
permissions = {key: "Undefined" for key in permissions}
try:
if not (isinstance(name, str) and name):
raise TypeError("{:s}: \"name\" must be a non-empty string".format(pkg_idname))
if not isinstance(tagline, str):
raise TypeError("{:s}: \"tagline\" must be a string".format(pkg_idname))
if not (isinstance(version, str) and version):
raise TypeError("{:s}: \"version\" must be a non-empty string".format(pkg_idname))
if not (isinstance(type, str) and type):
raise TypeError("{:s}: \"type\" must be a non-empty string".format(pkg_idname))
if not (isinstance(maintainer, str) and maintainer):
raise TypeError("{:s}: \"maintainer\" must be a non-empty string".format(pkg_idname))
if not (
isinstance(license, list) and
license and
(not any(1 for x in license if not isinstance(x, str)))
):
raise TypeError("{:s}: \"license\" must be a non-empty list of strings".format(pkg_idname))
# Optional.
if not isinstance(website, str):
raise TypeError("{:s}: \"website\" must be a string".format(pkg_idname))
if not (
isinstance(permissions, dict) and
(not any(1 for k, v in permissions.items() if not (isinstance(k, str) and isinstance(v, str))))
):
raise TypeError("{:s}: \"permissions\" must be a non-empty list of strings".format(pkg_idname))
if not (isinstance(tags, list) and (not any(1 for x in tags if not isinstance(x, str)))):
raise TypeError("{:s}: \"tags\" must be a non-empty list of strings".format(pkg_idname))
if not (isinstance(wheels, list) and (not any(1 for x in wheels if not isinstance(x, str)))):
raise TypeError("{:s}: \"wheels\" must be a non-empty list of strings".format(pkg_idname))
# Remote only.
if not isinstance(archive_size, int):
raise TypeError("{:s}: \"archive_size\" must be an int".format(pkg_idname))
if not isinstance(archive_url, str):
raise TypeError("{:s}: \"archive_url\" must a string".format(pkg_idname))
except TypeError as ex:
error_fn(ex)
return None
return PkgManifest_Normalized(
name=name,
tagline=tagline,
version=version,
type=type,
# Remove the maintainers email while it's not private, showing prominently
# could cause maintainers to get direct emails instead of issue tracking systems.
maintainer=maintainer.split("<", 1)[0].rstrip(),
license=license_info_to_text(license),
# Optional.
website=website,
permissions=permissions,
tags=tuple(tags),
wheels=tuple(wheels),
archive_size=archive_size,
archive_url=archive_url,
)
class RepoRemoteData(NamedTuple):
version: str
@@ -1317,7 +1477,7 @@ class _RepoCacheEntry:
self.directory = directory
self.remote_url = remote_url
# Manifest data per package loaded from the packages local JSON.
self._pkg_manifest_local: Optional[Dict[str, Dict[str, Any]]] = None
self._pkg_manifest_local: Optional[Dict[str, PkgManifest_Normalized]] = None
self._pkg_manifest_remote: Optional[RepoRemoteData] = None
self._pkg_manifest_remote_data_source: _RepoDataSouce_ABC = (
_RepoDataSouce_JSON(directory) if remote_url else
@@ -1369,7 +1529,7 @@ class _RepoCacheEntry:
*,
error_fn: Callable[[Exception], None],
ignore_missing: bool = False,
) -> Optional[Dict[str, Dict[str, Any]]]:
) -> Optional[Dict[str, PkgManifest_Normalized]]:
# Important for local-only repositories (where the directory name defines the ID).
has_remote = self.remote_url != ""
@@ -1409,7 +1569,13 @@ class _RepoCacheEntry:
error_fn(Exception(error_str))
continue
pkg_manifest_local[pkg_idname] = item_local
if (value := PkgManifest_Normalized.from_dict_with_error_fn(
item_local,
pkg_idname=pkg_idname,
error_fn=error_fn,
)) is not None:
pkg_manifest_local[pkg_idname] = value
del value
self._pkg_manifest_local = pkg_manifest_local
return self._pkg_manifest_local
@@ -1484,7 +1650,7 @@ class RepoCacheStore:
error_fn: Callable[[Exception], None],
ignore_missing: bool = False,
directory_subset: Optional[Set[str]] = None,
) -> Optional[Dict[str, Dict[str, Any]]]:
) -> Optional[Dict[str, PkgManifest_Normalized]]:
for repo_entry in self._repos:
if directory == repo_entry.directory:
# Force refresh.
@@ -1502,7 +1668,7 @@ class RepoCacheStore:
check_files: bool = False,
ignore_missing: bool = False,
directory_subset: Optional[Set[str]] = None,
) -> Generator[Optional[Dict[str, Dict[str, Any]]], None, None]:
) -> Generator[Optional[Dict[str, PkgManifest_Normalized]], None, None]:
for repo_entry in self._repos:
if directory_subset is not None:
if repo_entry.directory not in directory_subset:
@@ -1522,9 +1688,14 @@ class RepoCacheStore:
# TODO(@ideasman42): we may want to include the "id", as part of moving to a new format
# the "id" used not to be part of each item so users of this API assume it's not.
# The `item_remote` could be used in-place however that needs further testing.
item_remove_copy = item_remote.copy()
pkg_idname = item_remove_copy.pop("id")
pkg_manifest_remote[pkg_idname] = item_remove_copy
pkg_idname = item_remote["id"]
if (value := PkgManifest_Normalized.from_dict_with_error_fn(
item_remote,
pkg_idname=pkg_idname,
error_fn=error_fn,
)) is not None:
pkg_manifest_remote[pkg_idname] = value
del value
yield pkg_manifest_remote
def pkg_manifest_from_local_ensure(
@@ -1533,7 +1704,7 @@ class RepoCacheStore:
error_fn: Callable[[Exception], None],
check_files: bool = False,
directory_subset: Optional[Set[str]] = None,
) -> Generator[Optional[Dict[str, Dict[str, Any]]], None, None]:
) -> Generator[Optional[Dict[str, PkgManifest_Normalized]], None, None]:
for repo_entry in self._repos:
if directory_subset is not None:
if repo_entry.directory not in directory_subset: