Skip to content

[Feature] Check which future objects are skipped and keep track of their IDs#1014

Merged
jan-janssen merged 14 commits into
mainfrom
batched_duplicate
Jun 11, 2026
Merged

[Feature] Check which future objects are skipped and keep track of their IDs#1014
jan-janssen merged 14 commits into
mainfrom
batched_duplicate

Conversation

@jan-janssen

@jan-janssen jan-janssen commented Jun 11, 2026

Copy link
Copy Markdown
Member

Summary by CodeRabbit

  • Bug Fixes

    • Batch processing now detects and surfaces errors from completed work, ensuring downstream tasks receive failures instead of silent drops.
    • Clearer handling of empty vs. error batch results so dependent tasks are completed or failed appropriately.
  • Tests

    • Added coverage for duplicated results and mixed success/failure scenarios to validate batching behavior and error propagation.

@coderabbitai

coderabbitai Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

batched_futures now collects exceptions and may return an exception payload; the dependency scheduler interprets return shapes to distinguish waiting/success/exception, and tests were updated with cases for duplicated results and mixed success/failure propagation.

Changes

Enhanced exception handling in batched futures

Layer / File(s) Summary
batched_futures exception tracking
src/executorlib/standalone/batched.py
Function signature now allows returning an exception payload; implementation collects exceptions from done futures, appends non-skipped results until expected batch size met, returns batch early, or returns first exception when all non-skipped futures failed.
Dependency scheduler batched task interpretation
src/executorlib/task_scheduler/interactive/dependency.py
_update_waiting_task branches on batched_futures return type: empty list -> keep waiting; non-empty list (list) -> set_result; non-list (exception payload) -> set_exception.
Tests: duplicated results and mixed failures
tests/unit/standalone/test_batched.py
Imports and TestCase reference updated; new test_batched_futures_duplicated validates repeated-value batching; test_batched_futures now includes futures that raise ValueError and asserts exception propagation when nested_skip_lst is complete.

Sequence Diagram

sequenceDiagram
  participant TaskScheduler
  participant batched_futures
  participant FutureCollection
  TaskScheduler->>batched_futures: call batched_futures(lst, nested_skip_lst)
  batched_futures->>FutureCollection: inspect futures (.done(), .exception(), .result())
  alt non-empty batch produced
    batched_futures-->>TaskScheduler: return list (done_lst)
    TaskScheduler->>TaskScheduler: set_result(done_lst)
  else all non-skipped failed
    batched_futures-->>TaskScheduler: return BaseException
    TaskScheduler->>TaskScheduler: set_exception(BaseException)
  else no ready results
    batched_futures-->>TaskScheduler: return []
    TaskScheduler->>TaskScheduler: keep task waiting
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • pyiron/executorlib#756: Related changes to batching integration and _update_waiting_task handling for "batched" tasks.
  • pyiron/executorlib#991: Related edits to batched_futures behavior and scheduler handling; overlaps with test updates.

Suggested reviewers

  • samwaseda

Poem

🐰 I peek at futures, one by one,

I count the batches in the sun,
If all the slices turn to cries,
I raise the error to the skies,
Else I bundle dupes and hop—well done!

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 30.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The PR title mentions tracking future object IDs and skipping logic, but the main changes involve enhancing batched_futures with exception handling, adding a new duplicate test, and updating return types to include BaseException. The title focuses on ID tracking for skipped futures, which is a partial aspect of the changes but misses the significant exception handling improvements and the new test addition. Consider revising to reflect the full scope of changes.
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch batched_duplicate

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tests/unit/standalone/test_batched.py`:
- Around line 22-36: Add a final assertion to test_batched_futures_duplicated
that calls batched_futures with nested_skip_lst=batched_lst and asserts the
fully-consumed completion result (an empty list) is returned; specifically call
batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3) and assertEqual(...,
[]). This ensures the identity-based skip_set / n_expected logic in
batched_futures and the downstream expectation of list-like done_lst are
validated.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: dd4b1a52-4e66-49e0-93c1-c9c17f79c182

📥 Commits

Reviewing files that changed from the base of the PR and between 65f1ef3 and 1c7ee9c.

📒 Files selected for processing (1)
  • tests/unit/standalone/test_batched.py

Comment thread tests/unit/standalone/test_batched.py
@codecov

codecov Bot commented Jun 11, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 94.22%. Comparing base (65f1ef3) to head (8d6b0fb).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1014      +/-   ##
==========================================
+ Coverage   94.19%   94.22%   +0.03%     
==========================================
  Files          39       39              
  Lines        2103     2114      +11     
==========================================
+ Hits         1981     1992      +11     
  Misses        122      122              

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/executorlib/task_scheduler/interactive/dependency.py`:
- Around line 378-392: The current logic computes n_expected using len(skip_set)
which loses multiplicity and then only returns when either n_expected successes
or n_expected failures occur, causing deadlocks for tail batches; fix by
computing already_consumed as the total number of skipped items (preserving
multiplicity) e.g. sum(len(f.result()) for f in nested_skip_lst) and set
n_expected = min(n, len(lst) - already_consumed), and change the termination
condition to return once len(done_lst) + len(failed_lst) == n_expected (return
done_lst if any successes else failed_lst) instead of only when one side reaches
n_expected; update references to skip_set, nested_skip_lst, n_expected,
done_lst, failed_lst and check_exception_was_raised accordingly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: b42001e4-82de-43c5-8f9c-39e94111cbe2

