kafka: finding proper offset to start replay, given a min timestamp #824
Conversation
Codecov Report
Attention: Patch coverage is

@@            Coverage Diff             @@
##           master     #824      +/-   ##
==========================================
- Coverage   76.21%   75.73%   -0.49%
==========================================
  Files         219      224       +5
  Lines       10634    10888     +254
==========================================
+ Hits         8105     8246     +141
- Misses       1812     1908      +96
- Partials      717      734      +17
Force-pushed from 3ccf8fa to 4d564ec.
Code looks good overall, just some comments.
Reviewed 6 of 10 files at r1, 7 of 7 files at r2, all commit messages.
Reviewable status: all files reviewed, 10 unresolved discussions (waiting on @bobvawter, @Github-advanced-security[bot], and @sravotto)
internal/source/kafka/config.go
line 100 at r2 (raw file):
f.StringVar(&c.MinTimestamp, "minTimestamp", "", "only accept unprocessed messages at or newer than this timestamp; this is an inclusive lower limit")
f.DurationVar(&c.ResolvedInterval, "resolvedInterval", 5*time.Second, "interval between two resolved timestamps")
Can you add more details here, please? Add a warning that this MUST match the interval set in the CDC statement. This is a weird configuration issue that people will mess up.
Call out the specific variable. For example, do you mean min_checkpoint_frequency or resolved?
Also, rename this config parameter to exactly match the name of the one it needs to match.
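For illustration, the help text could spell out the coupling along these lines (a rough sketch: the wording, the trimmed-down config struct, and the pflag import path are my assumptions, not code from this PR):

```go
package kafka

import (
	"time"

	"github.com/spf13/pflag"
)

// config mirrors only the two fields discussed here; the real struct in
// internal/source/kafka/config.go is larger.
type config struct {
	MinTimestamp     string
	ResolvedInterval time.Duration
}

// bind registers the flags with expanded help text that points at the
// changefeed-side setting the value needs to agree with.
func (c *config) bind(f *pflag.FlagSet) {
	f.StringVar(&c.MinTimestamp, "minTimestamp", "",
		"only accept unprocessed messages at or newer than this timestamp; "+
			"this is an inclusive lower limit")
	f.DurationVar(&c.ResolvedInterval, "resolvedInterval", 5*time.Second,
		"interval between two resolved timestamps; for the offset search to "+
			"work well, this should match the resolved-timestamp interval "+
			"configured on the source changefeed")
}
```

Whatever the final name is, making it identical to the changefeed option would let --help alone explain the coupling.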
internal/source/kafka/consumer.go
line 162 at r2 (raw file):
var payload payload
dec := json.NewDecoder(bytes.NewReader(msg.Value))
dec.UseNumber()
Why is this no longer needed?
internal/source/kafka/metrics.go
line 26 at r2 (raw file):
var (
	mutationsErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "kafka_mutations_error_count",
Please add these metrics to the grafana dashboard.
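For reference, a sketch of the counters in question (the Help strings, the label set, and the second counter are assumptions on my part); each one would need a matching panel:

```go
package kafka

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	// Messages that could not be converted or applied.
	mutationsErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "kafka_mutations_error_count",
		Help: "the total number of Kafka messages that failed to be applied",
	}, []string{"topic"})
	// Messages successfully applied to the target.
	mutationsSuccessCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "kafka_mutations_success_count",
		Help: "the total number of Kafka messages successfully applied",
	}, []string{"topic"})
)
```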
internal/source/kafka/offset.go
line 108 at r1 (raw file):
Previously, github-advanced-security[bot] wrote…
Useless assignment to local variable
This definition of offset is never used.
Please fix this.
internal/source/kafka/offset.go
line 67 at r2 (raw file):
func (o *offsetSeeker) GetOffsets(topics []string, min hlc.Time) ([]*partitionState, error) {
	res := make([]*partitionState, 0)
	// TODO (silvano): make this parallel
Please add a github issue to track this.
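As a sketch of what the parallel version might look like once that issue is picked up (errgroup-based; the simplified partitionState, the getTopicOffsets field, and the use of int64 in place of hlc.Time are assumptions made to keep the example self-contained):

```go
package kafka

import (
	"golang.org/x/sync/errgroup"
)

// partitionState and getTopicOffsets are simplified stand-ins for the real
// types in internal/source/kafka/offset.go.
type partitionState struct {
	topic     string
	partition int32
	offset    int64
}

type offsetSeeker struct {
	getTopicOffsets func(topic string, min int64) ([]*partitionState, error)
}

// GetOffsets runs the per-topic offset search concurrently and flattens the
// results once every lookup has finished.
func (o *offsetSeeker) GetOffsets(topics []string, min int64) ([]*partitionState, error) {
	results := make([][]*partitionState, len(topics))
	var g errgroup.Group
	for i, topic := range topics {
		i, topic := i, topic // capture loop variables for the goroutine
		g.Go(func() error {
			states, err := o.getTopicOffsets(topic, min)
			if err != nil {
				return err
			}
			results[i] = states
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	res := make([]*partitionState, 0)
	for _, states := range results {
		res = append(res, states...)
	}
	return res, nil
}
```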
internal/source/kafka/offset.go
line 110 at r2 (raw file):
last := loghead
var offset int64
// Looking for a offset that is reasonably just before the given min
nit: for an offset
internal/source/kafka/offset.go
line 144 at r2 (raw file):
	return 0, errors.WithStack(err)
}
if offset != sarama.OffsetOldest {
Why the break here? Just add a comment.
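For example, the shape of the comment could be something like the sketch below; the surrounding loop, the seek callback, and the sarama import path are guesses from the quoted fragment, and the stated reason is only a placeholder for the real one:

```go
package kafka

import (
	"github.com/IBM/sarama"
	"github.com/pkg/errors"
)

// firstUsableOffset probes candidates in order and stops as soon as one of
// them yields a concrete starting offset.
func firstUsableOffset(candidates []int32, seek func(int32) (int64, error)) (int64, error) {
	offset := sarama.OffsetOldest
	for _, c := range candidates {
		var err error
		offset, err = seek(c)
		if err != nil {
			return 0, errors.WithStack(err)
		}
		if offset != sarama.OffsetOldest {
			// A concrete starting offset was found; scanning further cannot
			// improve on it, so stop here.
			break
		}
	}
	return offset, nil
}
```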
internal/source/kafka/offset.go
line 174 at r2 (raw file):
	return 0, errors.Errorf("consumer for %s closed", topic)
}
if msg.Offset >= offsets.max {
Please add some comments so I can understand the logic here.
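Based on the quoted fragment and the PR description, the commented logic might read roughly as follows (the offsetRange struct, the isResolvedBeforeMin callback, the loop shape, and the import paths are assumptions, not the actual code):

```go
package kafka

import (
	"github.com/IBM/sarama"
	"github.com/pkg/errors"
)

// offsetRange captures the partition's log head at the time the search
// started; the name and shape are assumptions.
type offsetRange struct {
	max int64
}

// scanForResolved walks messages on one partition and remembers the offset
// of the most recent resolved-timestamp message that is still strictly less
// than the requested minimum. If none is found, it falls back to
// sarama.OffsetOldest so the caller replays from the start of the log.
func scanForResolved(
	topic string,
	messages <-chan *sarama.ConsumerMessage,
	offsets offsetRange,
	isResolvedBeforeMin func(*sarama.ConsumerMessage) (bool, error),
) (int64, error) {
	candidate := sarama.OffsetOldest
	for {
		msg, ok := <-messages
		if !ok {
			return 0, errors.Errorf("consumer for %s closed", topic)
		}
		if msg.Offset >= offsets.max {
			// We reached the log head captured when the search began;
			// anything newer cannot move the replay start point back, so
			// stop scanning this partition.
			break
		}
		resolved, err := isResolvedBeforeMin(msg)
		if err != nil {
			return 0, errors.WithStack(err)
		}
		if resolved {
			// Keep the latest resolved timestamp seen so far that is still
			// strictly below the minimum; replay starts from this offset.
			candidate = msg.Offset
		}
	}
	return candidate, nil
}
```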
internal/source/kafka/payload.go
line 37 at r2 (raw file):
// asPayload extracts the mutation payload from a Kafka consumer message.
func asPayload(msg *sarama.ConsumerMessage) (*payload, error) {
Add unit test please. I know it will be small, but still worth doing.
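Something along these lines would be enough (a sketch: the Resolved field, the JSON shape, and the testify/sarama import paths are assumptions):

```go
package kafka

import (
	"testing"

	"github.com/IBM/sarama"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestAsPayload(t *testing.T) {
	// A well-formed resolved-timestamp message should decode cleanly.
	msg := &sarama.ConsumerMessage{
		Topic: "my_table",
		Value: []byte(`{"resolved":"1714000000000000000.0000000000"}`),
	}
	p, err := asPayload(msg)
	require.NoError(t, err)
	assert.NotEmpty(t, p.Resolved)

	// Malformed JSON should surface an error rather than a partial payload.
	_, err = asPayload(&sarama.ConsumerMessage{Value: []byte(`{`)})
	assert.Error(t, err)
}
```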
internal/source/kafka/mocks/client.go
line 73 at r1 (raw file):
Previously, github-advanced-security[bot] wrote…
Expression has no effect
This expression has no effect.
Please fix.
Force-pushed from 7a5bd12 to 280d357.
Please take another look.
Reviewable status: 9 of 14 files reviewed, 9 unresolved discussions (waiting on @bobvawter, @BramGruneir, and @Github-advanced-security[bot])
internal/source/kafka/config.go
line 100 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Can you add more details here, please? Add a warning that this MUST match the interval set in the CDC statement. This is a weird configuration issue that people will mess up.
Call out the specific variable. For example, do you mean min_checkpoint_frequency or resolved?
Also, rename this config parameter to exactly match the name of the one it needs to match.
Done. There are 2 settings in the changefeeds that need to be set. This is just an optimization, so it's not a MUST.
internal/source/kafka/consumer.go
line 162 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Why is this no longer needed?
Moved to asPayload.
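For context, a sketch of where the decoder setup now lives (the payload field set and the sarama import path are assumptions, not the actual file):

```go
package kafka

import (
	"bytes"
	"encoding/json"

	"github.com/IBM/sarama"
)

// payload stands in for the type in internal/source/kafka/payload.go; the
// field set shown here is an assumption.
type payload struct {
	After    map[string]any `json:"after"`
	Updated  string         `json:"updated"`
	Resolved string         `json:"resolved"`
}

// asPayload decodes a consumer message. UseNumber keeps numeric column
// values in After as json.Number rather than float64, preserving precision
// for large integers and decimals; that is why the decoder setup moved here
// from the consumer loop.
func asPayload(msg *sarama.ConsumerMessage) (*payload, error) {
	dec := json.NewDecoder(bytes.NewReader(msg.Value))
	dec.UseNumber()
	var p payload
	if err := dec.Decode(&p); err != nil {
		return nil, err
	}
	return &p, nil
}
```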
internal/source/kafka/metrics.go
line 26 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Please add these metrics to the grafana dashboard.
Will do this later: #829.
internal/source/kafka/offset.go
line 108 at r1 (raw file):
Previously, github-advanced-security[bot] wrote…
Useless assignment to local variable
This definition of offset is never used.
Done.
internal/source/kafka/offset.go
line 67 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Please add a github issue to track this.
Done. #830
internal/source/kafka/offset.go
line 144 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Why the break here? Just add a comment.
Done.
internal/source/kafka/offset.go
line 174 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Please add some comments so I can understand the logic here.
Done. It turns out there was a bug: if we don't find a suitable offset, we should return OffsetOldest.
internal/source/kafka/payload.go
line 37 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Add unit test please. I know it will be small, but still worth doing.
Done.
internal/source/kafka/mocks/client.go
line 73 at r1 (raw file):
Previously, github-advanced-security[bot] wrote…
Expression has no effect
This expression has no effect.
Done.
Reviewed 5 of 5 files at r3, all commit messages.
Reviewable status: all files reviewed, 6 unresolved discussions (waiting on @bobvawter, @Github-advanced-security[bot], and @sravotto)
internal/source/kafka/config.go
line 100 at r2 (raw file):
Previously, sravotto (silvano) wrote…
Done. There are 2 settings in the changefeeds that need to be set. This is just an optimization, so it's not a MUST.
I'm still confused about this. The two values are linked. Do you mean the lowest of the two or the largest?
internal/source/kafka/offset.go
line 67 at r2 (raw file):
Previously, sravotto (silvano) wrote…
Done. #830
Please link it in the TODO:
// TODO (silvano) #830: make this parallel
Force-pushed from bf2f0db to 2d33519.
Let me know if it is clearer...
Reviewable status: 12 of 14 files reviewed, 6 unresolved discussions (waiting on @bobvawter, @BramGruneir, and @Github-advanced-security[bot])
internal/source/kafka/config.go
line 100 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
I'm still confused about this. The two values are linked. Do you mean the lowest of the two or the largest?
I added more information; however, users should look at the changefeed documentation.
internal/source/kafka/offset.go
line 67 at r2 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Please link it in the TODO:
// TODO (silvano) #830: make this parallel
Done.
This change improves the logic to find an adequate offset to start the event replay. We return the offset of the most recent resolved timestamp message that is strictly less than the minimum timestamp provided by the user.

A new set of metrics has been added to track the total number of messages that have been replayed, and the number of messages read to find the resolved timestamp.

A new mock client and consumer have been introduced to facilitate testing.

Fixes #779, #778.
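To illustrate the strictly-less-than rule, a small sketch of parsing and comparing CockroachDB-style resolved timestamps; the resolvedTime type below is a stand-in for the project's hlc.Time, and the rationale in the final comment is an inference from this description rather than text from the PR:

```go
package kafka

import (
	"fmt"
	"strconv"
	"strings"
)

// resolvedTime is a stand-in for hlc.Time. CockroachDB resolved timestamps
// are rendered as "<wall nanos>.<logical>", e.g. "1714000000000000000.0000000000".
type resolvedTime struct {
	wall    int64
	logical int
}

func parseResolved(s string) (resolvedTime, error) {
	wallPart, logicalPart, ok := strings.Cut(s, ".")
	if !ok {
		return resolvedTime{}, fmt.Errorf("malformed resolved timestamp %q", s)
	}
	wall, err := strconv.ParseInt(wallPart, 10, 64)
	if err != nil {
		return resolvedTime{}, err
	}
	logical, err := strconv.Atoi(logicalPart)
	if err != nil {
		return resolvedTime{}, err
	}
	return resolvedTime{wall: wall, logical: logical}, nil
}

// usableReplayStart reports whether a resolved timestamp is strictly less
// than the requested minimum. A resolved message at exactly the minimum is
// rejected because mutations at that timestamp may already have been
// emitted before it, and the minimum is an inclusive lower limit on the
// mutations that must be replayed.
func usableReplayStart(resolved, min resolvedTime) bool {
	if resolved.wall != min.wall {
		return resolved.wall < min.wall
	}
	return resolved.logical < min.logical
}
```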
Force-pushed from 2d33519 to 95f4b61.
Reviewed 2 of 2 files at r4, all commit messages.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @bobvawter and @Github-advanced-security[bot])