WIP: [Rhythm] Full RF1 read path #4478

Draft: wants to merge 3 commits into main-rhythm
15 changes: 14 additions & 1 deletion modules/frontend/search_sharder.go
@@ -36,6 +36,7 @@ type SearchSharderConfig struct {
QueryIngestersUntil time.Duration `yaml:"query_ingesters_until,omitempty"`
IngesterShards int `yaml:"ingester_shards,omitempty"`
MaxSpansPerSpanSet uint32 `yaml:"max_spans_per_span_set,omitempty"`
RF1After time.Time `yaml:"rf1_after"`
}

type asyncSearchSharder struct {
@@ -143,13 +144,25 @@ func (s asyncSearchSharder) RoundTrip(pipelineRequest pipeline.Request) (pipelin

// blockMetas returns all relevant blockMetas given a start/end
func (s *asyncSearchSharder) blockMetas(start, end int64, tenantID string) []*backend.BlockMeta {
var rfCheck func(m *backend.BlockMeta) bool
if s.cfg.RF1After.IsZero() {
rfCheck = func(m *backend.BlockMeta) bool {
return m.ReplicationFactor == backend.DefaultReplicationFactor
}
} else {
rfCheck = func(m *backend.BlockMeta) bool {
return (m.ReplicationFactor == backend.DefaultReplicationFactor && m.StartTime.Before(s.cfg.RF1After)) ||
(m.ReplicationFactor == 1 && m.StartTime.After(s.cfg.RF1After))
}
}

// reduce metas to those in the requested range
allMetas := s.reader.BlockMetas(tenantID)
metas := make([]*backend.BlockMeta, 0, len(allMetas)/50) // divide by 50 for luck
for _, m := range allMetas {
if m.StartTime.Unix() <= end &&
m.EndTime.Unix() >= start &&
rfCheck(m) { // replaces the previous ReplicationFactor == backend.DefaultReplicationFactor check that skipped generator blocks (RF=1)
metas = append(metas, m)
}
}
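An illustrative, standalone sketch (not part of the diff) of how the rfCheck predicate above splits the read path around the rf1_after cutover. BlockMeta and defaultRF below are simplified stand-ins for backend.BlockMeta and backend.DefaultReplicationFactor, chosen only for this example. Note that time.Before and time.After are strict, so a block whose StartTime equals the cutover instant matches neither branch.

package main

import (
	"fmt"
	"time"
)

// BlockMeta is a simplified stand-in for backend.BlockMeta.
type BlockMeta struct {
	ReplicationFactor uint32
	StartTime         time.Time
}

// defaultRF stands in for backend.DefaultReplicationFactor.
const defaultRF uint32 = 3

// searchable mirrors the rfCheck predicate: with no cutover configured, only default-RF
// blocks are read; with a cutover, default-RF blocks are read up to the cutover and RF=1
// (generator-written) blocks are read after it.
func searchable(m BlockMeta, rf1After time.Time) bool {
	if rf1After.IsZero() {
		return m.ReplicationFactor == defaultRF
	}
	return (m.ReplicationFactor == defaultRF && m.StartTime.Before(rf1After)) ||
		(m.ReplicationFactor == 1 && m.StartTime.After(rf1After))
}

func main() {
	cutover := time.Date(2024, 12, 1, 0, 0, 0, 0, time.UTC)

	fmt.Println(searchable(BlockMeta{defaultRF, cutover.Add(-time.Hour)}, cutover)) // true: RF=3 block from before the cutover
	fmt.Println(searchable(BlockMeta{1, cutover.Add(time.Hour)}, cutover))          // true: RF=1 block from after the cutover
	fmt.Println(searchable(BlockMeta{1, cutover.Add(-time.Hour)}, cutover))         // false: RF=1 block predates the cutover
	fmt.Println(searchable(BlockMeta{defaultRF, cutover.Add(time.Hour)}, cutover))  // false: RF=3 block written after the cutover
}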
2 changes: 2 additions & 0 deletions modules/ingester/config.go
@@ -29,6 +29,7 @@ type Config struct {
CompleteBlockTimeout time.Duration `yaml:"complete_block_timeout"`
OverrideRingKey string `yaml:"override_ring_key"`
FlushAllOnShutdown bool `yaml:"flush_all_on_shutdown"`
FlushObjectStorage bool `yaml:"flush_object_storage"`

// This config is dynamically injected because defined outside the ingester config.
DedicatedColumns backend.DedicatedColumns `yaml:"-"`
@@ -49,6 +50,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.FlushCheckPeriod = 10 * time.Second
cfg.FlushOpTimeout = 5 * time.Minute
cfg.FlushAllOnShutdown = false
cfg.FlushObjectStorage = true

f.DurationVar(&cfg.MaxTraceIdle, prefix+".trace-idle-period", 10*time.Second, "Duration after which to consider a trace complete if no spans have been received")
f.DurationVar(&cfg.MaxBlockDuration, prefix+".max-block-duration", 30*time.Minute, "Maximum duration which the head block can be appended to before cutting it.")
12 changes: 7 additions & 5 deletions modules/ingester/flush.go
@@ -286,11 +286,13 @@ func (i *Ingester) handleComplete(ctx context.Context, op *flushOp) (retry bool,

// add a flushOp for the block we just completed
// No delay
if i.cfg.FlushObjectStorage { // the enqueue below was previously unconditional
	i.enqueue(&flushOp{
		kind:    opKindFlush,
		userID:  instance.instanceID,
		blockID: op.blockID,
	}, false)
}

Review comment (Member): We need to mark blocks flushed or change some logic so complete blocks are cleared at some point.

return false, nil
}
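A hypothetical, standalone model of the cleanup concern raised in the review comment above; none of the names below are Tempo APIs, they only model the shape of the problem. If the only thing that clears a complete block is a rule of the form "delete once the flushed time is older than complete_block_timeout", then skipping the object-storage flush leaves the flushed time at zero and the block is never cleared, which is why the block would need to be marked flushed or the cleanup logic changed.

package main

import (
	"fmt"
	"time"
)

// completeBlock models only the field that matters for this concern.
type completeBlock struct {
	flushedTime time.Time // stays zero if the object-storage flush is skipped and nothing marks the block flushed
}

// shouldDelete models a cleanup rule of the form "clear the block once it has been flushed
// for longer than completeBlockTimeout".
func shouldDelete(b completeBlock, completeBlockTimeout time.Duration, now time.Time) bool {
	if b.flushedTime.IsZero() {
		return false // never flushed => never cleared: the situation the reviewer is flagging
	}
	return now.Sub(b.flushedTime) > completeBlockTimeout
}

func main() {
	now := time.Now()
	timeout := 15 * time.Minute

	flushed := completeBlock{flushedTime: now.Add(-time.Hour)}
	skipped := completeBlock{} // flush skipped because flush_object_storage is false

	fmt.Println(shouldDelete(flushed, timeout, now)) // true: cleared once the timeout has passed
	fmt.Println(shouldDelete(skipped, timeout, now)) // false: lingers on local disk indefinitely
}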
16 changes: 9 additions & 7 deletions modules/ingester/ingester.go
@@ -489,13 +489,15 @@ func (i *Ingester) rediscoverLocalBlocks() error {
}

// Requeue needed flushes
if i.cfg.FlushObjectStorage { // the requeue loop below was previously unconditional
	for _, b := range newBlocks {
		if b.FlushedTime().IsZero() {
			i.enqueue(&flushOp{
				kind:    opKindFlush,
				userID:  t,
				blockID: (uuid.UUID)(b.BlockMeta().BlockID),
			}, i.replayJitter)
		}
	}
}
}
1 change: 1 addition & 0 deletions modules/querier/config.go
@@ -43,6 +43,7 @@ type SearchConfig struct {

type TraceByIDConfig struct {
QueryTimeout time.Duration `yaml:"query_timeout"`
RF1After time.Time `yaml:"rf1_after"`
}

type MetricsConfig struct {
3 changes: 2 additions & 1 deletion modules/querier/querier.go
@@ -286,7 +286,8 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
))

opts := common.DefaultSearchOptionsWithMaxBytes(maxBytes)
opts.RF1After = q.cfg.TraceByID.RF1After // previously: opts.BlockReplicationFactor = backend.DefaultReplicationFactor

partialTraces, blockErrs, err := q.store.Find(ctx, userID, req.TraceID, req.BlockStart, req.BlockEnd, timeStart, timeEnd, opts)
if err != nil {
retErr := fmt.Errorf("error querying store in Querier.FindTraceByID: %w", err)
24 changes: 12 additions & 12 deletions tempodb/encoding/common/interfaces.go
@@ -2,6 +2,7 @@ package common

import (
"context"
"time"

"github.com/go-kit/log"

@@ -35,14 +36,14 @@ type Searcher interface {
}

type SearchOptions struct {
ChunkSizeBytes     uint32    // Buffer size to read from backend storage.
StartPage          int       // Controls searching only a subset of the block. Which page to begin searching at.
TotalPages         int       // Controls searching only a subset of the block. How many pages to search.
MaxBytes           int       // Max allowable trace size in bytes. Traces exceeding this are not searched.
PrefetchTraceCount int       // How many traces to prefetch async.
ReadBufferCount    int
ReadBufferSize     int
RF1After           time.Time // Replaces BlockReplicationFactor: blocks started after this time are searched only if RF=1; earlier blocks must have the default RF. The zero value keeps the old behaviour.
}

// DefaultSearchOptions is used in a lot of places such as local ingester searches. It is important
@@ -52,10 +53,9 @@ type SearchOptions struct {
// tempodb.SearchConfig{}.ApplyToOptions(&searchOpts). we should consolidate these.
func DefaultSearchOptions() SearchOptions {
return SearchOptions{
ReadBufferCount: 32,
ReadBufferSize:  1024 * 1024,
ChunkSizeBytes:  4 * 1024 * 1024,
// BlockReplicationFactor: backend.DefaultReplicationFactor was dropped along with the field; RF1After defaults to the zero time.
}
}

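A short usage sketch, not part of the diff: the zero value of RF1After keeps the legacy behaviour for existing callers of DefaultSearchOptions, and the cutover is opted into explicitly, as the querier change above does. The cutover date here is invented for illustration.

package main

import (
	"time"

	"github.com/grafana/tempo/tempodb/encoding/common"
)

func main() {
	opts := common.DefaultSearchOptions()
	// Leaving RF1After at the zero time keeps the old behaviour (only default-RF blocks are read).
	opts.RF1After = time.Date(2024, 12, 1, 0, 0, 0, 0, time.UTC) // example cutover, typically sourced from an rf1_after config field
	_ = opts
}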
17 changes: 11 additions & 6 deletions tempodb/tempodb.go
@@ -309,13 +309,13 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID,
compactedBlocksSearched := 0

for _, b := range blocklist {
if includeBlock(b, id, blockStartBytes, blockEndBytes, timeStart, timeEnd, opts.RF1After) { // previously passed opts.BlockReplicationFactor
copiedBlocklist = append(copiedBlocklist, b)
blocksSearched++
}
}
for _, c := range compactedBlocklist {
if includeCompactedBlock(c, id, blockStartBytes, blockEndBytes, rw.cfg.BlocklistPoll, timeStart, timeEnd, opts.RF1After) { // previously passed opts.BlockReplicationFactor
copiedBlocklist = append(copiedBlocklist, &c.BlockMeta)
compactedBlocksSearched++
}
@@ -616,7 +616,7 @@ func (rw *readerWriter) pollBlocklist() {
}

// includeBlock indicates whether a given block should be included in a backend search
func includeBlock(b *backend.BlockMeta, _ common.ID, blockStart, blockEnd []byte, timeStart, timeEnd int64, rf1After time.Time) bool { // the replicationFactor int parameter is replaced by rf1After
// todo: restore this functionality once it works. min/max ids are currently not recorded
// correctly in a block
// https://github.com/grafana/tempo/issues/1903
@@ -637,16 +637,21 @@ func (rw *readerWriter) pollBlocklist() {
return false
}

if rf1After.IsZero() {
	return b.ReplicationFactor == backend.DefaultReplicationFactor // previously: return b.ReplicationFactor == uint32(replicationFactor)
}

return (b.StartTime.Before(rf1After) && b.ReplicationFactor == backend.DefaultReplicationFactor) ||
	(b.StartTime.After(rf1After) && b.ReplicationFactor == 1)
}

// if block is compacted within lookback period, and is within shard ranges, include it in search
func includeCompactedBlock(c *backend.CompactedBlockMeta, id common.ID, blockStart, blockEnd []byte, poll time.Duration, timeStart, timeEnd int64, rf1After time.Time) bool { // the replicationFactor int parameter is replaced by rf1After
lookback := time.Now().Add(-(2 * poll))
if c.CompactedTime.Before(lookback) {
return false
}
return includeBlock(&c.BlockMeta, id, blockStart, blockEnd, timeStart, timeEnd, rf1After) // previously passed replicationFactor
}

// createLegacyCache uses the config to return a cache and a list of roles.
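A standalone sketch of the lookback guard in includeCompactedBlock above, with example values chosen for illustration: a compacted block stays searchable for two blocklist-poll cycles after compaction and is dropped afterwards; blocks inside the window then go through the same rf1After check as live blocks.

package main

import (
	"fmt"
	"time"
)

// withinLookback mirrors the guard at the top of includeCompactedBlock: a block compacted
// earlier than now - 2*poll is excluded before any other check runs.
func withinLookback(compactedTime time.Time, poll time.Duration, now time.Time) bool {
	lookback := now.Add(-(2 * poll))
	return !compactedTime.Before(lookback)
}

func main() {
	now := time.Now()
	poll := 5 * time.Minute // example blocklist_poll value

	fmt.Println(withinLookback(now.Add(-3*time.Minute), poll, now))  // true: compacted 3m ago, inside the 10m window
	fmt.Println(withinLookback(now.Add(-15*time.Minute), poll, now)) // false: compacted 15m ago, no longer searched
}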
4 changes: 2 additions & 2 deletions tempodb/tempodb_test.go
@@ -416,7 +416,7 @@ func TestIncludeBlock(t *testing.T) {
e, err := tc.blockEnd.MarshalBinary()
require.NoError(t, err)

assert.Equal(t, tc.expected, includeBlock(tc.meta, tc.searchID, s, e, tc.start, tc.end, time.Time{})) // previously passed 0 as the replication factor
})
}
}
@@ -487,7 +487,7 @@ func TestIncludeCompactedBlock(t *testing.T) {
e, err := tc.blockEnd.MarshalBinary()
require.NoError(t, err)

assert.Equal(t, tc.expected, includeCompactedBlock(tc.meta, tc.searchID, s, e, blocklistPoll, tc.start, tc.end, time.Time{})) // previously passed 0 as the replication factor
})
}
}