📥 Commits

Reviewing files that changed from the base of the PR and between 1c7ee9c and dbb62f8.

📒 Files selected for processing (4)
  • src/executorlib/standalone/batched.py
  • src/executorlib/task_scheduler/interactive/dependency.py
  • tests/unit/standalone/test_batched.py
  • tests/unit/task_scheduler/interactive/test_dependency.py
💤 Files with no reviewable changes (2)
  • src/executorlib/standalone/batched.py
  • tests/unit/standalone/test_batched.py

Comment thread src/executorlib/task_scheduler/interactive/dependency.py Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (2)
tests/unit/standalone/test_batched.py (1)

23-36: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Missing terminal test case for fully-consumed state.

As flagged in a previous review, the test should include a final assertion when all batches have been consumed (nested_skip_lst=batched_lst). This validates the completion behavior and would expose the multiplicity-loss deadlock bug in batched.py.

Current behavior with the multiplicity bug:

batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3)
# skip_set = {id(1), id(2), id(3)} - length 3, should be 9
# n_expected = min(3, 9-3) = 3 (wrong, should be 0)
# All futures skipped, done_lst = []
# Returns [] (keep waiting) - DEADLOCK! ✗

Expected behavior after fix:

# Should return [] correctly recognizing all items consumed
🧪 Suggested test addition
         self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [3, 3, 3])
