Extensions: install all updates in parallel

Run each update as a separate job, previously updates for each
repository was split into a separate job however that would only
use parallel connections with multiple online repositories which
isn't used by default. Currently the connection limit is hard-coded but
will be made into a preference.
This commit is contained in:
Campbell Barton
2024-06-14 15:12:23 +10:00
parent 04f310a19a
commit 176864ab03
4 changed files with 70 additions and 23 deletions

View File

@@ -222,6 +222,7 @@ def sync_status_generator(repos_fn):
# Used as a prefix in status.
title="Update",
batch=cmd_batch_partial,
batch_job_limit=5,
)
del cmd_batch_partial

View File

@@ -133,6 +133,13 @@ class CheckSIGINT_Context:
# Internal Utilities
#
def _sequence_split_with_job_limit(items, job_limit):
# When only one job is allowed at a time, there is no advantage to splitting the sequence.
if job_limit == 1:
return (items,)
return [(elem,) for elem in items]
def _preferences_repo_find_by_remote_url(context, remote_url):
remote_url = remote_url.rstrip("/")
prefs = context.preferences
@@ -1066,6 +1073,7 @@ class EXTENSIONS_OT_dummy_progress(Operator, _ExtCmdMixIn):
use_idle=is_modal,
),
],
batch_job_limit=1,
)
def exec_command_finish(self, canceled):
@@ -1129,6 +1137,7 @@ class EXTENSIONS_OT_repo_sync(Operator, _ExtCmdMixIn):
return bl_extension_utils.CommandBatch(
title="Sync",
batch=cmd_batch,
batch_job_limit=1,
)
def exec_command_finish(self, canceled):
@@ -1222,6 +1231,7 @@ class EXTENSIONS_OT_repo_sync_all(Operator, _ExtCmdMixIn):
return bl_extension_utils.CommandBatch(
title="Sync \"{:s}\"".format(repos_all[0].name) if use_active_only else "Sync All",
batch=cmd_batch,
batch_job_limit=1,
)
def exec_command_finish(self, canceled):
@@ -1378,6 +1388,9 @@ class EXTENSIONS_OT_package_upgrade_all(Operator, _ExtCmdMixIn):
assert False, "unreachable" # Poll prevents this.
return None
# TODO: use a preference.
network_connection_limit = 5
# NOTE: Unless we have a "clear-cache" operator - there isn't a great place to apply cache-clearing.
# So when cache is disabled simply clear all cache before performing an update.
# Further, individual install & remove operation will manage the cache
@@ -1425,18 +1438,20 @@ class EXTENSIONS_OT_package_upgrade_all(Operator, _ExtCmdMixIn):
for repo_index, pkg_id_sequence in enumerate(packages_to_upgrade):
if not pkg_id_sequence:
continue
repo_item = repos_all[repo_index]
cmd_batch.append(partial(
bl_extension_utils.pkg_install,
directory=repo_item.directory,
remote_url=url_append_defaults(repo_item.remote_url),
pkg_id_sequence=pkg_id_sequence,
online_user_agent=online_user_agent_from_blender(),
blender_version=bpy.app.version,
access_token=repo_item.access_token,
use_cache=repo_item.use_cache,
use_idle=is_modal,
))
for pkg_id_sequence in sequence_split_with_job_limit(pkg_id_sequence, network_connection_limit):
cmd_batch.append(partial(
bl_extension_utils.pkg_install,
directory=repo_item.directory,
remote_url=url_append_defaults(repo_item.remote_url),
pkg_id_sequence=pkg_id_sequence,
online_user_agent=online_user_agent_from_blender(),
blender_version=bpy.app.version,
access_token=repo_item.access_token,
use_cache=repo_item.use_cache,
use_idle=is_modal,
))
self._repo_directories.add(repo_item.directory)
if not cmd_batch:
@@ -1465,6 +1480,7 @@ class EXTENSIONS_OT_package_upgrade_all(Operator, _ExtCmdMixIn):
"Update {:d} Package(s)".format(package_count)
),
batch=cmd_batch,
batch_job_limit=network_connection_limit,
)
def exec_command_finish(self, canceled):
@@ -1522,6 +1538,9 @@ class EXTENSIONS_OT_package_install_marked(Operator, _ExtCmdMixIn):
self._repo_map_packages_addon_only = []
package_count = 0
# TODO: use a preference.
network_connection_limit = 5
cmd_batch = []
for repo_index, pkg_id_sequence in sorted(repo_pkg_map.items()):
repo_item = repos_all[repo_index]
@@ -1536,17 +1555,19 @@ class EXTENSIONS_OT_package_install_marked(Operator, _ExtCmdMixIn):
if not pkg_id_sequence:
continue
cmd_batch.append(partial(
bl_extension_utils.pkg_install,
directory=repo_item.directory,
remote_url=url_append_defaults(repo_item.remote_url),
pkg_id_sequence=pkg_id_sequence,
online_user_agent=online_user_agent_from_blender(),
blender_version=bpy.app.version,
access_token=repo_item.access_token,
use_cache=repo_item.use_cache,
use_idle=is_modal,
))
for pkg_id_sequence in _sequence_split_with_job_limit(pkg_id_sequence, network_connection_limit):
cmd_batch.append(partial(
bl_extension_utils.pkg_install,
directory=repo_item.directory,
remote_url=url_append_defaults(repo_item.remote_url),
pkg_id_sequence=pkg_id_sequence,
online_user_agent=online_user_agent_from_blender(),
blender_version=bpy.app.version,
access_token=repo_item.access_token,
use_cache=repo_item.use_cache,
use_idle=is_modal,
))
self._repo_directories.add(repo_item.directory)
package_count += len(pkg_id_sequence)
@@ -1575,6 +1596,7 @@ class EXTENSIONS_OT_package_install_marked(Operator, _ExtCmdMixIn):
return bl_extension_utils.CommandBatch(
title="Install {:d} Marked Package(s)".format(package_count),
batch=cmd_batch,
batch_job_limit=network_connection_limit,
)
def exec_command_finish(self, canceled):
@@ -1696,6 +1718,7 @@ class EXTENSIONS_OT_package_uninstall_marked(Operator, _ExtCmdMixIn):
return bl_extension_utils.CommandBatch(
title="Uninstall {:d} Marked Package(s)".format(package_count),
batch=cmd_batch,
batch_job_limit=1,
)
def exec_command_finish(self, canceled):
@@ -1892,6 +1915,7 @@ class EXTENSIONS_OT_package_install_files(Operator, _ExtCmdMixIn):
use_idle=is_modal,
)
],
batch_job_limit=1,
)
def exec_command_finish(self, canceled):
@@ -2171,6 +2195,7 @@ class EXTENSIONS_OT_package_install(Operator, _ExtCmdMixIn):
use_idle=is_modal,
)
],
batch_job_limit=1,
)
def exec_command_finish(self, canceled):
@@ -2369,6 +2394,7 @@ class EXTENSIONS_OT_package_uninstall(Operator, _ExtCmdMixIn):
use_idle=is_modal,
),
],
batch_job_limit=1,
)
def exec_command_finish(self, canceled):

