
kafka: finding proper offset to start replay, given a min timestamp #824

Merged: 1 commit merged into master from sr8_kafka_offset on Apr 30, 2024

Conversation

@sravotto (Contributor) commented Apr 16, 2024

This change improves the logic to find an adequate offset to start the event replay. We return the offset of the most recent resolved timestamp message that is strictly less than the minimum timestamp provided by the user.

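At a high level, the search works like the sketch below (a minimal, self-contained illustration of the idea only, with plain int64 timestamps standing in for hlc.Time and a made-up resolvedMsg type; it is not the code in this PR):

```go
package main

import "fmt"

// resolvedMsg is a stand-in for a resolved-timestamp message in a Kafka
// partition: its offset plus the resolved timestamp it carries.
type resolvedMsg struct {
	offset int64
	nanos  int64 // resolved timestamp, simplified to nanoseconds
}

const offsetOldest int64 = -2 // mirrors sarama.OffsetOldest

// seekStartOffset returns the offset of the most recent resolved-timestamp
// message that is strictly older than min. If there is none, it falls back to
// offsetOldest, i.e. replay everything from the start of the partition.
func seekStartOffset(msgs []resolvedMsg, min int64) int64 {
	start := offsetOldest
	for _, m := range msgs {
		if m.nanos >= min {
			// Resolved timestamps are emitted in order within a partition,
			// so the first one at or past min ends the search.
			break
		}
		start = m.offset
	}
	return start
}

func main() {
	msgs := []resolvedMsg{{offset: 10, nanos: 100}, {offset: 20, nanos: 200}, {offset: 30, nanos: 300}}
	fmt.Println(seekStartOffset(msgs, 250)) // prints 20
}
```
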
A new set of metrics has been added to track the total number of messages that have been replayed, and the number of messages read to find the resolved timestamp.

A new mock client and consumer have been introduced to facilitate testing.



Code scanning alerts fixed: internal/source/kafka/mocks/client.go, internal/source/kafka/offset.go
@codecov-commenter commented Apr 16, 2024

Codecov Report

Attention: Patch coverage is 54.51389% with 131 lines in your changes missing coverage. Please review.

Project coverage is 75.73%. Comparing base (1487d0a) to head (95f4b61).

| Files | Patch % | Lines |
|---|---|---|
| internal/source/kafka/mocks/client.go | 34.66% | 48 Missing and 1 partial ⚠️ |
| internal/source/kafka/offset.go | 56.52% | 29 Missing and 11 partials ⚠️ |
| internal/source/kafka/mocks/consumer.go | 35.13% | 24 Missing ⚠️ |
| internal/source/kafka/mocks/partition.go | 82.97% | 8 Missing ⚠️ |
| internal/source/kafka/conn.go | 0.00% | 5 Missing ⚠️ |
| internal/source/kafka/consumer.go | 62.50% | 2 Missing and 1 partial ⚠️ |
| internal/source/kafka/payload.go | 77.77% | 1 Missing and 1 partial ⚠️ |


Additional details and impacted files
@@            Coverage Diff             @@
##           master     #824      +/-   ##
==========================================
- Coverage   76.21%   75.73%   -0.49%     
==========================================
  Files         219      224       +5     
  Lines       10634    10888     +254     
==========================================
+ Hits         8105     8246     +141     
- Misses       1812     1908      +96     
- Partials      717      734      +17     


@sravotto sravotto force-pushed the sr8_kafka_offset branch 2 times, most recently from 3ccf8fa to 4d564ec on April 16, 2024 19:27
@sravotto sravotto marked this pull request as ready for review April 16, 2024 19:43
@BramGruneir (Member) left a comment


Code looks good overall, just some comments.

Reviewed 6 of 10 files at r1, 7 of 7 files at r2, all commit messages.
Reviewable status: all files reviewed, 10 unresolved discussions (waiting on @bobvawter, @Github-advanced-security[bot], and @sravotto)


internal/source/kafka/config.go line 100 at r2 (raw file):

	f.StringVar(&c.MinTimestamp, "minTimestamp", "",
		"only accept unprocessed messages at or newer than this timestamp; this is an inclusive lower limit")
	f.DurationVar(&c.ResolvedInterval, "resolvedInterval", 5*time.Second, "interval between two resolved timestamps")

Can you add more details here, please? Add a warning that this MUST match the interval set in the CDC statement. This is a weird configuration issue that people will mess up.

Call out the specific variable. For example, do you mean min_checkpoint_frequency or resolved?

Also, rename this config parameter to exactly match the name of the one it needs to match.
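For illustration, the expanded help text could read something like the fragment below; the exact wording, and whether both resolved and min_checkpoint_frequency end up being mentioned, is an assumption rather than what was merged:

```go
f.DurationVar(&c.ResolvedInterval, "resolvedInterval", 5*time.Second,
	"interval between two resolved timestamps in the incoming changefeed; "+
		"this should match the resolved option (see also min_checkpoint_frequency) "+
		"set on the CREATE CHANGEFEED statement, otherwise the offset search may "+
		"scan more messages than necessary")
```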


internal/source/kafka/consumer.go line 162 at r2 (raw file):

	var payload payload
	dec := json.NewDecoder(bytes.NewReader(msg.Value))
	dec.UseNumber()

Why is this no longer needed?


internal/source/kafka/metrics.go line 26 at r2 (raw file):

var (
	mutationsErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "kafka_mutations_error_count",

Please add these metrics to the grafana dashboard.
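For reference, the two counters described in the PR summary could be declared along these lines; the metric and variable names below are placeholders, not necessarily the ones in metrics.go:

```go
import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	// Messages re-delivered to the target while replaying from the seek offset.
	replayedCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "kafka_replayed_count",
		Help: "total number of messages replayed from the computed start offset",
	}, []string{"topic"})

	// Messages scanned while searching for the resolved-timestamp offset.
	seekReadCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "kafka_seek_read_count",
		Help: "number of messages read to locate the resolved timestamp offset",
	}, []string{"topic"})
)
```

Incrementing would then be a call such as replayedCount.WithLabelValues(topic).Inc() at the point each replayed message is dispatched.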


internal/source/kafka/offset.go line 108 at r1 (raw file):

Previously, github-advanced-security[bot] wrote…

Useless assignment to local variable

This definition of offset is never used.


please fix this


internal/source/kafka/offset.go line 67 at r2 (raw file):

func (o *offsetSeeker) GetOffsets(topics []string, min hlc.Time) ([]*partitionState, error) {
	res := make([]*partitionState, 0)
	// TODO (silvano): make this parallel

Please add a github issue to track this.
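If it helps when the TODO is picked up, the fan-out could look roughly like this errgroup sketch; getTopicOffsets is a hypothetical per-topic helper, and the receiver and types are borrowed from the snippet above:

```go
// Sketch only: requires "context" and "golang.org/x/sync/errgroup".
func (o *offsetSeeker) getOffsetsParallel(
	ctx context.Context, topics []string, min hlc.Time,
) ([]*partitionState, error) {
	g, ctx := errgroup.WithContext(ctx)
	results := make([][]*partitionState, len(topics))
	for i, topic := range topics {
		i, topic := i, topic // capture loop variables for the goroutine
		g.Go(func() error {
			states, err := o.getTopicOffsets(ctx, topic, min) // hypothetical helper
			if err != nil {
				return err
			}
			results[i] = states
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	res := make([]*partitionState, 0, len(topics))
	for _, states := range results {
		res = append(res, states...)
	}
	return res, nil
}
```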


internal/source/kafka/offset.go line 110 at r2 (raw file):

	last := loghead
	var offset int64
	// Looking for a offset that is reasonably just before the given min

nit: for an offset


internal/source/kafka/offset.go line 144 at r2 (raw file):

			return 0, errors.WithStack(err)
		}
		if offset != sarama.OffsetOldest {

Why the break here? Just add a comment.


internal/source/kafka/offset.go line 174 at r2 (raw file):

				return 0, errors.Errorf("consumer for %s closed", topic)
			}
			if msg.Offset >= offsets.max {

Please add some comments so I can understand the logic here.
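For whoever reads this later, the scan can be paraphrased as the annotated fragment below; offsets.max is taken from the snippet above, while asResolvedTimestamp and hlc.Compare are assumptions standing in for whatever the real code uses:

```go
start := sarama.OffsetOldest // fallback if no suitable resolved timestamp is found
for msg := range consumer.Messages() {
	if msg.Offset >= offsets.max {
		// offsets.max is the log head captured before the scan started;
		// nothing at or beyond it can give us an earlier starting point,
		// so stop reading here.
		break
	}
	resolved, ok, err := asResolvedTimestamp(msg) // assumed helper
	if err != nil {
		return 0, errors.WithStack(err)
	}
	if ok && hlc.Compare(resolved, min) < 0 {
		// Still strictly before the requested minimum: remember it and
		// keep scanning for a later (closer) resolved timestamp.
		start = msg.Offset
	}
}
return start, nil
```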


internal/source/kafka/payload.go line 37 at r2 (raw file):

// asPayload extracts the mutation payload from a Kafka consumer message.
func asPayload(msg *sarama.ConsumerMessage) (*payload, error) {

Add unit test please. I know it will be small, but still worth doing.
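For context on the earlier consumer.go question: the decoder setup now lives here, roughly as sketched below; the payload fields shown are assumptions, not the actual struct:

```go
import (
	"bytes"
	"encoding/json"

	"github.com/IBM/sarama"
	"github.com/pkg/errors"
)

// payload is a trimmed-down stand-in for the changefeed message envelope.
type payload struct {
	After    json.RawMessage `json:"after"`
	Key      json.RawMessage `json:"key"`
	Updated  string          `json:"updated"`
	Resolved string          `json:"resolved"`
}

// asPayload extracts the mutation payload from a Kafka consumer message.
// UseNumber keeps numeric values as json.Number rather than float64 when they
// land in untyped fields, which is why the decoder setup moved here from the
// consumer loop.
func asPayload(msg *sarama.ConsumerMessage) (*payload, error) {
	var p payload
	dec := json.NewDecoder(bytes.NewReader(msg.Value))
	dec.UseNumber()
	if err := dec.Decode(&p); err != nil {
		return nil, errors.WithStack(err)
	}
	return &p, nil
}
```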


internal/source/kafka/mocks/client.go line 73 at r1 (raw file):

Previously, github-advanced-security[bot] wrote…

Expression has no effect

This expression has no effect.


fix please

@sravotto sravotto force-pushed the sr8_kafka_offset branch 2 times, most recently from 7a5bd12 to 280d357 on April 22, 2024 19:15
@sravotto (Contributor, Author) left a comment


Please take another look.

Reviewable status: 9 of 14 files reviewed, 9 unresolved discussions (waiting on @bobvawter, @BramGruneir, and @Github-advanced-security[bot])


internal/source/kafka/config.go line 100 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Can you add more details here please. Add a warning that this MUST match the interval set in the CDC statement. This is a weird configuration issue that people will mess up.

Call out the specific variable. For example, do you mean min_checkpoint_frequency or resolved?

Also, rename this config parameter to exactly match the name of the one it needs to match.

Done. There are 2 settings in the changefeeds that need to be set. This is just an optimization, so it's not a MUST.


internal/source/kafka/consumer.go line 162 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Why is this no longer needed?

moved to asPayload.


internal/source/kafka/metrics.go line 26 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Please add these metrics to the grafana dashboard.

Will do this later
#829


internal/source/kafka/offset.go line 108 at r1 (raw file):

Previously, github-advanced-security[bot] wrote…

Useless assignment to local variable

This definition of offset is never used.


Done.


internal/source/kafka/offset.go line 67 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Please add a github issue to track this.

Done. #830


internal/source/kafka/offset.go line 144 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Why the break here? Just add a comment.

Done.


internal/source/kafka/offset.go line 174 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Please add some comments so I can understand the logic here.

Done. Turns out there was a bug: if we don't find a suitable offset, we should return OffsetOldest.


internal/source/kafka/payload.go line 37 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Add unit test please. I know it will be small, but still worth doing.

Done.


internal/source/kafka/mocks/client.go line 73 at r1 (raw file):

Previously, github-advanced-security[bot] wrote…

Expression has no effect

This expression has no effect.


Done.

@BramGruneir (Member) left a comment


Reviewed 5 of 5 files at r3, all commit messages.
Reviewable status: all files reviewed, 6 unresolved discussions (waiting on @bobvawter, @Github-advanced-security[bot], and @sravotto)


internal/source/kafka/config.go line 100 at r2 (raw file):

Previously, sravotto (silvano) wrote…

Done. There are 2 settings in the changefeeds that need to be set. This is just an optimization, so it's not a MUST.

I'm still confused about this. The two values are linked. Do you mean the lower of the two or the larger?


internal/source/kafka/offset.go line 67 at r2 (raw file):

Previously, sravotto (silvano) wrote…

Done. #830

Please link it to this todo

// TODO (silvano) #830: make this parallel

@sravotto sravotto force-pushed the sr8_kafka_offset branch 3 times, most recently from bf2f0db to 2d33519 on April 30, 2024 16:24
@sravotto (Contributor, Author) left a comment


Let me know if it is clearer...

Reviewable status: 12 of 14 files reviewed, 6 unresolved discussions (waiting on @bobvawter, @BramGruneir, and @Github-advanced-security[bot])


internal/source/kafka/config.go line 100 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

I'm still confused over this. The two values are linked. Do you mean the lowest of the two or or the largest?

I added more information; however, users should look at the changefeed documentation.


internal/source/kafka/offset.go line 67 at r2 (raw file):

Previously, BramGruneir (Bram Gruneir) wrote…

Please link it to this todo

// TODO (silvano) #830: make this parallel

Done.

This change improves the logic to find an adequate offset to start the event replay.
We return the offset of the most recent resolved timestamp message that is strictly
less than the minimum timestamp provided by the user.

A new set of metrics has been added to track the total number of messages
that have been replayed, and the number of messages read to find the resolved timestamp.

A new mock client and consumer have been introduced to facilitate testing.

Fixes #779, #778.
@BramGruneir (Member) left a comment


:lgtm:

Reviewed 2 of 2 files at r4, all commit messages.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @bobvawter and @Github-advanced-security[bot])

@sravotto sravotto added this pull request to the merge queue Apr 30, 2024
Merged via the queue into master with commit a2144ad Apr 30, 2024
47 of 48 checks passed
@sravotto sravotto deleted the sr8_kafka_offset branch April 30, 2024 20:23