+        self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3), [])
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/standalone/test_batched.py` around lines 23 - 36, Add a terminal
assertion to the test_batched_futures_duplicated test to verify the
fully-consumed state: call batched_futures with lst and nested_skip_lst equal to
the entire batched_lst and assert it returns an empty list ([]). This ensures
batched_futures correctly recognizes when all items are consumed (references:
test_batched_futures_duplicated, batched_futures, batched_lst, lst).
src/executorlib/standalone/batched.py (1)

20-37: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Multiplicity loss with duplicate values causes deadlock.

This issue was flagged in a previous review and remains unresolved. When nested_skip_lst contains duplicate scalar values (e.g., [1, 1, 1]), the identity-based skip_set loses multiplicity:

  1. Line 20: skip_set = {id(1), id(1), id(1)} becomes {id(1)} (one element, not three) because Python interns small integers
  2. Line 24: n_expected = min(n, 9 - 1) when it should be min(n, 9 - 3)
  3. Line 30: id(v.result()) not in skip_set skips all futures with result 1, not just the already-consumed count

Concrete deadlock scenario:

# 6 futures all returning value 1, batch size 3
lst = [f1(→1), f2(→1), f3(→1), f4(→1), f5(→1), f6(→1)]

# First batch succeeds
batched_futures(lst, [], 3) → [1, 1, 1] ✓

# Second batch deadlocks
batched_futures(lst, [Future(→[1,1,1])], 3)
  skip_set = {id(1)}  # length 1, should track 3 consumed
  # All 6 futures skipped because id(1) in skip_set
  done_lst = []
  → returns [] (keep waiting forever) ✗
🔧 Recommended fix (from previous review)
+from collections import Counter
+
 def batched_futures(
     lst: list[Future], nested_skip_lst: list[Future[list]], n: int
-) -> list[list]:
+) -> list | Exception:
-    skip_set = {id(item) for f in nested_skip_lst for item in f.result()}
-
+    skip_counter = Counter(id(item) for f in nested_skip_lst for item in f.result())
     done_lst = []
     failed_lst = []
-    n_expected = min(n, len(lst) - len(skip_set))
+    n_expected = min(n, len(lst) - sum(skip_counter.values()))
     for v in lst:
         if v.done():
             excp = v.exception()
             if excp is not None:
                 failed_lst.append(excp)
-            elif id(v.result()) not in skip_set:
+            else:
+                result_id = id(v.result())
+                if skip_counter[result_id] > 0:
+                    skip_counter[result_id] -= 1
+                    continue
                 done_lst.append(v.result())
                 if len(done_lst) == n_expected:
                     return done_lst
     if len(failed_lst) == len(lst) and len(failed_lst) > 0:
         return failed_lst[0]
     else:
         return []
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/executorlib/standalone/batched.py` around lines 20 - 37, The code loses
multiplicity by building skip_set = {id(...)}, so replace that with a counts
map: build skip_counts (e.g., Counter of id(r) for r in nested_skip_lst results)
and compute n_expected = min(n, len(lst) - sum(skip_counts.values())); then when
iterating v in lst check r_id = id(v.result()) and if skip_counts.get(r_id, 0) >
0 decrement skip_counts[r_id] and continue (consume one occurrence) otherwise
treat the result as non-skipped and append to done_lst; keep the existing
failed_lst logic unchanged. Ensure you update references to skip_set,
n_expected, and the id(v.result()) check to use skip_counts so duplicate scalar
results are only skipped the correct number of times.
🧹 Nitpick comments (1)
tests/unit/standalone/test_batched.py (1)

4-4: 💤 Low value

Consider importing from the definition site.

batched_futures is defined in executorlib.standalone.batched but imported here from the re-exporting module dependency. While functional, direct imports improve clarity and reduce indirection.

♻️ Suggested change
-from executorlib.task_scheduler.interactive.dependency import batched_futures
+from executorlib.standalone.batched import batched_futures
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/standalone/test_batched.py` at line 4, Replace the indirect import
of batched_futures from executorlib.task_scheduler.interactive.dependency with a
direct import from its definition module executorlib.standalone.batched: update
the import statement in tests/unit/standalone/test_batched.py to import the
symbol batched_futures from executorlib.standalone.batched so the test
references the definition site (locate the current import of batched_futures and
change its module path accordingly).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/executorlib/standalone/batched.py`:
- Line 35: The comment currently says "raise the exception" but the code returns
failed_lst[0]; update the comment (near the return failed_lst[0] statement in
the function that aggregates failed futures) to accurately state that the
exception object is returned to the caller for the scheduler to propagate (e.g.,
"return the exception object to be propagated by the scheduler") rather than
saying it is raised; do not change behavior.
- Around line 4-6: The return annotation for batched_futures is wrong: the
function actually returns either a flat list (done_lst), a single Exception
(failed_lst[0]), or an empty list, not list[list]; update the signature of
batched_futures to a correct union type (e.g. -> Union[List[Any], Exception])
and import the necessary typing names (from typing import Any, List, Union) and
adjust any callers if needed; reference: function batched_futures and variables
done_lst and failed_lst in the implementation.

In `@src/executorlib/task_scheduler/interactive/dependency.py`:
- Around line 351-356: The branch handling for batched results in the scheduler
is inverted: when batched_futures returns a non-empty list (success) you must
call task_wait_dict["future"].set_result(done_lst), and when it returns a
non-list exception object you must call
task_wait_dict["future"].set_exception(done_lst); keep the existing empty-list
(len==0) behavior that appends to wait_tmp_lst. Update the code around the
done_lst checks in the block that manipulates task_wait_dict (the three-way
if/elif/else using isinstance(done_lst, list)) to swap the set_result and
set_exception calls accordingly, referencing task_wait_dict and done_lst so
successful batches deliver results and exception objects are raised.

