Skip to content

Support consuming N messages from channel in one command#600

Open
pradhankukiran wants to merge 3 commits into
indeedeng:mainfrom
pradhankukiran:feature/issue-301-channel-consume-n-messages
Open

Support consuming N messages from channel in one command#600
pradhankukiran wants to merge 3 commits into
indeedeng:mainfrom
pradhankukiran:feature/issue-301-channel-consume-n-messages

Conversation

@pradhankukiran
Copy link
Copy Markdown

Description

Implements server-side support for AtLeast/AtMost fields on InterStateChannelCommand (fields already exist in IDL but were unused). Enables three consumption modes:

  • Exact N: AtLeast=N, AtMost=N — wait for exactly N, consume exactly N
  • OneToAll: AtLeast=1 (no AtMost) — wait for at least 1, consume all available
  • ZeroToAll: AtLeast=0 (no AtMost) — don't wait, consume all available

Messages are consumed atomically. Gated behind global version 11 for backward compat with existing workflows.

Note: The Values field on InterStateChannelResult was added manually to the generated model. A corresponding update to iwf-idl/iwf.yaml is needed so make idl-code-gen produces the same result.

Checklist

  • Code compiles correctly
  • Tests for the changes have been added
  • All tests passing
  • This PR change is backwards-compatible
  • This PR CONTAINS a (planned) breaking change (it is not backwards compatible)

Related Issue

Closes #301

@longquanzheng
Copy link
Copy Markdown
Contributor

thanks for contribution. Can you add test to make sure it works?

@codecov
Copy link
Copy Markdown

codecov Bot commented Mar 18, 2026

Codecov Report

❌ Patch coverage is 45.74468% with 51 lines in your changes missing coverage. Please review.
✅ Project coverage is 65.37%. Comparing base (ed9f8c2) to head (5ee0e69).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
service/interpreter/workflowImpl.go 46.00% 16 Missing and 11 partials ⚠️
service/interpreter/InternalChannel.go 15.00% 17 Missing ⚠️
service/interpreter/deciderTriggerer.go 61.11% 3 Missing and 4 partials ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main     #600      +/-   ##
==========================================
- Coverage   65.70%   65.37%   -0.33%     
==========================================
  Files          64       64              
  Lines        6461     6542      +81     
==========================================
+ Hits         4245     4277      +32     
- Misses       1893     1929      +36     
- Partials      323      336      +13     
Files with missing lines Coverage Δ
service/interfaces.go 85.71% <ø> (ø)
service/interpreter/continueAsNewer.go 87.16% <100.00%> (+0.08%) ⬆️
service/interpreter/globalVersioner.go 89.47% <100.00%> (+0.58%) ⬆️
service/interpreter/deciderTriggerer.go 78.84% <61.11%> (-7.65%) ⬇️
service/interpreter/InternalChannel.go 65.45% <15.00%> (-28.84%) ⬇️
service/interpreter/workflowImpl.go 84.12% <46.00%> (-2.69%) ⬇️
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@pradhankukiran pradhankukiran force-pushed the feature/issue-301-channel-consume-n-messages branch from 5ee0e69 to 6a37310 Compare March 25, 2026 10:52
@pradhankukiran pradhankukiran force-pushed the feature/issue-301-channel-consume-n-messages branch from 6a37310 to 1bd86db Compare March 28, 2026 12:16
@pradhankukiran
Copy link
Copy Markdown
Author

@longquanzheng added integration tests covering all 4 consumption modes (ExactN, OneToAll, ZeroToAll, AtMostOnly) across both Temporal and Cadence backends, with and without ContinueAsNew.

Also fixed a bug where AtLeast=0, AtMost=1 would panic on an empty channel, Retrieve() doesn't handle empty channels, so the code now routes through the safe RetrieveUpToN path when atLeast == 0.

commandRequest iwfidl.CommandRequest,
completedTimerCommands map[int]service.InternalTimerStatus,
completedSignalCommands, completedInterStateChannelCommands map[int]*iwfidl.EncodedObject,
completedInterStateChannelMultiCmds map[int][]*iwfidl.EncodedObject,
Copy link
Copy Markdown
Contributor

@longquanzheng longquanzheng May 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think we can just change completedInterStateChannelCommands to be map[int][]*iwfidl.EncodedObject?
Or that's a breaking change that will break the determinism?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing completedInterStateChannelCommands to map[int][]*EncodedObject would break continue-as-new compatibility. Existing snapshots have object values, not arrays, so old waiting workflows may fail unmarshal/replay. Separate CompletedInterStateChannelMultiCmds keeps old snapshot shape intact.

@pradhankukiran
Copy link
Copy Markdown
Author

I am adding a follow-up commit. Plain is to keep CompletedInterStateChannelCommands as-is for continue-as-new compatibility, add InterStateChannelResult.values through IDL/codegen, validate invalid atLeast/atMost, and add focused unit coverage for the new channel/decider paths.
I’ll update generated Go from the IDL locally, but the canonical iwf-idl change may need a separate PR/update in the IDL repo.

@longquanzheng
Copy link
Copy Markdown
Contributor

I am adding a follow-up commit. Plain is to keep CompletedInterStateChannelCommands as-is for continue-as-new compatibility, add InterStateChannelResult.values through IDL/codegen, validate invalid atLeast/atMost, and add focused unit coverage for the new channel/decider paths. I’ll update generated Go from the IDL locally, but the canonical iwf-idl change may need a separate PR/update in the IDL repo.

yes, please open a separate PR in idl repo and merge in that first.

Comment on lines +1187 to +1193
return fmt.Errorf("InterStateChannelCommand atLeast cannot be negative")
}
if cmd.HasAtMost() && cmd.GetAtMost() < 0 {
return fmt.Errorf("InterStateChannelCommand atMost cannot be negative")
}
if cmd.HasAtLeast() && cmd.HasAtMost() && cmd.GetAtMost() < cmd.GetAtLeast() {
return fmt.Errorf("InterStateChannelCommand atMost cannot be less than atLeast")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

returning plain error here will cause workflow task running in a loop. we can return application erorr to fail the workflow directly.

Comment thread service/interfaces.go
@@ -160,6 +160,9 @@ type (
CompletedTimerCommands map[int]InternalTimerStatus `json:"completedTimerCommands"`
CompletedSignalCommands map[int]*iwfidl.EncodedObject `json:"completedSignalCommands"`
CompletedInterStateChannelCommands map[int]*iwfidl.EncodedObject `json:"completedInterStateChannelCommands"`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you leave a comment here? are we deprecating that or it's still being used?

stateExecutionLocal = startResponse.GetUpsertStateLocals()
}

if globalVersioner.IsAfterVersionOfChannelConsumeN() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain a bit more why this is necssary here?


if received {
completedInterStateChannelCmds[idx] = interStateChannel.Retrieve(cmd.ChannelName)
for {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need a for loop? or maybe I missed something(sorry btw I could be wrong, haven't touched the code for a while)

Comment on lines +813 to +816
if len(values) > 0 {
completedInterStateChannelCmds[idx] = values[0]
} else {
completedInterStateChannelMultiCmds[idx] = values
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like it's better not to always backfill the legacy field for single cmd? this means activity input will have duplicate data which cause the history size grow faster.

Since SDK need to change to support this anyway, we should change the SDK to only use the new field only.

For compatibility of old sdk, it will not send the mult-command, server will still need to check if it's single command, then fallback to the old code path and use the single command completed fields.

@pradhankukiran
Copy link
Copy Markdown
Author

@longquanzheng Opened the IDL PR here: indeedeng/iwf-idl#91

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support get (up to) N or all messages from channel in one command

2 participants