Skip to content

CAMEL-23336: camel-mongodb - Expose Resume Token (Change Streams Consumer)#22674

Merged
davsclaus merged 5 commits into
apache:mainfrom
Michwo:feat-camel-mongodb-cs-resume-token
Jun 24, 2026
Merged

CAMEL-23336: camel-mongodb - Expose Resume Token (Change Streams Consumer)#22674
davsclaus merged 5 commits into
apache:mainfrom
Michwo:feat-camel-mongodb-cs-resume-token

Conversation

@Michwo

@Michwo Michwo commented Apr 17, 2026

Copy link
Copy Markdown
Contributor

Description

Adds option to configure the change streams resume token in change streams consumer endpoint (property resumeToken) and exposes the current resume token in exchange header (MongoDBConstants.RESUME_TOKEN).

@github-actions

Copy link
Copy Markdown
Contributor

🌟 Thank you for your contribution to the Apache Camel project! 🌟
🤖 CI automation will test this PR automatically.

🐫 Apache Camel Committers, please review the following items:

  • First-time contributors require MANUAL approval for the GitHub Actions to run
  • You can use the command /component-test (camel-)component-name1 (camel-)component-name2.. to request a test from the test bot although they are normally detected and executed by CI.
  • You can label PRs using skip-tests and test-dependents to fine-tune the checks executed by this PR.
  • Build and test logs are available in the summary page. Only Apache Camel committers have access to the summary.

⚠️ Be careful when sharing logs. Review their contents before sharing them publicly.

@Michwo Michwo marked this pull request as draft April 17, 2026 12:35
@oscerd

oscerd commented Apr 17, 2026

Copy link
Copy Markdown
Contributor

Please run a full mvn clean install -DskipTests from root folder.

@davsclaus

Copy link
Copy Markdown
Contributor

you need to build the source code for the entire project as when you add new options there are code changed elsewhere

modified:   catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/mongodb.json
modified:   components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
modified:   dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/MongoDbEndpointBuilderFactory.java

@Michwo

Michwo commented Apr 17, 2026

Copy link
Copy Markdown
Contributor Author

you need to build the source code for the entire project as when you add new options there are code changed elsewhere

modified:   catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/mongodb.json
modified:   components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
modified:   dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/MongoDbEndpointBuilderFactory.java

Hi, thanks for the info.

I think my problem is that I've done the build on Windows and it messed up all the line endings in generated source (over 3k files changed). I've commited the changes inside components/camel-mongodb but that seems not enough.

I will try to run the build on linux next week.

@Croway

Croway commented Apr 20, 2026

Copy link
Copy Markdown
Contributor

Hi @Michwo I think that this feature can be implemented with Camel's resume strategy (ResumeAware Consumer). Using the resume strategy, the Camel route would look like:

from("mongodb:myDb?consumerType=changeStreams&database=mydb&collection=mycol")
      .resumable(myStategy)
      .process(this::process);

The resumeToken handling will be offloaded to the framework, and hopefully it will be easier to use.

The ResumeAware implementation can be done in another PR, no need to change the current implementation, but please open an issue about it.

fyi @orpiske

@github-actions

github-actions Bot commented Apr 21, 2026

Copy link
Copy Markdown
Contributor

🧪 CI tested the following changed modules:

  • catalog/camel-catalog
  • components/camel-mongodb
  • core/camel-core-engine
  • dsl/camel-endpointdsl
  • dsl/camel-kamelet-main

ℹ️ Dependent modules were not tested because the total number of affected modules exceeded the threshold (50). Use the test-dependents label to force testing all dependents.

Build reactor — dependencies compiled but only changed modules were tested (5 modules)
  • Camel :: Catalog :: Camel Catalog
  • Camel :: Core Engine
  • Camel :: Endpoint DSL
  • Camel :: Kamelet Main
  • Camel :: MongoDB

⚙️ View full build and test results

@orpiske orpiske left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this contribution. However, I think this this could be implemented using our Resume Strategies API.

@Michwo

Michwo commented Apr 23, 2026

Copy link
Copy Markdown
Contributor Author

Thanks for this contribution. However, I think this this could be implemented using our Resume Strategies API.

Hi @Michwo I think that this feature can be implemented with Camel's resume strategy (ResumeAware Consumer). Using the resume strategy, the Camel route would look like:

from("mongodb:myDb?consumerType=changeStreams&database=mydb&collection=mycol")
      .resumable(myStategy)
      .process(this::process);

The resumeToken handling will be offloaded to the framework, and hopefully it will be easier to use.

The ResumeAware implementation can be done in another PR, no need to change the current implementation, but please open an issue about it.

fyi @orpiske

Should I then abandon this Issue/PR and try to implement the resume strategy for camel-mongodb?

@orpiske

orpiske commented Apr 23, 2026

Copy link
Copy Markdown
Contributor

Should I then abandon this Issue/PR and try to implement the resume strategy for camel-mongodb?

I think either way is fine. You are free to pick the way you find more convenient.

Personally, I'd prefer to reuse the PR, so we have the history of the conversation (specially because the idea IS good - we'd like to have this on the code - it's just the implementation that needs to be tweaked to fit into what we already have).

@Michwo Michwo closed this Jun 19, 2026
@Michwo Michwo force-pushed the feat-camel-mongodb-cs-resume-token branch from e805484 to 357f397 Compare June 19, 2026 12:53
@Michwo

Michwo commented Jun 23, 2026

Copy link
Copy Markdown
Contributor Author

I've finally found some time to work on that again - I've reimplemented this like @orpiske and @Croway have suggested, as an resume strategy eip.

I used the camel-kafka's implementation as a template for this.

Changes included:

  • MongoDB change streams now work with the Resume API/EIP.
  • Added MongoDB resume strategy + configuration builder support.
  • Resume token (offset) key is based on route id and collection (with endpoint fallback).
    This was a decision based on the way WatchStreamConsumer is configured - more consumers per collection possible watching different changes based on BSON filter - therefore each consumer endpoint should be able to ingest the changes separately.
  • Resume tokens are stored as JSON strings
  • resumable completion updates the strategy from exchange offset

Startup token priority: resume strategy token first, then endpoint/repository

Tests for the added code are included (value/integration).

The documentation is also updated to reflect the changes (camel-mongodb and Resume Strategies).

@Michwo Michwo reopened this Jun 23, 2026
@Michwo Michwo marked this pull request as ready for review June 23, 2026 14:05
@Michwo Michwo requested a review from orpiske June 23, 2026 14:46
@github-actions github-actions Bot added the core label Jun 24, 2026

@davsclaus davsclaus left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work adding resume token support to the MongoDB change streams consumer — the dual approach (endpoint-level + resume strategy) is well designed and the test coverage is solid.

Two items worth looking at:

1. doStop() lifecycle ordering (medium): In MongoDbChangeStreamsConsumer.doStop(), the token repository is shut down before the change streams thread is stopped. The thread could still be in the middle of commitManager.commit()repo.setState() when the repo is already closed. For MemoryStateRepository this is harmless (just a HashMap), but for a persistent StateRepository it could throw or silently drop the final token. Consider stopping the repo after the thread and executor shutdown.

2. PR description mismatch (minor): The description says the resume token is exposed via MongoDBConstants.RESUME_TOKEN, but the code correctly uses Exchange.OFFSET with a MongoDbResumable wrapper (the standard Camel resume pattern). The description should be updated to match.

3. Duplicate test utility (nit): The identical InMemoryStringResumeCache inner class appears in both MongoDbResumeAdapterTest and MongoDbResumeStrategyTest — could be extracted to a shared test helper.

This review was generated by an AI agent and may contain inaccuracies. Please verify all suggestions before applying.

@davsclaus davsclaus left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All feedback from the prior review has been addressed:

  • doStop() lifecycle ordering — fixed: the token repository is now stopped after the thread and executor shutdown, preventing potential writes to a closed repo.
  • Duplicate InMemoryStringResumeCache — extracted to a shared support/ test utility.
  • Integration test — fixed to actually run.

The dual approach (endpoint-level changeStreamTokenRepository/changeStreamToken + route-level Resume Strategy via ResumeAware) is clean and well tested. This also addresses the earlier review from @orpiske requesting Resume Strategies API integration.

LGTM.

This review was generated by an AI agent and may contain inaccuracies. Please verify all suggestions before applying.

@davsclaus davsclaus merged commit db15e20 into apache:main Jun 24, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants