Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ require (
github.com/stretchr/testify v1.11.1
github.com/twmb/franz-go v1.18.0
github.com/twmb/franz-go/pkg/kadm v1.11.0
github.com/twmb/franz-go/pkg/kmsg v1.9.0
github.com/twpayne/go-geom v1.4.2
github.com/xdg-go/pbkdf2 v1.0.0
github.com/xdg-go/scram v1.1.2
Expand Down Expand Up @@ -470,7 +471,6 @@ require (
github.com/tklauser/numcpus v0.3.0 // indirect
github.com/twitchtv/twirp v8.1.0+incompatible // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
github.com/twpayne/go-kml v1.5.2 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/urfave/cli/v2 v2.3.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ go_library(
"@com_github_twmb_franz_go//pkg/kgo",
"@com_github_twmb_franz_go//pkg/kversion",
"@com_github_twmb_franz_go_pkg_kadm//:kadm",
"@com_github_twmb_franz_go_pkg_kmsg//:kmsg",
"@com_google_cloud_go_pubsub//apiv1",
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",
"@org_golang_google_api//impersonate",
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1951,6 +1951,13 @@ func (b *changefeedResumer) resumeWithRetries(
}
}
}
var kafkaKnobs kafkaSinkV2Knobs
if knobs != nil {
kafkaKnobs = knobs.KafkaSinkV2Knobs
}
if err := maybeCreateKafkaTopics(ctx, execCfg, details, targets, schemaTS, kafkaKnobs); err != nil {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that we want to be calling kafka topic creation right here before starting the distributed changefeed. The ideal would be if we could call this from inside the sink when the sink starts up.

This makes me think of something else: this should happen if a user adds a target in an alter changefeed too. I think that that's true as is because this will run when that altered changefeed is resumed, but a test for that edge case would be helpful.

return errors.Wrap(err, "failed to create kafka topics")
}
var err error
prevResult, err = startDistChangefeed(ctx, jobExec, jobID, schemaTS, details, description,
initialHighWater, progress, resolvedSpans, prevResult, startedCh, onTracingEvent, targets)
Expand Down
87 changes: 87 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/mocks"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/resolvedspan"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed/schematestutils"
_ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl" // locality-related table mutations
Expand Down Expand Up @@ -106,9 +107,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/dustin/go-humanize"
"github.com/golang/mock/gomock"
"github.com/lib/pq"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
)

var testServerRegion = "us-east-1"
Expand Down Expand Up @@ -13849,6 +13854,88 @@ func TestSinkClosedOnEventConsumerError(t *testing.T) {
})
}

// TestChangefeedCreateKafkaTopicsExplicit verifies that the resumer of a
// changefeed created with `create_kafka_topics='explicit'` invokes the kafka
// admin client's CreateTopics for each watched topic before emitting rows.
func TestChangefeedCreateKafkaTopicsExplicit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
ctrl := gomock.NewController(t)
kafkaClient := mocks.NewMockKafkaClientV2(ctrl)
adminClient := mocks.NewMockKafkaAdminClientV2(ctrl)
kafkaClient.EXPECT().Close().AnyTimes()

const expectedTopic = "pre-foo"
var topicsCreated atomic.Bool
topicCreatedCh := make(chan struct{})
adminClient.EXPECT().ListTopics(gomock.Any(), expectedTopic).DoAndReturn(
func(ctx context.Context, topics ...string) (kadm.TopicDetails, error) {
require.Equal(t, []string{expectedTopic}, topics)
detail := kadm.TopicDetail{
Topic: expectedTopic,
Partitions: map[int32]kadm.PartitionDetail{
0: {Topic: expectedTopic, Partition: 0, Leader: 0, Replicas: []int32{0}, ISR: []int32{0}},
},
}
if !topicsCreated.Load() {
detail.Err = kerr.UnknownTopicOrPartition
detail.Partitions = nil
}
return kadm.TopicDetails{expectedTopic: detail}, nil
},
).AnyTimes()
adminClient.EXPECT().ValidateCreateTopics(gomock.Any(), int32(-1), int16(-1), nil, expectedTopic).Return(kadm.CreateTopicResponses{
expectedTopic: {Topic: expectedTopic},
}, nil)
adminClient.EXPECT().CreateTopics(gomock.Any(), int32(-1), int16(-1), nil, expectedTopic).DoAndReturn(
func(ctx context.Context, partitions int32, replicationFactor int16, configs map[string]*string, topics ...string) (kadm.CreateTopicResponses, error) {
require.Equal(t, []string{expectedTopic}, topics)
if topicsCreated.CompareAndSwap(false, true) {
close(topicCreatedCh)
}
return kadm.CreateTopicResponses{
expectedTopic: {Topic: expectedTopic, NumPartitions: 1, ReplicationFactor: 1},
}, nil
},
)