In `@tests/unit/standalone/test_batched.py`:
- Line 18: The tests call batched_futures(lst=..., n=3, nested_skip_lst=set()),
but batched_futures expects nested_skip_lst: list[Future[list]]; replace the
incorrect set() arguments with an empty list [] in all occurrences (the failing
calls at the current test lines and the other two instances noted) so the tests
match the declared parameter type for batched_futures.

---

Duplicate comments:
In `@src/executorlib/standalone/batched.py`:
- Around line 20-37: The code loses multiplicity by building skip_set =
{id(...)}, so replace that with a counts map: build skip_counts (e.g., Counter
of id(r) for r in nested_skip_lst results) and compute n_expected = min(n,
len(lst) - sum(skip_counts.values())); then when iterating v in lst check r_id =
id(v.result()) and if skip_counts.get(r_id, 0) > 0 decrement skip_counts[r_id]
and continue (consume one occurrence) otherwise treat the result as non-skipped
and append to done_lst; keep the existing failed_lst logic unchanged. Ensure you
update references to skip_set, n_expected, and the id(v.result()) check to use
skip_counts so duplicate scalar results are only skipped the correct number of
times.

In `@tests/unit/standalone/test_batched.py`:
- Around line 23-36: Add a terminal assertion to the
test_batched_futures_duplicated test to verify the fully-consumed state: call
batched_futures with lst and nested_skip_lst equal to the entire batched_lst and
assert it returns an empty list ([]). This ensures batched_futures correctly
recognizes when all items are consumed (references:
test_batched_futures_duplicated, batched_futures, batched_lst, lst).

---

Nitpick comments:
In `@tests/unit/standalone/test_batched.py`:
- Line 4: Replace the indirect import of batched_futures from
executorlib.task_scheduler.interactive.dependency with a direct import from its
definition module executorlib.standalone.batched: update the import statement in
tests/unit/standalone/test_batched.py to import the symbol batched_futures from
executorlib.standalone.batched so the test references the definition site
(locate the current import of batched_futures and change its module path
accordingly).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7da6f45c-803b-4a1d-988a-b8f45f0e82af

📥 Commits

Reviewing files that changed from the base of the PR and between dbb62f8 and bdf197e.

📒 Files selected for processing (3)
  • src/executorlib/standalone/batched.py
  • src/executorlib/task_scheduler/interactive/dependency.py
  • tests/unit/standalone/test_batched.py

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Inline review comments failed to post. This is likely due to GitHub's internal server error or limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.

Actionable comments posted: 4

♻️ Duplicate comments (2)
tests/unit/standalone/test_batched.py (1)

23-36: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Missing terminal test case for fully-consumed state.

As flagged in a previous review, the test should include a final assertion when all batches have been consumed (nested_skip_lst=batched_lst). This validates the completion behavior and would expose the multiplicity-loss deadlock bug in batched.py.

Current behavior with the multiplicity bug:

batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3)
# skip_set = {id(1), id(2), id(3)} - length 3, should be 9
# n_expected = min(3, 9-3) = 3 (wrong, should be 0)
# All futures skipped, done_lst = []
# Returns [] (keep waiting) - DEADLOCK! ✗

Expected behavior after fix:

# Should return [] correctly recognizing all items consumed
🧪 Suggested test addition
         self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [3, 3, 3])
