changefeed: create kafka topics if requested #171139
Conversation
|
Merging to
After your PR is submitted to the merge queue, this comment will be automatically updated with its status. If the PR fails, failure details will also be posted here |
3ab1568 to
033b986
Compare
| opt.set(tn) | ||
| } | ||
| tn := newTopicNamer(opts...) | ||
| tn.join = '.' |
There was a problem hiding this comment.
Bug: tn.join = '.' unconditionally overwrites whatever WithJoinByte set via newTopicNamer(opts...) on line 127. Previously, the default '.' was set in the struct literal before options were applied, so WithJoinByte('+') would override it. Now the order is reversed — options run first, then this line clobbers them.
This breaks sink_cloudstorage.go:435 which calls MakeTopicNamer(targets, WithJoinByte('+')) — the cloud storage sink will silently use '.' instead of '+' as the column-family separator in file paths (e.g. table.family instead of table+family).
| tn.join = '.' | |
| tn.join = '.' |
Suggested fix: set the default join inside newTopicNamer before the options loop:
func newTopicNamer(opts ...TopicNameOption) *TopicNamer {
tn := &TopicNamer{join: '.'}
for _, opt := range opts {
opt.set(tn)
}
return tn
}and remove tn.join = '.' from both line 128 and line 151.
| opts ...TopicNameOption, | ||
| ) ([]string, error) { | ||
| tn := newTopicNamer(opts...) | ||
| tn.join = '.' |
There was a problem hiding this comment.
Same issue as line 128: tn.join = '.' overwrites any WithJoinByte value set by newTopicNamer(opts...). Not triggered by current callers, but latently wrong — if any future caller passes WithJoinByte here it will be silently ignored. Remove this line and set the default inside newTopicNamer instead.
AI Review: Potential Issue(s) DetectedInline comments have been added to the relevant lines in Summary: The refactoring of This breaks The same pattern exists at line 151 in If helpful: add |
57929e0 to
688ada0
Compare
|
Detected infrastructure failure (matched: self-hosted runner lost communication with the server). Automatically rerunning failed jobs. (run link) |
APIs Add ApiVersions, ValidateCreateTopics, and CreateTopics to the KafkaAdminClientV2 interface so that follow-on work can pre-create Kafka topics through the admin client. This commit only widens the interface and regenerates the gomock Part of: cockroachdb#155157 Release note: None
This change allows creating kafka topics when requested, previously users had to rely on kafka cluster for auto topic creation, now with `create_kafka_topics='explicit'` the changefeed creates topics via the kafka admin API. `create_kafka_topics='broker_auto'` (default), results in auto topic creation enabled on kafka cluster and `create_kafka_topics='off'` disables both options. Fixes: cockroachdb#155157 Release note (general change): The `CREATE CHANGFEED` statement now allows `create_kafka_topics` with options `explicit`, `off` and `broker_auto`(default). The `broker_auto` is default behavior which relies on kafka cluster to create kafka topics, when set to `explicit`, the changefeed creates the topics, and `off` disables both.
688ada0 to
73efcc9
Compare
aerfrei
left a comment
There was a problem hiding this comment.
Looks good, exciting to see the auto topic creation. I left this review to stuff that's a little more structural which I think might help with some of the more cosmetic changes I had in mind for the tests
| if knobs != nil { | ||
| kafkaKnobs = knobs.KafkaSinkV2Knobs | ||
| } | ||
| if err := maybeCreateKafkaTopics(ctx, execCfg, details, targets, schemaTS, kafkaKnobs); err != nil { |
There was a problem hiding this comment.
I don't think that we want to be calling kafka topic creation right here before starting the distributed changefeed. The ideal would be if we could call this from inside the sink when the sink starts up.
This makes me think of something else: this should happen if a user adds a target in an alter changefeed too. I think that that's true as is because this will run when that altered changefeed is resumed, but a test for that edge case would be helpful.
| topicsForConnectionCheck []string, | ||
| constHeaders map[string][]byte, | ||
| partitionAlg string, | ||
| createTopics changefeedbase.CreateKafkaTopics, |
There was a problem hiding this comment.
I don't think we want to pass a changefeedOption into newKafkaSinkClientV2. Seems to me like we're using that option to determine what kgo options to specify (kgo.AllowAutoTopicCreation()) and I think we should be able to pass those in via clientOpts.
| if err != nil { | ||
| return err | ||
| } | ||
| if !isKafkaSink(parsedSinkURL) { |
There was a problem hiding this comment.
I think if we move the kafka topic creation into the kafka v2 sink, we should be able to avoid a lot of the checks in this function.
This change allows creating kafka topics when
requested, previously users had to rely on kafka cluster
for auto topic creation, now with
create_kafka_topics='explicit'the changefeed creates topics via the kafka admin API.
create_kafka_topics='broker_auto'(default), results in auto topiccreation enabled on kafka cluster and
create_kafka_topics='off'disables both options.
Fixes: #155157
Release note (general change): The
CREATE CHANGFEEDstatement nowallows
create_kafka_topicswith optionsexplicit,offandbroker_auto(default). Thebroker_autois default behavior whichrelies on kafka cluster to create kafka topics, when set to
explicit,the changefeed creates the topics, and
offdisables both.