-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka connector preview #770
Conversation
Codecov ReportAttention: Patch coverage is
❗ Your organization needs to install the Codecov GitHub app to enable full functionality. Additional details and impacted files@@ Coverage Diff @@
## master #770 +/- ##
==========================================
- Coverage 78.42% 77.01% -1.41%
==========================================
Files 201 207 +6
Lines 9769 10001 +232
==========================================
+ Hits 7661 7702 +41
- Misses 1440 1616 +176
- Partials 668 683 +15 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, this is great, bit it needs a bit more work.
Also, I get that this is just the first pass here, but you're going to be limited by running only a single instance of replicator. Ideally, we put the topics in staging and take out a lease against them to allow for more than one replicator instance to push data.
I can see how this can be easily be done by running multiple instances of ConsumeClaim. Or we can use the mark function to determine if a value has been already processed. Lots of options here.
Reviewed 2 of 2 files at r1, 13 of 13 files at r2, 4 of 4 files at r3, all commit messages.
Reviewable status: all files reviewed, 21 unresolved discussions (waiting on @bobvawter and @sravotto)
-- commits
line 11 at r2:
sent not send
internal/source/kafka/config.go
line 74 at r2 (raw file):
f.IntVar(&c.batchSize, "batchSize", 100, "messages to accumulate before committing to the target") f.StringArrayVar(&c.brokers, "broker", nil, "address of Kafka broker(s)") f.StringVar(&c.from, "from", "", "accept messages at or newer than this timestamp")
From and To or Min and Max? Also, perhaps it's worth adding Timestamp to the end of it for clarity: FromTimestamp or MinTimestamp
I think Min and Max might be better.
Just because from could be from where or from when?
internal/source/kafka/config.go
line 74 at r2 (raw file):
f.IntVar(&c.batchSize, "batchSize", 100, "messages to accumulate before committing to the target") f.StringArrayVar(&c.brokers, "broker", nil, "address of Kafka broker(s)") f.StringVar(&c.from, "from", "", "accept messages at or newer than this timestamp")
only accept messages with timestamps at or newer than this timestamp, this is an inclusive lower limit
internal/source/kafka/config.go
line 76 at r2 (raw file):
f.StringVar(&c.from, "from", "", "accept messages at or newer than this timestamp") f.StringVar(&c.group, "group", "", "the Kafka consumer group id") f.BoolVar(&c.oldest, "oldest", false, "start from the oldest message available")
Mention that this is in lieu of --From (and vice versa in --From)
internal/source/kafka/config.go
line 76 at r2 (raw file):
f.StringVar(&c.from, "from", "", "accept messages at or newer than this timestamp") f.StringVar(&c.group, "group", "", "the Kafka consumer group id") f.BoolVar(&c.oldest, "oldest", false, "start from the oldest message available")
Should this also be set to true by default?
internal/source/kafka/config.go
line 78 at r2 (raw file):
f.BoolVar(&c.oldest, "oldest", false, "start from the oldest message available") f.StringVar(&c.strategy, "strategy", "sticky", "Kafka consumer group re-balance strategy") f.StringVar(&c.to, "to", "", "accept messages at or older than this timestamp")
only accept messages with timestamps before this one, this is an exclusive upper limit
internal/source/kafka/config.go
line 80 at r2 (raw file):
f.StringVar(&c.to, "to", "", "accept messages at or older than this timestamp") f.StringArrayVar(&c.topics, "topic", nil, "the topic(s) that the consumer should use") f.StringVar(&c.version, "kafkaVersion", "3.6.0", "Kafka version")
Why is this necessary?
internal/source/kafka/config.go
line 131 at r2 (raw file):
} if hlc.Compare(from, to) > 0 { return errors.New("from timestamp must earlier than to timestamp")
this message is unclear
internal/source/kafka/conn.go
line 35 at r2 (raw file):
// // note: we get resolved timestamps on all the partitions, // so we should be able to leverage that.
There may be some complication here, as the messages are only ordered per partition right? Or is it by topic? So we may need to wait on writing a resolved timestamp until after we've grabbed all the other messages
internal/source/kafka/conn.go
line 38 at r2 (raw file):
// // TODO (silvano): support Avro format, schema registry. // TODO (silvano): add metrics.
Please add github issues for these todos and link the issue number here
internal/source/kafka/conn.go
line 62 at r2 (raw file):
// are allocated to each process based on the chosen rebalance strategy. func (c *Conn) Start(ctx *stopper.Context) error { version, err := sarama.ParseKafkaVersion(c.config.version)
This check should be in preflight.
internal/source/kafka/conn.go
line 74 at r2 (raw file):
switch c.config.strategy { case "sticky":
This initial check should be in preflight. And please make it an enum.
internal/source/kafka/conn.go
line 133 at r2 (raw file):
// getOffsets get the most recent offsets at the given time // for all the topics and partitions. // TODO (silvano) : add testing
Add github issue and ref to it here please
internal/source/kafka/conn.go
line 136 at r2 (raw file):
func (c *Conn) getOffsets(nanos int64) ([]*partitionState, error) { res := make([]*partitionState, 0) client, err := sarama.NewClient(c.config.brokers, c.saramaConfig)
Is there a pool or is this common practice?
internal/source/kafka/consumer.go
line 79 at r2 (raw file):
} // ConsumeClaim process new messages for the topic/partition specified in the claim.
processes
internal/source/kafka/consumer.go
line 96 at r2 (raw file):
case message, ok := <-claim.Messages(): if !ok { log.Printf("message channel was closed")
These log messages should have the topic, partition and maybe even the current offset in them
Also, shouldn't these logs be debug?
internal/source/kafka/consumer.go
line 118 at r2 (raw file):
case <-time.After(time.Second): // Periodically flush a batch, and mark the latest message for each topic/partition as read. if toProcess, err = c.accept(ctx, toProcess); err != nil {
How does Read differ from Mark?
internal/source/kafka/consumer.go
line 133 at r2 (raw file):
) map[string]*sarama.ConsumerMessage { for _, message := range consumed { session.MarkMessage(message, "")
If we can use kafka to Mark messages.... then we can avoid staging completely... hmm...
internal/source/kafka/consumer.go
line 146 at r2 (raw file):
return toProcess, nil } log.Printf("flushing %d", toProcess.Count())
debug?
internal/source/kafka/provider.go
line 24 at r2 (raw file):
"github.com/cockroachdb/cdc-sink/internal/sequencer/chaos" "github.com/cockroachdb/cdc-sink/internal/sequencer/immediate" scriptSeq "github.com/cockroachdb/cdc-sink/internal/sequencer/script"
you used scriptSequencer earlier, please pick one
And for /script you used scriptRuntime
internal/util/hlc/hlc_test.go
line 69 at r1 (raw file):
a.False(rng.Contains(zero)) a.False(rng.Contains(nine))
Can you test almost 10? {9, max}
internal/util/hlc/hlc_test.go
line 74 at r1 (raw file):
a.True(rng.Contains(fifteen)) a.True(rng.Contains(almostTwenty)) a.True(rng.Contains(twenty))
Should this be false, exclusive of max?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way ConsumerGroup works allow multiple instances of replicator. Each instance would have its own set of (non-overlapping) partitions. And if one replicator dies, its partitions will be allocated to the remaining ones. It works fine with immediate, but we need some coordination with the resolved timestamps across replication instances if we want to enforce transaction consistency.
Reviewable status: 10 of 19 files reviewed, 21 unresolved discussions (waiting on @bobvawter and @BramGruneir)
Previously, BramGruneir (Bram Gruneir) wrote…
sent not send
Done.
internal/source/kafka/config.go
line 74 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
From and To or Min and Max? Also, perhaps it's worth adding Timestamp to the end of it for clarity: FromTimestamp or MinTimestamp
I think Min and Max might be better.
Just because from could be from where or from when?
Done.
internal/source/kafka/config.go
line 74 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
only accept messages with timestamps at or newer than this timestamp, this is an inclusive lower limit
Done.
internal/source/kafka/config.go
line 76 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Mention that this is in lieu of --From (and vice versa in --From)
I decided to remove oldest, since we have min,max. By default we'll start from the oldest available in Kafka.
internal/source/kafka/config.go
line 76 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Should this also be set to true by default?
Done.
internal/source/kafka/config.go
line 78 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
only accept messages with timestamps before this one, this is an exclusive upper limit
Done.
internal/source/kafka/config.go
line 80 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Why is this necessary?
Removed for now. We'll add it later if it becomes necessary.
internal/source/kafka/config.go
line 131 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
this message is unclear
Done.
internal/source/kafka/conn.go
line 35 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
There may be some complication here, as the messages are only ordered per partition right? Or is it by topic? So we may need to wait on writing a resolved timestamp until after we've grabbed all the other messages
Indeed. We have to receive the resolved timestamps from all the partitions.
internal/source/kafka/conn.go
line 38 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Please add github issues for these todos and link the issue number here
Done.
internal/source/kafka/conn.go
line 62 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
This check should be in preflight.
Done. Removed version for now.
internal/source/kafka/conn.go
line 74 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
This initial check should be in preflight. And please make it an enum.
Done.
internal/source/kafka/conn.go
line 133 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Add github issue and ref to it here please
Done.
internal/source/kafka/conn.go
line 136 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Is there a pool or is this common practice?
This is just used to get the offsets at the start. We are using a consumer group to get messages.
internal/source/kafka/consumer.go
line 79 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
processes
Done.
internal/source/kafka/consumer.go
line 96 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
These log messages should have the topic, partition and maybe even the current offset in them
Also, shouldn't these logs be debug?
Done.
internal/source/kafka/consumer.go
line 118 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
How does Read differ from Mark?
Changed the comment a bit. Mark tells kafka that we consumed the message and it's ok to move the offset.
internal/source/kafka/consumer.go
line 133 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
If we can use kafka to Mark messages.... then we can avoid staging completely... hmm...
I think we need staging in transaction consistent mode, we have to make sure we get resolved timestamps from all the partitions for a topic.
internal/source/kafka/consumer.go
line 146 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
debug?
Done.
internal/util/hlc/hlc_test.go
line 69 at r1 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Can you test almost 10? {9, max}
Done.
internal/util/hlc/hlc_test.go
line 74 at r1 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Should this be false, exclusive of max?
No, the range we created is inclusive of 20 (see RangeIncluding above). I added a comment to make it clearer.
601a361
to
b4d9858
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense.
@bobvawter, can you give this a review?
Reviewed 4 of 9 files at r4, 5 of 5 files at r5, all commit messages.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @bobvawter)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 2 files at r1, 3 of 13 files at r2, 1 of 4 files at r3, 1 of 9 files at r4, 4 of 5 files at r5, 3 of 3 files at r6, all commit messages.
Reviewable status: all files reviewed, 8 unresolved discussions (waiting on @sravotto)
internal/source/kafka/config.go
line 48 at r3 (raw file):
TargetSchema ident.Schema batchSize int // How many messages to accumulate before committing to the target
If these fields are exported, you can take this whole object and publish it to the Diagnostics endpoint as a trivially JSON-ish object.
internal/source/kafka/consumer.go
line 80 at r6 (raw file):
func (c *Handler) Cleanup(session sarama.ConsumerGroupSession) error { if session.Context().Err() != nil { log.Errorf("Session terminated with an error: %s", session.Context().Err())
Use log.WithError(err).Level("....")
here and elsewhere. That allows the log handler to access the structure (and stack) of the error message. Also, errors are typically used with the %v
verb, which allows the object to format itself.
Also, anywhere you call an external API that returns an error should have an errors.WithStack()
on it.
internal/source/kafka/consumer.go
line 141 at r6 (raw file):
session.MarkMessage(message, "") } return make(map[string]*sarama.ConsumerMessage)
Why does this return a new map?
internal/source/kafka/consumer.go
line 147 at r6 (raw file):
func (c *Handler) accept( ctx context.Context, toProcess *types.MultiBatch, ) (*types.MultiBatch, error) {
Why does this method return a batch?
internal/source/kafka/consumer.go
line 164 at r6 (raw file):
var payload payload dec := json.NewDecoder(bytes.NewReader(msg.Value)) dec.DisallowUnknownFields()
Consider Postel's law.
internal/source/kafka/integration_test.go
line 53 at r6 (raw file):
// TestKafka verifies that we can process simple messages from Kafka. // The kafka messages are generated by a CockroachDB changefeed in JSON format. func TestKafka(t *testing.T) {
I'd like to see a version of this test that's driven by a seqtest.Generator
. It will generate a non-trivial sequence of mutations and also validate that the data in the target tables has maintained the correct order of updates. You should be able to use the GenerateInto(batch)
and CheckConsistent()
methods to bookend the Kafka transport code.
internal/source/kafka/provider.go
line 46 at r6 (raw file):
ctx *stopper.Context, acc *apply.Acceptor, imm *immediate.Immediate,
Inject switcher.Switcher
instead and leave it in ModeImmediate
. It will build a complete sequencer stack for you and provides a faster on-ramp to supporting more that just immediate mode.
internal/util/hlc/hlc_test.go
line 58 at r6 (raw file):
zero := New(0, 0) nine := New(9, 0) almostTen := New(9, math.MaxInt32)
Write these as ten.Before()
This change adds a utility function to check if a range contains a given timestamp.
This change adds a source connector that consumes CockroachDB changefeeds sent through a Kafka cluster. A potential use case is to replay events stored within Kafka, after restoring a backup to reduce the data loss and recovery time from a failure. The connector currently supports events in JSON format with envelope=wrapped. Deletes are supported if the changefeed is created with the diff option. The connector leverages a Kafka consumer group, a set of consumers which cooperate to consume data from some topics. The partitions of all the topics are divided among the consumers in the group. As new group members arrive and old members leave, the partitions are re-assigned so that each member receives a proportional share of the partitions. This allows multiple instance of the replicator process to concurrently consume messages from a topic, provided that there are sufficient partitions
0b52b43
to
188d0e8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please take another look.
Reviewable status: 6 of 19 files reviewed, 8 unresolved discussions (waiting on @bobvawter and @BramGruneir)
internal/source/kafka/config.go
line 48 at r3 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
If these fields are exported, you can take this whole object and publish it to the Diagnostics endpoint as a trivially JSON-ish object.
Done.
internal/source/kafka/consumer.go
line 80 at r6 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Use
log.WithError(err).Level("....")
here and elsewhere. That allows the log handler to access the structure (and stack) of the error message. Also, errors are typically used with the%v
verb, which allows the object to format itself.Also, anywhere you call an external API that returns an error should have an
errors.WithStack()
on it.
Done.
internal/source/kafka/consumer.go
line 141 at r6 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Why does this return a new map?
Done.
internal/source/kafka/consumer.go
line 147 at r6 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Why does this method return a batch?
Done.
internal/source/kafka/consumer.go
line 164 at r6 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Consider Postel's law.
Done.
internal/source/kafka/integration_test.go
line 53 at r6 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
I'd like to see a version of this test that's driven by a
seqtest.Generator
. It will generate a non-trivial sequence of mutations and also validate that the data in the target tables has maintained the correct order of updates. You should be able to use theGenerateInto(batch)
andCheckConsistent()
methods to bookend the Kafka transport code.
Filed #789
internal/source/kafka/provider.go
line 46 at r6 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Inject
switcher.Switcher
instead and leave it inModeImmediate
. It will build a complete sequencer stack for you and provides a faster on-ramp to supporting more that just immediate mode.
Done.
internal/util/hlc/hlc_test.go
line 58 at r6 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Write these as
ten.Before()
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 13 files at r2, 13 of 13 files at r7, all commit messages.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @sravotto)
internal/source/kafka/conn.go
line 69 at r7 (raw file):
// are allocated to each process based on the chosen rebalance strategy. func (c *Conn) Start(ctx *stopper.Context) (err error) { /**
Switch block comment to line comments and reflow.
internal/source/kafka/provider.go
line 59 at r7 (raw file):
mode := notify.VarOf(switcher.ModeImmediate) sw = sw.WithMode(mode) // seq, err := scriptSeq.Wrap(ctx, sw)
Dead code.
A simple integration test is part of this commit. It verifies that change events originated by a CockroachDB changefeed, and routed via a single node Kafka cluster are received by the connector and applied to a target CockroachDB datatabase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
Reviewable status: 17 of 19 files reviewed, 2 unresolved discussions (waiting on @bobvawter)
internal/source/kafka/conn.go
line 69 at r7 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Switch block comment to line comments and reflow.
Done.
internal/source/kafka/provider.go
line 59 at r7 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Dead code.
Done.
This PR consists of 3 separate commits:
This change is