Kafka connector preview #770

Merged
merged 3 commits into from
Mar 21, 2024

Conversation

Contributor

@sravotto sravotto commented Mar 15, 2024

This PR consists of 3 separate commits:

  • hlc: adding range.Contains function: adds a utility function to check if a range contains a given timestamp.
  • Kafka connector preview: adds a source connector that consumes CockroachDB changefeeds sent through a Kafka cluster. The connector supports JSON messages and currently uses immediate mode.
  • Kafka connector integration test: verifies that change events originated by a CockroachDB changefeed and routed via a single-node Kafka cluster are received by the connector and applied to a target CockroachDB database.


@codecov-commenter

codecov-commenter commented Mar 15, 2024

Codecov Report

Attention: Patch coverage is 58.18966%, with 97 lines in your changes missing coverage. Please review.

Project coverage is 77.01%. Comparing base (b29765e) to head (8986e9c).

Files Patch % Lines
internal/source/kafka/conn.go 42.10% 31 Missing and 2 partials ⚠️
internal/source/kafka/config.go 45.76% 20 Missing and 12 partials ⚠️
internal/source/kafka/consumer.go 72.72% 14 Missing and 7 partials ⚠️
internal/source/kafka/provider.go 77.77% 3 Missing and 3 partials ⚠️
internal/cmd/kafka/kafka.go 62.50% 2 Missing and 1 partial ⚠️
internal/source/kafka/kafka.go 0.00% 2 Missing ⚠️

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #770      +/-   ##
==========================================
- Coverage   78.42%   77.01%   -1.41%     
==========================================
  Files         201      207       +6     
  Lines        9769    10001     +232     
==========================================
+ Hits         7661     7702      +41     
- Misses       1440     1616     +176     
- Partials      668      683      +15     


@sravotto sravotto marked this pull request as ready for review March 15, 2024 13:54
Member

@BramGruneir BramGruneir left a comment

Overall, this is great, but it needs a bit more work.

Also, I get that this is just the first pass here, but you're going to be limited by running only a single instance of replicator. Ideally, we put the topics in staging and take out a lease against them to allow for more than one replicator instance to push data.
I can see how this can easily be done by running multiple instances of ConsumeClaim. Or we can use the mark function to determine if a value has already been processed. Lots of options here.

Reviewed 2 of 2 files at r1, 13 of 13 files at r2, 4 of 4 files at r3, all commit messages.
Reviewable status: all files reviewed, 21 unresolved discussions (waiting on @bobvawter and @sravotto)


-- commits line 11 at r2:
sent not send


internal/source/kafka/config.go line 74 at r2 (raw file):

	f.IntVar(&c.batchSize, "batchSize", 100, "messages to accumulate before committing to the target")
	f.StringArrayVar(&c.brokers, "broker", nil, "address of Kafka broker(s)")
	f.StringVar(&c.from, "from", "", "accept messages at or newer than this timestamp")

From and To or Min and Max? Also, perhaps it's worth adding Timestamp to the end of it for clarity: FromTimestamp or MinTimestamp

I think Min and Max might be better.

Just because from could be from where or from when?


internal/source/kafka/config.go line 74 at r2 (raw file):

	f.IntVar(&c.batchSize, "batchSize", 100, "messages to accumulate before committing to the target")
	f.StringArrayVar(&c.brokers, "broker", nil, "address of Kafka broker(s)")
	f.StringVar(&c.from, "from", "", "accept messages at or newer than this timestamp")

only accept messages with timestamps at or newer than this timestamp, this is an inclusive lower limit


internal/source/kafka/config.go line 76 at r2 (raw file):

	f.StringVar(&c.from, "from", "", "accept messages at or newer than this timestamp")
	f.StringVar(&c.group, "group", "", "the Kafka consumer group id")
	f.BoolVar(&c.oldest, "oldest", false, "start from the oldest message available")

Mention that this is in lieu of --From (and vice versa in --From)


internal/source/kafka/config.go line 76 at r2 (raw file):

	f.StringVar(&c.from, "from", "", "accept messages at or newer than this timestamp")
	f.StringVar(&c.group, "group", "", "the Kafka consumer group id")
	f.BoolVar(&c.oldest, "oldest", false, "start from the oldest message available")

Should this also be set to true by default?


internal/source/kafka/config.go line 78 at r2 (raw file):

	f.BoolVar(&c.oldest, "oldest", false, "start from the oldest message available")
	f.StringVar(&c.strategy, "strategy", "sticky", "Kafka consumer group re-balance strategy")
	f.StringVar(&c.to, "to", "", "accept messages at or older than this timestamp")

only accept messages with timestamps before this one, this is an exclusive upper limit


internal/source/kafka/config.go line 80 at r2 (raw file):

	f.StringVar(&c.to, "to", "", "accept messages at or older than this timestamp")
	f.StringArrayVar(&c.topics, "topic", nil, "the topic(s) that the consumer should use")
	f.StringVar(&c.version, "kafkaVersion", "3.6.0", "Kafka version")

Why is this necessary?


internal/source/kafka/config.go line 131 at r2 (raw file):

	}
	if hlc.Compare(from, to) > 0 {
		return errors.New("from timestamp must earlier than to timestamp")

this message is unclear
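
One way to make the message self-explanatory is to name both flags and echo the offending values. The following is a minimal sketch, not the code from this PR: it uses plain int64 nanoseconds instead of hlc.Time, mirrors the `> 0` comparison in the snippet above, and the helper name `validateBounds` and the flag spellings `minTimestamp`/`maxTimestamp` are assumptions based on the rename suggested earlier in this review.

```go
package main

import "fmt"

// validateBounds is a hypothetical preflight helper. It checks a time
// window expressed in nanoseconds; a zero value means "unbounded" on
// that side (an assumption made for this sketch).
func validateBounds(minNanos, maxNanos int64) error {
	if minNanos == 0 || maxNanos == 0 {
		return nil
	}
	if minNanos > maxNanos {
		// Name both flags and show both values so the user can see
		// which bound to change.
		return fmt.Errorf(
			"--minTimestamp (%d) must not be later than --maxTimestamp (%d)",
			minNanos, maxNanos)
	}
	return nil
}
```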


internal/source/kafka/conn.go line 35 at r2 (raw file):

//
//	note: we get resolved timestamps on all the partitions,
//	      so we should be able to leverage that.

There may be some complication here, as the messages are only ordered per partition right? Or is it by topic? So we may need to wait on writing a resolved timestamp until after we've grabbed all the other messages


internal/source/kafka/conn.go line 38 at r2 (raw file):

//
// TODO (silvano): support Avro format, schema registry.
// TODO (silvano): add metrics.

Please add github issues for these todos and link the issue number here


internal/source/kafka/conn.go line 62 at r2 (raw file):

// are allocated to each process based on the chosen rebalance strategy.
func (c *Conn) Start(ctx *stopper.Context) error {
	version, err := sarama.ParseKafkaVersion(c.config.version)

This check should be in preflight.


internal/source/kafka/conn.go line 74 at r2 (raw file):

	switch c.config.strategy {
	case "sticky":

This initial check should be in preflight. And please make it an enum.
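
A rough sketch of what "an enum validated in preflight" could look like. The type and function names (`Strategy`, `ParseStrategy`, `Config.Preflight`) are hypothetical, and only "sticky" appears in the diff; the "roundrobin" and "range" spellings are assumptions chosen to match the balance strategies that sarama offers.

```go
package main

import "fmt"

// Strategy is a hypothetical enum for the consumer group rebalance strategy.
type Strategy int

const (
	StrategySticky Strategy = iota
	StrategyRoundRobin
	StrategyRange
)

// strategyNames maps flag values to enum members (spellings are assumptions).
var strategyNames = map[string]Strategy{
	"sticky":     StrategySticky,
	"roundrobin": StrategyRoundRobin,
	"range":      StrategyRange,
}

// ParseStrategy converts a flag value into a Strategy.
func ParseStrategy(s string) (Strategy, error) {
	if v, ok := strategyNames[s]; ok {
		return v, nil
	}
	return 0, fmt.Errorf(
		"unknown rebalance strategy %q; expected sticky, roundrobin, or range", s)
}

// Config holds the raw flag value and the parsed enum.
type Config struct {
	Strategy string   // raw value from the command line
	strategy Strategy // parsed during Preflight
}

// Preflight rejects bad values before any connection is attempted, so
// Start only has to switch over a known-good enum.
func (c *Config) Preflight() error {
	s, err := ParseStrategy(c.Strategy)
	if err != nil {
		return err
	}
	c.strategy = s
	return nil
}
```

With this shape, Start would map each enum member to the corresponding sarama balance strategy without needing a default error branch.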


internal/source/kafka/conn.go line 133 at r2 (raw file):

// getOffsets get the most recent offsets at the given time
// for all the topics and partitions.
// TODO (silvano) : add testing

Add github issue and ref to it here please


internal/source/kafka/conn.go line 136 at r2 (raw file):

func (c *Conn) getOffsets(nanos int64) ([]*partitionState, error) {
	res := make([]*partitionState, 0)
	client, err := sarama.NewClient(c.config.brokers, c.saramaConfig)

Is there a pool or is this common practice?


internal/source/kafka/consumer.go line 79 at r2 (raw file):

}

// ConsumeClaim process new messages for the topic/partition specified in the claim.

processes


internal/source/kafka/consumer.go line 96 at r2 (raw file):

		case message, ok := <-claim.Messages():
			if !ok {
				log.Printf("message channel was closed")

These log messages should have the topic, partition and maybe even the current offset in them

Also, shouldn't these logs be debug?


internal/source/kafka/consumer.go line 118 at r2 (raw file):

		case <-time.After(time.Second):
			// Periodically flush a batch, and mark the latest message for each topic/partition as read.
			if toProcess, err = c.accept(ctx, toProcess); err != nil {

How does Read differ from Mark?


internal/source/kafka/consumer.go line 133 at r2 (raw file):

) map[string]*sarama.ConsumerMessage {
	for _, message := range consumed {
		session.MarkMessage(message, "")

If we can use kafka to Mark messages.... then we can avoid staging completely... hmm...


internal/source/kafka/consumer.go line 146 at r2 (raw file):

		return toProcess, nil
	}
	log.Printf("flushing %d", toProcess.Count())

debug?


internal/source/kafka/provider.go line 24 at r2 (raw file):

	"github.com/cockroachdb/cdc-sink/internal/sequencer/chaos"
	"github.com/cockroachdb/cdc-sink/internal/sequencer/immediate"
	scriptSeq "github.com/cockroachdb/cdc-sink/internal/sequencer/script"

you used scriptSequencer earlier, please pick one
And for /script you used scriptRuntime


internal/util/hlc/hlc_test.go line 69 at r1 (raw file):

	a.False(rng.Contains(zero))
	a.False(rng.Contains(nine))

Can you test almost 10? {9, max}


internal/util/hlc/hlc_test.go line 74 at r1 (raw file):

	a.True(rng.Contains(fifteen))
	a.True(rng.Contains(almostTwenty))
	a.True(rng.Contains(twenty))

Should this be false, exclusive of max?

Contributor Author

@sravotto sravotto left a comment

The way ConsumerGroup works allows multiple instances of replicator. Each instance would have its own set of (non-overlapping) partitions. And if one replicator dies, its partitions will be allocated to the remaining ones. It works fine with immediate, but we need some coordination with the resolved timestamps across replicator instances if we want to enforce transaction consistency.
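
The partition hand-off described above can be illustrated with a small simulation. This is a simplified round-robin assignment written for illustration only; it is not Kafka's rebalance protocol and not the sticky strategy, and the function name `assign` and the member names are hypothetical.

```go
package main

import "sort"

// assign distributes partitions over the current group members in
// round-robin order. Every partition goes to exactly one member, so the
// per-member sets never overlap.
func assign(members []string, partitions []int32) map[string][]int32 {
	out := make(map[string][]int32, len(members))
	if len(members) == 0 {
		return out
	}
	// Sort a copy so the result does not depend on the input order.
	sorted := append([]string(nil), members...)
	sort.Strings(sorted)
	for i, p := range partitions {
		m := sorted[i%len(sorted)]
		out[m] = append(out[m], p)
	}
	return out
}
```

Calling `assign` with two members and four partitions gives each member two partitions; calling it again after one member leaves hands all four to the survivor, which is the failover behavior described in the comment.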

Reviewable status: 10 of 19 files reviewed, 21 unresolved discussions (waiting on @bobvawter and @BramGruneir)


-- commits line 11 at r2:

Previously, BramGruneir (Bram Gruneir) wrote…

sent not send

Done.


internal/source/kafka/config.go line 74 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

From and To or Min and Max? Also, perhaps it's worth adding Timestamp to the end of it for clarity: FromTimestamp or MinTimestamp

I think Min and Max might be better.

Just because from could be from where or from when?

Done.


internal/source/kafka/config.go line 74 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

only accept messages with timestamps at or newer than this timestamp, this is an inclusive lower limit

Done.


internal/source/kafka/config.go line 76 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Mention that this is in lieu of --From (and vice versa in --From)

I decided to remove oldest, since we have min,max. By default we'll start from the oldest available in Kafka.


internal/source/kafka/config.go line 76 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Should this also be set to true by default?

Done.


internal/source/kafka/config.go line 78 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

only accept messages with timestamps before this one, this is an exclusive upper limit

Done.


internal/source/kafka/config.go line 80 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Why is this necessary?

Removed for now. We'll add it later if it becomes necessary.


internal/source/kafka/config.go line 131 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

this message is unclear

Done.


internal/source/kafka/conn.go line 35 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

There may be some complication here, as the messages are only ordered per partition right? Or is it by topic? So we may need to wait on writing a resolved timestamp until after we've grabbed all the other messages

Indeed. We have to receive the resolved timestamps from all the partitions.
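
A minimal sketch of that idea: track the latest resolved timestamp seen on each partition and only report a topic-wide resolved timestamp (the minimum) once every partition has reported. The type `resolvedTracker` and its methods are hypothetical, and timestamps are reduced to int64 nanoseconds for brevity.

```go
package main

// resolvedTracker records the latest resolved timestamp per partition.
type resolvedTracker struct {
	partitions int             // number of partitions in the topic
	latest     map[int32]int64 // partition -> latest resolved nanos
}

func newResolvedTracker(partitions int) *resolvedTracker {
	return &resolvedTracker{
		partitions: partitions,
		latest:     make(map[int32]int64, partitions),
	}
}

// Observe records a resolved timestamp for one partition. It returns the
// minimum resolved timestamp across all partitions, and true only once
// every partition has reported at least one resolved timestamp.
func (t *resolvedTracker) Observe(partition int32, nanos int64) (int64, bool) {
	if cur, ok := t.latest[partition]; !ok || nanos > cur {
		t.latest[partition] = nanos
	}
	if len(t.latest) < t.partitions {
		return 0, false
	}
	first := true
	var lowest int64
	for _, v := range t.latest {
		if first || v < lowest {
			lowest = v
			first = false
		}
	}
	return lowest, true
}
```

The minimum is the only value that is safe to act on, since a slower partition may still deliver mutations with earlier timestamps.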


internal/source/kafka/conn.go line 38 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Please add github issues for these todos and link the issue number here

Done.


internal/source/kafka/conn.go line 62 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

This check should be in preflight.

Done. Removed version for now.


internal/source/kafka/conn.go line 74 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

This initial check should be in preflight. And please make it an enum.

Done.


internal/source/kafka/conn.go line 133 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Add github issue and ref to it here please

Done.


internal/source/kafka/conn.go line 136 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Is there a pool or is this common practice?

This is just used to get the offsets at the start. We are using a consumer group to get messages.


internal/source/kafka/consumer.go line 79 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

processes

Done.


internal/source/kafka/consumer.go line 96 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

These log messages should have the topic, partition and maybe even the current offset in them

Also, shouldn't these logs be debug?

Done.


internal/source/kafka/consumer.go line 118 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

How does Read differ from Mark?

Changed the comment a bit. Mark tells Kafka that we consumed the message and that it's OK to advance the offset.
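
The accumulate, flush, then mark sequence can be sketched as follows. This is an illustration under assumptions, not the handler from this PR: `message` stands in for a sarama consumer message, `flush` stands in for applying a batch to the target, and `mark` stands in for the session.MarkMessage call shown in the diff. The tick channel is passed in so that the periodic flush can be driven by a timer or by a test.

```go
package main

// message is a hypothetical stand-in for a consumed Kafka record.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
}

type partitionKey struct {
	topic     string
	partition int32
}

// consume accumulates messages and flushes when batchSize is reached,
// when tick fires, or when the message channel is closed. Messages are
// marked only after a successful flush, so an offset never advances past
// data that was not applied to the target.
func consume(
	msgs <-chan message,
	tick <-chan struct{},
	batchSize int,
	flush func([]message) error,
	mark func(message),
) error {
	var batch []message
	// Latest message per topic/partition; offsets within a partition
	// arrive in increasing order, so the last one seen is the highest.
	latest := make(map[partitionKey]message)
	commit := func() error {
		if len(batch) == 0 {
			return nil
		}
		if err := flush(batch); err != nil {
			return err
		}
		for _, m := range latest {
			mark(m)
		}
		batch = nil
		latest = make(map[partitionKey]message)
		return nil
	}
	for {
		select {
		case m, ok := <-msgs:
			if !ok {
				return commit()
			}
			batch = append(batch, m)
			latest[partitionKey{m.Topic, m.Partition}] = m
			if len(batch) >= batchSize {
				if err := commit(); err != nil {
					return err
				}
			}
		case <-tick:
			if err := commit(); err != nil {
				return err
			}
		}
	}
}
```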


internal/source/kafka/consumer.go line 133 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

If we can use kafka to Mark messages.... then we can avoid staging completely... hmm...

I think we need staging in transaction-consistent mode: we have to make sure we get resolved timestamps from all the partitions for a topic.


internal/source/kafka/consumer.go line 146 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

debug?

Done.


internal/util/hlc/hlc_test.go line 69 at r1 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Can you test almost 10? {9, max}

Done.


internal/util/hlc/hlc_test.go line 74 at r1 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Should this be false, exclusive of max?

No, the range we created is inclusive of 20 (see RangeIncluding above). I added a comment to make it clearer.
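
To make the inclusive/exclusive distinction concrete, here is a simplified sketch of a half-open range whose constructor includes the upper bound by advancing it one logical tick. The types below are a stand-alone approximation written for this example; the field names and the `Next` helper are assumptions and may not match the hlc package exactly.

```go
package main

// Time is a simplified hybrid logical clock timestamp.
type Time struct {
	Nanos   int64
	Logical int
}

// Compare orders timestamps by wall time, then by logical counter.
func Compare(a, b Time) int {
	switch {
	case a.Nanos < b.Nanos:
		return -1
	case a.Nanos > b.Nanos:
		return 1
	case a.Logical < b.Logical:
		return -1
	case a.Logical > b.Logical:
		return 1
	}
	return 0
}

// Next returns the smallest timestamp strictly greater than t.
func (t Time) Next() Time { return Time{t.Nanos, t.Logical + 1} }

// Range is half-open: [Min, Max).
type Range struct{ Min, Max Time }

// RangeIncluding builds a range that contains both lo and hi by pushing
// the exclusive upper bound one tick past hi.
func RangeIncluding(lo, hi Time) Range { return Range{lo, hi.Next()} }

// Contains reports whether t falls within [Min, Max).
func (r Range) Contains(t Time) bool {
	return Compare(t, r.Min) >= 0 && Compare(t, r.Max) < 0
}
```

Under these assumptions, a range built with RangeIncluding over 10 and 20 contains 20 itself but not the timestamp one logical tick after it, which matches the test expectation discussed above.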

@sravotto sravotto force-pushed the sr8_kafka branch 2 times, most recently from 601a361 to b4d9858 Compare March 19, 2024 13:51
Member

@BramGruneir BramGruneir left a comment

Makes sense.

@bobvawter, can you give this a review?

:lgtm:

Reviewed 4 of 9 files at r4, 5 of 5 files at r5, all commit messages.
Reviewable status: :shipit: complete! all files reviewed, all discussions resolved (waiting on @bobvawter)

Contributor

@bobvawter bobvawter left a comment

Reviewed 1 of 2 files at r1, 3 of 13 files at r2, 1 of 4 files at r3, 1 of 9 files at r4, 4 of 5 files at r5, 3 of 3 files at r6, all commit messages.
Reviewable status: all files reviewed, 8 unresolved discussions (waiting on @sravotto)


internal/source/kafka/config.go line 48 at r3 (raw file):

	TargetSchema ident.Schema

	batchSize int      // How many messages to accumulate before committing to the target

If these fields are exported, you can take this whole object and publish it to the Diagnostics endpoint as a trivially JSON-ish object.


internal/source/kafka/consumer.go line 80 at r6 (raw file):

func (c *Handler) Cleanup(session sarama.ConsumerGroupSession) error {
	if session.Context().Err() != nil {
		log.Errorf("Session terminated with an error: %s", session.Context().Err())

Use log.WithError(err).Level("....") here and elsewhere. That allows the log handler to access the structure (and stack) of the error message. Also, errors are typically used with the %v verb, which allows the object to format itself.

Also, anywhere you call an external API that returns an error should have an errors.WithStack() on it.


internal/source/kafka/consumer.go line 141 at r6 (raw file):

		session.MarkMessage(message, "")
	}
	return make(map[string]*sarama.ConsumerMessage)

Why does this return a new map?


internal/source/kafka/consumer.go line 147 at r6 (raw file):

func (c *Handler) accept(
	ctx context.Context, toProcess *types.MultiBatch,
) (*types.MultiBatch, error) {

Why does this method return a batch?


internal/source/kafka/consumer.go line 164 at r6 (raw file):

	var payload payload
	dec := json.NewDecoder(bytes.NewReader(msg.Value))
	dec.DisallowUnknownFields()

Consider Postel's law.
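
For context, Postel's law ("be liberal in what you accept") argues against rejecting a message just because it carries a field the receiver does not know about. The sketch below contrasts the two decoding modes using only encoding/json. The `payload` struct here is a hypothetical subset of a changefeed message defined for this example, not the type from the PR.

```go
package main

import (
	"bytes"
	"encoding/json"
)

// payload is a hypothetical subset of a wrapped-envelope changefeed message.
type payload struct {
	After   json.RawMessage `json:"after"`
	Key     json.RawMessage `json:"key"`
	Updated string          `json:"updated"`
}

// decodeStrict fails on any field that payload does not declare.
func decodeStrict(data []byte) (payload, error) {
	var p payload
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.DisallowUnknownFields()
	err := dec.Decode(&p)
	return p, err
}

// decodeLenient silently ignores undeclared fields, so a newer producer
// that adds a field does not break an older consumer.
func decodeLenient(data []byte) (payload, error) {
	var p payload
	err := json.Unmarshal(data, &p)
	return p, err
}
```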


internal/source/kafka/integration_test.go line 53 at r6 (raw file):

// TestKafka verifies that we can process simple messages from Kafka.
// The kafka messages are generated by a CockroachDB changefeed in JSON format.
func TestKafka(t *testing.T) {

I'd like to see a version of this test that's driven by a seqtest.Generator. It will generate a non-trivial sequence of mutations and also validate that the data in the target tables has maintained the correct order of updates. You should be able to use the GenerateInto(batch) and CheckConsistent() methods to bookend the Kafka transport code.


internal/source/kafka/provider.go line 46 at r6 (raw file):

	ctx *stopper.Context,
	acc *apply.Acceptor,
	imm *immediate.Immediate,

Inject switcher.Switcher instead and leave it in ModeImmediate. It will build a complete sequencer stack for you and provides a faster on-ramp to supporting more than just immediate mode.


internal/util/hlc/hlc_test.go line 58 at r6 (raw file):

	zero := New(0, 0)
	nine := New(9, 0)
	almostTen := New(9, math.MaxInt32)

Write these as ten.Before()
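
The suggestion replaces a hand-built "almost ten" with a derived value. A sketch of how such a helper could work, assuming timestamps order by (nanos, logical) and the logical counter tops out at math.MaxInt32, as the New(9, math.MaxInt32) literal above implies. The `Time` type here is a stand-alone approximation, not the hlc package's definition.

```go
package main

import "math"

// Time is a simplified hybrid logical clock timestamp.
type Time struct {
	Nanos   int64
	Logical int
}

// Before returns the largest timestamp strictly less than t: it steps the
// logical counter back when possible, otherwise it borrows from the wall
// time and sets the logical counter to its assumed maximum.
func (t Time) Before() Time {
	if t.Logical > 0 {
		return Time{t.Nanos, t.Logical - 1}
	}
	return Time{t.Nanos - 1, math.MaxInt32}
}
```

Written this way, the test states its intent ("the instant just before ten") rather than encoding the maximum logical value by hand.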

This change adds a utility function to check if a range contains
a given timestamp.
This change adds a source connector that consumes CockroachDB
changefeeds sent through a Kafka cluster.
A potential use case is to replay events stored within Kafka, after
restoring a backup to reduce the data loss and recovery time from a failure.

The connector currently supports events in JSON format with envelope=wrapped.
Deletes are supported if the changefeed is created with the diff option.

The connector leverages a Kafka consumer group, a set of consumers which
cooperate to consume data from some topics. The partitions of all the topics
are divided among the consumers in the group. As new group members arrive
and old members leave, the partitions are re-assigned so that each member
receives a proportional share of the partitions.
This allows multiple instances of the replicator process to concurrently
consume messages from a topic, provided that there are sufficient partitions
@sravotto sravotto force-pushed the sr8_kafka branch 2 times, most recently from 0b52b43 to 188d0e8 Compare March 20, 2024 19:22
Contributor Author

@sravotto sravotto left a comment

Please take another look.

Reviewable status: 6 of 19 files reviewed, 8 unresolved discussions (waiting on @bobvawter and @BramGruneir)


internal/source/kafka/config.go line 48 at r3 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

If these fields are exported, you can take this whole object and publish it to the Diagnostics endpoint as a trivially JSON-ish object.

Done.


internal/source/kafka/consumer.go line 80 at r6 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Use log.WithError(err).Level("....") here and elsewhere. That allows the log handler to access the structure (and stack) of the error message. Also, errors are typically used with the %v verb, which allows the object to format itself.

Also, anywhere you call an external API that returns an error should have an errors.WithStack() on it.

Done.


internal/source/kafka/consumer.go line 141 at r6 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Why does this return a new map?

Done.


internal/source/kafka/consumer.go line 147 at r6 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Why does this method return a batch?

Done.


internal/source/kafka/consumer.go line 164 at r6 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Consider Postel's law.

Done.


internal/source/kafka/integration_test.go line 53 at r6 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

I'd like to see a version of this test that's driven by a seqtest.Generator. It will generate a non-trivial sequence of mutations and also validate that the data in the target tables has maintained the correct order of updates. You should be able to use the GenerateInto(batch) and CheckConsistent() methods to bookend the Kafka transport code.

Filed #789


internal/source/kafka/provider.go line 46 at r6 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Inject switcher.Switcher instead and leave it in ModeImmediate. It will build a complete sequencer stack for you and provides a faster on-ramp to supporting more than just immediate mode.

Done.


internal/util/hlc/hlc_test.go line 58 at r6 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Write these as ten.Before()

Done.

Contributor

@bobvawter bobvawter left a comment

:lgtm: w/ a couple of nits

Reviewed 1 of 13 files at r2, 13 of 13 files at r7, all commit messages.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @sravotto)


internal/source/kafka/conn.go line 69 at r7 (raw file):

// are allocated to each process based on the chosen rebalance strategy.
func (c *Conn) Start(ctx *stopper.Context) (err error) {
	/**

Switch block comment to line comments and reflow.


internal/source/kafka/provider.go line 59 at r7 (raw file):

	mode := notify.VarOf(switcher.ModeImmediate)
	sw = sw.WithMode(mode)
	// seq, err := scriptSeq.Wrap(ctx, sw)

Dead code.

A simple integration test is part of this commit. It verifies that change events
originated by a CockroachDB changefeed and routed via a single-node Kafka
cluster are received by the connector and applied to a target CockroachDB database.
Contributor Author

@sravotto sravotto left a comment

Thanks!

Reviewable status: 17 of 19 files reviewed, 2 unresolved discussions (waiting on @bobvawter)


internal/source/kafka/conn.go line 69 at r7 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Switch block comment to line comments and reflow.

Done.


internal/source/kafka/provider.go line 59 at r7 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Dead code.

Done.

@sravotto sravotto added this pull request to the merge queue Mar 21, 2024
Merged via the queue into master with commit d54d057 Mar 21, 2024
45 of 46 checks passed
@sravotto sravotto deleted the sr8_kafka branch March 21, 2024 20:22