s, cleanup := makeServer(t, feedTestNoTenants, withKnobsFn(func(knobs *base.TestingKnobs) {
if knobs.DistSQL == nil {
knobs.DistSQL = &execinfra.TestingKnobs{}
}
if knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed == nil {
knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed = &TestingKnobs{}
}
cfKnobs := knobs.DistSQL.(*execinfra.TestingKnobs).Changefeed.(*TestingKnobs)
cfKnobs.KafkaSinkV2Knobs = kafkaSinkV2Knobs{
OverrideClient: func(opts []kgo.Opt) (KafkaClientV2, KafkaAdminClientV2) {
return kafkaClient, adminClient
},
SkipCreateTopicVersionCheck: true,
}
}))
defer cleanup()

execCfg := s.Server.ExecutorConfig().(sql.ExecutorConfig)
KafkaV2Enabled.Override(ctx, &execCfg.Settings.SV, true)
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)

var jobID int
sqlDB.QueryRow(t,
`CREATE CHANGEFEED FOR foo INTO 'kafka://localhost:9092?topic_prefix=pre-' WITH initial_scan = 'no', create_kafka_topics = 'explicit'`,
).Scan(&jobID)
defer sqlDB.Exec(t, `CANCEL JOB $1`, jobID)

select {
case <-topicCreatedCh:
case <-time.After(30 * time.Second):
t.Fatal("timed out waiting for changefeed resumer to create kafka topics")
}
}

// closeTrackingSink wraps a Sink and counts Dial and Close calls.
type closeTrackingSink struct {
wrapped Sink
Expand Down
55 changes: 55 additions & 0 deletions pkg/ccl/changefeedccl/mocks/kafka_admin_v2_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,17 @@ func getSink(
return nil, err
}
if KafkaV2Enabled.Get(&serverCfg.Settings.SV) {
var kafkaKnobs kafkaSinkV2Knobs
if knobs, ok := serverCfg.TestingKnobs.Changefeed.(*TestingKnobs); ok {
kafkaKnobs = knobs.KafkaSinkV2Knobs
}
createTopics, err := opts.GetCreateKafkaTopics()
if err != nil {
return nil, err
}
return makeKafkaSinkV2(ctx, &changefeedbase.SinkURL{URL: u}, targets, sinkOpts,
numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
serverCfg.Settings, metricsBuilder, kafkaSinkV2Knobs{})
serverCfg.Settings, metricsBuilder, kafkaKnobs, createTopics)
} else {
return makeKafkaSink(ctx, &changefeedbase.SinkURL{URL: u}, targets, sinkOpts, serverCfg.Settings, metricsBuilder)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/sink_kafka_connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,8 @@ func TestAzureKafkaDefaults(t *testing.T) {

assertExpectedKgoOpts := func(exp expectation, opts []kgo.Opt) {
sinkClient, err := newKafkaSinkClientV2(ctx, opts, sinkBatchConfig{},
"", cluster.MakeTestingClusterSettings(), kafkaSinkV2Knobs{}, nilMetricsRecorderBuilder, nil, nil, "" /* partitionAlg */)
"", cluster.MakeTestingClusterSettings(), kafkaSinkV2Knobs{}, nilMetricsRecorderBuilder, nil,
nil, "" /* partitionAlg */, changefeedbase.CreateKafkaTopicsBrokerAuto)
require.NoError(t, err)
defer func() { require.NoError(t, sinkClient.Close()) }()
client := sinkClient.client.(*kgo.Client)
Expand Down
Loading
Loading