View File

@@ -777,6 +777,7 @@ class CommandBatch:
"title",
"_batch",
"_batch_job_limit",
"_request_exit",
"_log_added_since_accessed",
)
@@ -786,9 +787,11 @@ class CommandBatch:
*,
title: str,
batch: Sequence[InfoItemCallable],
batch_job_limit: int,
):
self.title = title
self._batch = [CommandBatchItem(fn_with_args) for fn_with_args in batch]
self._batch_job_limit = batch_job_limit
self._request_exit = False
self._log_added_since_accessed = True
@@ -858,7 +861,16 @@ class CommandBatch:
status_data_changed = False
# NOTE: the method of limiting the number of running jobs won't be efficient
# with large numbers of jobs (tens of thousands or more), since all jobs are iterated over each time.
# To support this a queue of not-yet-started jobs could be used ... or this whole function could be re-thought.
# At this point in time using such large numbers of jobs isn't likely, so accept the simple loop each time
# this function is called.
batch_job_limit = self._batch_job_limit
running_count = 0
complete_count = 0
for cmd_index in reversed(range(len(self._batch))):
cmd = self._batch[cmd_index]
if cmd.status == CommandBatchItem.STATUS_COMPLETE:
@@ -869,6 +881,10 @@ class CommandBatch:
# First time initialization.
if cmd.fn_iter is None:
if 0 != batch_job_limit and running_count >= batch_job_limit:
# Try again later.
continue
cmd.fn_iter = cmd.invoke()
cmd.status = CommandBatchItem.STATUS_RUNNING
status_data_changed = True
@@ -908,6 +924,9 @@ class CommandBatch:
status_data_changed = True
cmd.msg_log.append((ty, msg))
if cmd.status == CommandBatchItem.STATUS_RUNNING:
running_count += 1
# Check if all are complete.
assert complete_count == len([cmd for cmd in self._batch if cmd.status == CommandBatchItem.STATUS_COMPLETE])
all_complete = complete_count == len(self._batch)

View File

@@ -557,8 +557,9 @@ class TestSimple(TestWithTempBlenderUser_MixIn, unittest.TestCase):
# Install.
stdout = run_blender_extensions_no_errors(("install", ",".join(packages_to_install), "--enable"))
# Sort output because the order doesn't matter and may change depending on how jobs are split up.
self.assertEqual(
tuple(line for line in stdout.split("\n") if line.startswith("STATUS ")),
tuple(line for line in sorted(stdout.split("\n")) if line.startswith("STATUS ")),
(
'''STATUS Installed "my_test_pkg"''',
'''STATUS Installed "my_test_pkg_a"''',