+        self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3), [])
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/standalone/test_batched.py` around lines 23 - 36, Add a terminal
assertion to the test_batched_futures_duplicated test to verify the
fully-consumed state: call batched_futures with lst and nested_skip_lst equal to
the entire batched_lst and assert it returns an empty list ([]). This ensures
batched_futures correctly recognizes when all items are consumed (references:
test_batched_futures_duplicated, batched_futures, batched_lst, lst).
src/executorlib/standalone/batched.py (1)

20-37: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Multiplicity loss with duplicate values causes deadlock.

This issue was flagged in a previous review and remains unresolved. When nested_skip_lst contains duplicate scalar values (e.g., [1, 1, 1]), the identity-based skip_set loses multiplicity:

  1. Line 20: skip_set = {id(1), id(1), id(1)} becomes {id(1)} (one element, not three) because Python interns small integers
  2. Line 24: n_expected = min(n, 9 - 1) when it should be min(n, 9 - 3)
  3. Line 30: id(v.result()) not in skip_set skips all futures with result 1, not just the already-consumed count

Concrete deadlock scenario:

# 6 futures all returning value 1, batch size 3
lst = [f1(→1), f2(→1), f3(→1), f4(→1), f5(→1), f6(→1)]

# First batch succeeds
batched_futures(lst, [], 3) → [1, 1, 1] ✓

# Second batch deadlocks
batched_futures(lst, [Future(→[1,1,1])], 3)
  skip_set = {id(1)}  # length 1, should track 3 consumed
  # All 6 futures skipped because id(1) in skip_set
  done_lst = []
  → returns [] (keep waiting forever) ✗
🔧 Recommended fix (from previous review)
+from collections import Counter
+
 def batched_futures(
     lst: list[Future], nested_skip_lst: list[Future[list]], n: int
-) -> list[list]:
+) -> list | Exception:
-    skip_set = {id(item) for f in nested_skip_lst for item in f.result()}
-
+    skip_counter = Counter(id(item) for f in nested_skip_lst for item in f.result())
     done_lst = []
     failed_lst = []
-    n_expected = min(n, len(lst) - len(skip_set))
+    n_expected = min(n, len(lst) - sum(skip_counter.values()))
     for v in lst:
         if v.done():
             excp = v.exception()
             if excp is not None:
                 failed_lst.append(excp)
-            elif id(v.result()) not in skip_set:
+            else:
+                result_id = id(v.result())
+                if skip_counter[result_id] > 0:
+                    skip_counter[result_id] -= 1
+                    continue
                 done_lst.append(v.result())
                 if len(done_lst) == n_expected:
                     return done_lst
     if len(failed_lst) == len(lst) and len(failed_lst) > 0:
         return failed_lst[0]
     else:
         return []
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/executorlib/standalone/batched.py` around lines 20 - 37, The code loses
multiplicity by building skip_set = {id(...)}, so replace that with a counts
map: build skip_counts (e.g., Counter of id(r) for r in nested_skip_lst results)
and compute n_expected = min(n, len(lst) - sum(skip_counts.values())); then when
iterating v in lst check r_id = id(v.result()) and if skip_counts.get(r_id, 0) >
0 decrement skip_counts[r_id] and continue (consume one occurrence) otherwise
treat the result as non-skipped and append to done_lst; keep the existing
failed_lst logic unchanged. Ensure you update references to skip_set,
n_expected, and the id(v.result()) check to use skip_counts so duplicate scalar
results are only skipped the correct number of times.
🧹 Nitpick comments (1)
tests/unit/standalone/test_batched.py (1)

4-4: 💤 Low value

Consider importing from the definition site.

batched_futures is defined in executorlib.standalone.batched but imported here from the re-exporting module dependency. While functional, direct imports improve clarity and reduce indirection.

♻️ Suggested change
-from executorlib.task_scheduler.interactive.dependency import batched_futures
+from executorlib.standalone.batched import batched_futures
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/standalone/test_batched.py` at line 4, Replace the indirect import
of batched_futures from executorlib.task_scheduler.interactive.dependency with a
direct import from its definition module executorlib.standalone.batched: update
the import statement in tests/unit/standalone/test_batched.py to import the
symbol batched_futures from executorlib.standalone.batched so the test
references the definition site (locate the current import of batched_futures and
change its module path accordingly).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/executorlib/standalone/batched.py`:
- Line 35: The comment currently says "raise the exception" but the code returns
failed_lst[0]; update the comment (near the return failed_lst[0] statement in
the function that aggregates failed futures) to accurately state that the
exception object is returned to the caller for the scheduler to propagate (e.g.,
"return the exception object to be propagated by the scheduler") rather than
saying it is raised; do not change behavior.
- Around line 4-6: The return annotation for batched_futures is wrong: the
function actually returns either a flat list (done_lst), a single Exception
(failed_lst[0]), or an empty list, not list[list]; update the signature of
batched_futures to a correct union type (e.g. -> Union[List[Any], Exception])
and import the necessary typing names (from typing import Any, List, Union) and
adjust any callers if needed; reference: function batched_futures and variables
done_lst and failed_lst in the implementation.

