[Feature] Fixed batched_futures() #1006
📝 Walkthrough
The PR modifies batching to exclude and safely log exception-completed futures, preventing exceptions from cascading downstream. It rewrites the core batching loop to track skipped identities and emit batches upon collecting eligible results, and it integrates exception-safe handling into the task scheduler to avoid crashes.
Changes: Batched futures exception handling and scheduler integration
Codecov Report
✅ All modified and coverable lines are covered by tests.

@@            Coverage Diff             @@
##             main    #1006      +/-   ##
==========================================
+ Coverage   94.19%   94.25%   +0.05%
==========================================
  Files          39       39
  Lines        2103     2122      +19
==========================================
+ Hits         1981     2000      +19
  Misses        122      122
|
🧹 Nitpick comments (1)
src/executorlib/standalone/batched.py (1)
39-43: ⚡ Quick win

Consider using a proper logger instead of `print()`.

The current implementation uses `print()` for logging failed futures. Using Python's `logging` module would provide better control over log levels, formatting, and output destinations, making it easier to integrate with application-wide logging configuration.

📝 Suggested improvement

    +import logging
     from concurrent.futures import Future

    +logger = logging.getLogger(__name__)
    +
     # Future objects we have already reported as failed -- so each failed job is logged once, not on
     # every scheduler pass (batched_futures is re-evaluated many times until a batch fills).
     _logged_failed_ids: set = set()

Then replace the print statement:

     if v.exception() is not None:
         if id(v) not in _logged_failed_ids:
             _logged_failed_ids.add(id(v))
    -        print(
    -            f"[batched_futures] EXCLUDING failed future from batch: "
    -            f"{type(v.exception()).__name__}: {v.exception()}",
    -            flush=True,
    -        )
    +        logger.warning(
    +            "EXCLUDING failed future from batch: %s: %s",
    +            type(v.exception()).__name__,
    +            v.exception(),
    +        )

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/executorlib/standalone/batched.py` around lines 39 - 43, Replace the ad-hoc print in the batched_futures code with the Python logging module: create/get a module logger (e.g., logger = logging.getLogger(__name__)) and use an appropriate level (logger.warning or logger.error) to log the message about excluding a failed future, include the exception type and message in the log string and pass exc_info=True or the exception object for richer context; update any imports to include logging.
|
@copilot Extend the unit tests to cover the newly added lines. |
Updated versioning information and cleaned up type hints.
Added tests in commit
|
|
@copilot There are still three newly added lines in |
Added in commit
Regarding |
|
I split the changes into three different pull requests:
The concepts are the same; the places where the individual features were implemented are slightly different. |
Repo: executorlib · Base: upstream `main@f926c40` · Files: `standalone/batched.py`, `task_scheduler/interactive/dependency.py`, `tests/unit/standalone/test_batched.py`, `tests/unit/executor/test_single_dependencies.py` · Size: +55 −12 (production) + new tests

Verified: the patch applies cleanly with `git apply`, and both files pass `python -m py_compile`.

Context. A dependency-scheduler workload submits ~100k interdependent futures and uses a batched collector, `exe.submit(batched, lst=<all upstream futures>, skip_lst=[...], n=<batch size>)`, to stream completed results into the next stage in completion order (downstream must start on the first `n` done, not wait for all of `lst`). At that scale, three independent issues surfaced in this path: two are scaling bugs, one is a fault-tolerance bug. They live in two files, and `batched_futures` is touched by two of them, so below the reasoning is per-issue and the code is shown per-function, each change attributed to its issue.
Issues at a glance
| # | Where | Problem | Fix |
|---|---|---|---|
| 1 | `batched_futures` | Dedup via list membership (`x not in list`) → O(N²); on rich objects (ASE `Atoms`, whose `__eq__` is ~100× a dict compare) a scan took >6.7 min at N=5000. Value-equality is also wrong: two distinct results can be `==`. | `id()` in a set (O(1), identity-correct) |
| 2 | `_execute_tasks_with_dependencies` | `get_future_objects_from_input` treats all 100k `lst` futures as inputs and scans them: O(N) per collector, on every wait-list pass → scheduler stalls, downstream starves | `fn=="batched"`: register only the small `skip_lst`; never ingest `lst` |
| 3 | `batched_futures` + `_update_waiting_task` | `.result()` on an exception-completed future re-raises; the scheduler then `set_exception`s the batch future → it cascades to every downstream dependent. One failed input → all stages "finish" with zero output. | `try/except` confines any real exception to the one batch future |

The three are independent and can be reviewed/merged separately.
Why each fix is correct
Issue 1: `id()` dedup. `skipped_ids` holds the identity of every result already emitted in a prior batch, so `id(result) not in skipped_ids` answers exactly the intended question: "have I already returned this object?" Every result is a distinct object, so `len(skipped_ids) == len(old list)` and `n_expected` is unchanged: identical behavior, O(1) identity instead of an O(N) value compare (and no reliance on a possibly-expensive/ambiguous `__eq__`). The scan dropped from >6.7 min to ~0.86 s at N=5000.

Issue 2: don't ingest `lst`. A batched collector's readiness never depended on `lst`: it is driven by `skip_lst` (the few prior batch futures) plus the `batched_futures` scan, which runs once, when `skip_lst` is done, and still returns the first-n-done, so completion order is preserved. Walking 100k futures on ingestion (and every pass) was pure waste; registering only `skip_lst` removes the O(N)-per-collector stall with no semantic change.
Issue 3: drop failed futures instead of re-raising. `Future.result()` re-raises a failed future's exception, so calling it inside the collector lets a single failed input abort the whole batch, which the scheduler converts into a failure of the batch future and all its dependents. Checking `v.exception()` first and skipping that future confines the failure to one item; the batch is still built from the successful results. The `all_resolved` / partial-batch return covers the tail case (input fully resolved but `n` unreachable because some failed) so the collector can't wait forever for a batch that can never fill. The `try/except` in the scheduler is a backstop: a genuine exception (e.g. from a `skip_lst` future) fails only that batch future instead of crashing the scheduler thread.
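As a rough sketch of the Issue 3 behavior, the following shows a collector that skips failed futures, returns a partial batch once every input is resolved, and a backstop that confines an unexpected error to the batch future. The helpers `collect_batch` and `resolve_batch_future` are hypothetical and only mirror the idea; they are not the functions in `batched.py` or `dependency.py`.

```python
from concurrent.futures import Future


def collect_batch(futures, n):
    """Return up to n successful results, or None if the batch is still undecided."""
    batch = []
    all_resolved = True
    for f in futures:
        if not f.done():
            all_resolved = False
            continue
        # Check cancelled() first: exception() raises on a cancelled future.
        if f.cancelled() or f.exception() is not None:
            continue  # drop the failed input; result() would re-raise here
        batch.append(f.result())
        if len(batch) == n:
            return batch
    # Tail case: everything is resolved but fewer than n inputs succeeded.
    return batch if all_resolved else None


def resolve_batch_future(batch_future, futures, n):
    """Backstop: an unexpected error fails only batch_future."""
    try:
        batch = collect_batch(futures, n)
    except Exception as exc:
        batch_future.set_exception(exc)
        return
    if batch is not None:
        batch_future.set_result(batch)


ok1, bad, ok2 = Future(), Future(), Future()
ok1.set_result(1)
bad.set_exception(ValueError("boom"))
ok2.set_result(2)

full = collect_batch([ok1, bad, ok2], n=2)     # failed input skipped, batch fills
partial = collect_batch([ok1, bad], n=2)       # all resolved, n unreachable
pending = collect_batch([ok1, Future()], n=2)  # one input still running
```

With these inputs `full` is `[1, 2]`, `partial` is `[1]`, and `pending` is `None`, so the caller can tell "not ready yet" apart from "ready with fewer than `n` results".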
Tests
New unit tests cover all new code paths:
- `test_batched_futures_with_failed_future`: a failed future inside `lst` is excluded and the batch is built from the remaining successful results (no exception raised).
- `test_batched_futures_failed_future_logged_once`: `_logged_failed_ids` dedup; the same failed future is recorded only once, even across repeated calls.
- `test_batched_futures_partial_batch_due_to_failures`: when all futures are resolved but `n` is unreachable due to failures, a partial batch is emitted instead of blocking forever.
- `test_batched_with_failed_upstream_future` (integration): end-to-end check via `SingleNodeExecutor.batched()` that a failed upstream future is silently dropped and all batch futures still resolve successfully without cascading the exception downstream.
- `test_update_waiting_task_batched_exception`: directly calls `_update_waiting_task` with a mock `skip_lst` future whose `.result()` raises, verifying that the `except Exception` handler in `_update_waiting_task` catches the error and propagates it to the batch future (instead of crashing the scheduler thread).