In `@src/executorlib/task_scheduler/interactive/dependency.py`:
- Around line 351-356: The branch handling for batched results in the scheduler
is inverted: when batched_futures returns a non-empty list (success) you must
call task_wait_dict["future"].set_result(done_lst), and when it returns a
non-list exception object you must call
task_wait_dict["future"].set_exception(done_lst); keep the existing empty-list
(len==0) behavior that appends to wait_tmp_lst. Update the code around the
done_lst checks in the block that manipulates task_wait_dict (the three-way
if/elif/else using isinstance(done_lst, list)) to swap the set_result and
set_exception calls accordingly, referencing task_wait_dict and done_lst so
successful batches deliver results and exception objects are raised.

In `@tests/unit/standalone/test_batched.py`:
- Line 18: The tests call batched_futures(lst=..., n=3, nested_skip_lst=set()),
but batched_futures expects nested_skip_lst: list[Future[list]]; replace the
incorrect set() arguments with an empty list [] in all occurrences (the failing
calls at the current test lines and the other two instances noted) so the tests
match the declared parameter type for batched_futures.

---

Duplicate comments:
In `@src/executorlib/standalone/batched.py`:
- Around line 20-37: The code loses multiplicity by building skip_set =
{id(...)}, so replace that with a counts map: build skip_counts (e.g., Counter
of id(r) for r in nested_skip_lst results) and compute n_expected = min(n,
len(lst) - sum(skip_counts.values())); then when iterating v in lst check r_id =
id(v.result()) and if skip_counts.get(r_id, 0) > 0 decrement skip_counts[r_id]
and continue (consume one occurrence) otherwise treat the result as non-skipped
and append to done_lst; keep the existing failed_lst logic unchanged. Ensure you
update references to skip_set, n_expected, and the id(v.result()) check to use
skip_counts so duplicate scalar results are only skipped the correct number of
times.

In `@tests/unit/standalone/test_batched.py`:
- Around line 23-36: Add a terminal assertion to the
test_batched_futures_duplicated test to verify the fully-consumed state: call
batched_futures with lst and nested_skip_lst equal to the entire batched_lst and
assert it returns an empty list ([]). This ensures batched_futures correctly
recognizes when all items are consumed (references:
test_batched_futures_duplicated, batched_futures, batched_lst, lst).

---

Nitpick comments:
In `@tests/unit/standalone/test_batched.py`:
- Line 4: Replace the indirect import of batched_futures from
executorlib.task_scheduler.interactive.dependency with a direct import from its
definition module executorlib.standalone.batched: update the import statement in
tests/unit/standalone/test_batched.py to import the symbol batched_futures from
executorlib.standalone.batched so the test references the definition site
(locate the current import of batched_futures and change its module path
accordingly).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7da6f45c-803b-4a1d-988a-b8f45f0e82af

📥 Commits

Reviewing files that changed from the base of the PR and between dbb62f8 and bdf197e.

📒 Files selected for processing (3)
  • src/executorlib/standalone/batched.py
  • src/executorlib/task_scheduler/interactive/dependency.py
  • tests/unit/standalone/test_batched.py
🛑 Comments failed to post (4)
src/executorlib/standalone/batched.py (2)

4-6: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Incorrect return type annotation.

The function signature declares -> list[list], but the implementation returns different types:

  • Line 33: returns done_lst (a flat list like [1, 2, 3], not nested)
  • Line 35: returns failed_lst[0] (a single Exception object)
  • Line 37: returns [] (empty list)
📝 Suggested fix
 def batched_futures(
     lst: list[Future], nested_skip_lst: list[Future[list]], n: int
-) -> list[list]:
+) -> list | Exception:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

def batched_futures(
    lst: list[Future], nested_skip_lst: list[Future[list]], n: int
) -> list | Exception:
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/executorlib/standalone/batched.py` around lines 4 - 6, The return
annotation for batched_futures is wrong: the function actually returns either a
flat list (done_lst), a single Exception (failed_lst[0]), or an empty list, not
list[list]; update the signature of batched_futures to a correct union type
(e.g. -> Union[List[Any], Exception]) and import the necessary typing names
(from typing import Any, List, Union) and adjust any callers if needed;
reference: function batched_futures and variables done_lst and failed_lst in the
implementation.

35-35: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Misleading comment: code returns, does not raise.

The comment states "raise the exception" but the code returns the exception object to the caller. The scheduler is responsible for propagating it via set_exception.

📝 Suggested fix
-        return failed_lst[0]  # raise the exception only after all futures have failed
+        return failed_lst[0]  # return the exception to propagate after all futures have failed
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

        return failed_lst[0]  # return the exception to propagate after all futures have failed
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/executorlib/standalone/batched.py` at line 35, The comment currently says
"raise the exception" but the code returns failed_lst[0]; update the comment
(near the return failed_lst[0] statement in the function that aggregates failed
futures) to accurately state that the exception object is returned to the caller
for the scheduler to propagate (e.g., "return the exception object to be
propagated by the scheduler") rather than saying it is raised; do not change
behavior.
src/executorlib/task_scheduler/interactive/dependency.py (1)

351-356: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Critical: set_result and set_exception calls are swapped.

The type-based dispatch interprets batched_futures return values incorrectly. Based on the implementation in batched.py:

  • Line 33 returns a list of results (e.g., [1, 2, 3]) on success
  • Line 35 returns a single exception object when all futures fail
  • Line 37 returns [] when incomplete

Current scheduler logic (INCORRECT):

  1. Line 351: Empty list [] → keep waiting ✓
  2. Line 353-354: Non-list (exception) → set_result(done_lst)Sets exception as result!
  3. Line 355-356: Non-empty list (results) → set_exception(done_lst)Raises results list as exception!

Concrete failure examples:

Example 1: Successful batch raises as exception

# 3 futures complete successfully
batched_futures(...) → [1, 2, 3]
# Scheduler line 351: isinstance([1,2,3], list) and len == 0 → False
# Scheduler line 353: not isinstance([1,2,3], list) → False  
# Scheduler line 355: else → set_exception([1,2,3])
# Result: future.result() raises [1,2,3] as an exception! ✗

Example 2: Exception returned as result

# All futures fail
batched_futures(...) → RuntimeError("failed")
# Scheduler line 351: isinstance(exc, list) → False
# Scheduler line 353: not isinstance(exc, list) → True → set_result(exc)
# Result: future.result() returns the exception object, doesn't raise it! ✗
🐛 Proposed fix
             done_lst = batched_futures(
                 lst=task_wait_dict["kwargs"]["lst"],
                 n=task_wait_dict["kwargs"]["n"],
                 nested_skip_lst=task_wait_dict["kwargs"]["skip_lst"],
             )
             if isinstance(done_lst, list) and len(done_lst) == 0:
                 wait_tmp_lst.append(task_wait_dict)
-            elif not isinstance(done_lst, list):
-                task_wait_dict["future"].set_result(done_lst)
-            else:
+            elif isinstance(done_lst, list):
+                task_wait_dict["future"].set_result(done_lst)
+            else:
                 task_wait_dict["future"].set_exception(done_lst)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/executorlib/task_scheduler/interactive/dependency.py` around lines 351 -
356, The branch handling for batched results in the scheduler is inverted: when
batched_futures returns a non-empty list (success) you must call
task_wait_dict["future"].set_result(done_lst), and when it returns a non-list
exception object you must call task_wait_dict["future"].set_exception(done_lst);
keep the existing empty-list (len==0) behavior that appends to wait_tmp_lst.
Update the code around the done_lst checks in the block that manipulates
task_wait_dict (the three-way if/elif/else using isinstance(done_lst, list)) to
swap the set_result and set_exception calls accordingly, referencing
task_wait_dict and done_lst so successful batches deliver results and exception
objects are raised.
tests/unit/standalone/test_batched.py (1)

18-18: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Type mismatch: passing set() instead of empty list.

The batched_futures signature expects nested_skip_lst: list[Future[list]], but these test calls pass set(). While this works by accident (iterating an empty set produces no items), it violates the type contract.

🔧 Proposed fix
-        self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [0, 1, 2])
+        self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=[]), [0, 1, 2])

Apply the same change to lines 34 and 43.

Also applies to: 34-34, 43-43

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/standalone/test_batched.py` at line 18, The tests call
batched_futures(lst=..., n=3, nested_skip_lst=set()), but batched_futures
expects nested_skip_lst: list[Future[list]]; replace the incorrect set()
arguments with an empty list [] in all occurrences (the failing calls at the
current test lines and the other two instances noted) so the tests match the
declared parameter type for batched_futures.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/executorlib/standalone/batched.py (1)

24-37: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't return [] after the batch has reached a terminal state.

len(failed_lst) == len(lst) only covers the "everything failed and nothing was skipped" case. After earlier batches have populated nested_skip_lst, or when some inputs succeed and the rest fail, this can still return [] even though every unskipped future is already done and no further results can arrive. _update_waiting_task() treats [] as "keep waiting", so the batched task can hang forever. Please base the terminal check on the remaining unskipped futures, not the original len(lst).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/executorlib/standalone/batched.py` around lines 24 - 37, The
terminal-state check is wrong: don't compare failed_lst against len(lst) because
some items may be skipped; instead compute the number of unskipped futures
(remaining = len(lst) - len(skip_set) or use n_expected) and treat the batch as
terminal when done_lst.size + failed_lst.size == remaining; return done_lst when
done_lst.size == remaining, or return the first exception (or appropriate
aggregate) when failed_lst.size == remaining; update the condition that
currently reads "if len(failed_lst) == len(lst) and len(failed_lst) > 0" to use
remaining/unskipped counts (n_expected or len(lst)-len(skip_set)) so
_update_waiting_task() doesn't interpret a terminal state as "keep waiting".
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/executorlib/standalone/batched.py`:
- Line 6: The return-shape contract in src/executorlib/standalone/batched.py was
inverted vs _update_waiting_task: change the function (signature currently ")->
list[list] | BaseException") so that success produces a non-list result (e.g.,
the expected success value or None) and failures produce a non-empty list of
exception(s); update the annotation and every return site in this function
(including the returns around the block corresponding to lines 32-35 in the
diff) to return a list[list] for failure and a non-list for success so it aligns
with _update_waiting_task's handling, or if you prefer the other approach, apply
the inverse change to _update_waiting_task so it treats BaseException returns as
failures—prefer changing batched.py to match existing _update_waiting_task
behavior.

---

Outside diff comments:
In `@src/executorlib/standalone/batched.py`:
- Around line 24-37: The terminal-state check is wrong: don't compare failed_lst
against len(lst) because some items may be skipped; instead compute the number
of unskipped futures (remaining = len(lst) - len(skip_set) or use n_expected)
and treat the batch as terminal when done_lst.size + failed_lst.size ==
remaining; return done_lst when done_lst.size == remaining, or return the first
exception (or appropriate aggregate) when failed_lst.size == remaining; update
the condition that currently reads "if len(failed_lst) == len(lst) and
len(failed_lst) > 0" to use remaining/unskipped counts (n_expected or
len(lst)-len(skip_set)) so _update_waiting_task() doesn't interpret a terminal
state as "keep waiting".
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ff106972-5ca8-48ca-92a7-7c45a5895d71

📥 Commits

Reviewing files that changed from the base of the PR and between bdf197e and 87d38ca.

📒 Files selected for processing (1)
  • src/executorlib/standalone/batched.py

Comment thread src/executorlib/standalone/batched.py Outdated
@jan-janssen jan-janssen linked an issue Jun 11, 2026 that may be closed by this pull request

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tests/unit/standalone/test_batched.py`:
- Line 38: The test function test_batched_futures is defined twice, causing the
later definition to override the earlier one; rename the duplicate (the second
occurrence) to a unique name (e.g., test_batched_futures_with_<scenario>) or
merge its assertions into the original so both behaviors are tested; update any
references or fixtures used by the renamed function to match the new name.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1684885a-df3b-4bb8-a47c-bc9e9ec3622f

📥 Commits

Reviewing files that changed from the base of the PR and between 87d38ca and a1cd6ce.

📒 Files selected for processing (3)
  • src/executorlib/standalone/batched.py
  • src/executorlib/task_scheduler/interactive/dependency.py
  • tests/unit/standalone/test_batched.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/executorlib/standalone/batched.py

Comment thread tests/unit/standalone/test_batched.py
@jan-janssen jan-janssen changed the title [Test] Add duplicated test for batched [Feature] Check which future objects are skipped and keep track of their IDs Jun 11, 2026
@jan-janssen jan-janssen merged commit 04f2983 into main Jun 11, 2026
36 checks passed
@jan-janssen jan-janssen deleted the batched_duplicate branch June 11, 2026 10:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Batching for failed futures

2 participants