Skip to content

Commit

Permalink
SeriesResponse: Use memory pooling
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Dec 31, 2024
1 parent 5795543 commit 0d90028
Show file tree
Hide file tree
Showing 13 changed files with 1,448 additions and 280 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/grafana/mimir

go 1.22.7
go 1.23

// Please note that this directive is ignored when building with the Mimir build image,
// that will always use its bundled toolchain.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []storepb.LabelMa

// Implementation of storage.SeriesSet, based on individual responses from store client.
type blockQuerierSeriesSet struct {
series []*storepb.Series
series []*storepb.CustomSeries

// next response to process
next int
Expand Down
13 changes: 3 additions & 10 deletions pkg/querier/block_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ type storeGatewayStreamReader struct {
log log.Logger

chunkCountEstimateChan chan int
seriesChunksChan chan *storepb.StreamingChunksBatch
seriesChunksChan chan *storepb.CustomStreamingChunksBatch
chunksBatch []*storepb.StreamingChunks
errorChan chan error
err error
Expand Down Expand Up @@ -189,7 +189,7 @@ func (s *storeGatewayStreamReader) Close() {
func (s *storeGatewayStreamReader) StartBuffering() {
// Important: to ensure that the goroutine does not become blocked and leak, the goroutine must only ever write to errorChan at most once.
s.errorChan = make(chan error, 1)
s.seriesChunksChan = make(chan *storepb.StreamingChunksBatch, 1)
s.seriesChunksChan = make(chan *storepb.CustomStreamingChunksBatch, 1)
s.chunkCountEstimateChan = make(chan int, 1)

go func() {
Expand Down Expand Up @@ -244,7 +244,6 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
}

estimate := msg.GetStreamingChunksEstimate()
msg.FreeBuffer()
if estimate == nil {
return fmt.Errorf("expected to receive chunks estimate, but got message of type %T", msg.Result)
}
Expand All @@ -262,18 +261,15 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error

batch := msg.GetStreamingChunks()
if batch == nil {
msg.FreeBuffer()
return fmt.Errorf("expected to receive streaming chunks, but got message of type %T", msg.Result)
}

if len(batch.Series) == 0 {
msg.FreeBuffer()
continue
}

totalSeries += len(batch.Series)
if totalSeries > s.expectedSeriesCount {
msg.FreeBuffer()
return fmt.Errorf("expected to receive only %v series, but received at least %v series", s.expectedSeriesCount, totalSeries)
}

Expand All @@ -287,11 +283,9 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
}
totalChunks += numChunks
if err := s.queryLimiter.AddChunks(numChunks); err != nil {
msg.FreeBuffer()
return err
}
if err := s.queryLimiter.AddChunkBytes(chunkBytes); err != nil {
msg.FreeBuffer()
return err
}

Expand All @@ -309,15 +303,14 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
safeSeries = append(safeSeries, &safe)
}
batch.Series = safeSeries
msg.FreeBuffer()

if err := s.sendBatch(batch); err != nil {
return err
}
}
}

func (s *storeGatewayStreamReader) sendBatch(c *storepb.StreamingChunksBatch) error {
func (s *storeGatewayStreamReader) sendBatch(c *storepb.CustomStreamingChunksBatch) error {
if err := s.ctx.Err(); err != nil {
// If the context is already cancelled, stop now for the same reasons as below.
// We do this extra check here to ensure that we don't get unlucky and continue to send to seriesChunksChan even if
Expand Down
168 changes: 111 additions & 57 deletions pkg/querier/block_streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,28 +261,42 @@ func TestStoreGatewayStreamReader_HappyPaths(t *testing.T) {
"single series per batch": {
messages: batchesToMessages(
40,
storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: series0}}},
storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 1, Chunks: series1}}},
storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 2, Chunks: series2}}},
storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 3, Chunks: series3}}},
storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 4, Chunks: series4}}},
storepb.CustomStreamingChunksBatch{
StreamingChunksBatch: &storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: series0}}},
},
storepb.CustomStreamingChunksBatch{
StreamingChunksBatch: &storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 1, Chunks: series1}}},
},
storepb.CustomStreamingChunksBatch{
StreamingChunksBatch: &storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 2, Chunks: series2}}},
},
storepb.CustomStreamingChunksBatch{
StreamingChunksBatch: &storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 3, Chunks: series3}}},
},
storepb.CustomStreamingChunksBatch{
StreamingChunksBatch: &storepb.StreamingChunksBatch{Series: []*storepb.StreamingChunks{{SeriesIndex: 4, Chunks: series4}}},
},
),
expectedChunksEstimate: 40,
},
"multiple series per batch": {
messages: batchesToMessages(
40,
storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 0, Chunks: series0},
{SeriesIndex: 1, Chunks: series1},
{SeriesIndex: 2, Chunks: series2},
storepb.CustomStreamingChunksBatch{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 0, Chunks: series0},
{SeriesIndex: 1, Chunks: series1},
{SeriesIndex: 2, Chunks: series2},
},
},
},
storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 3, Chunks: series3},
{SeriesIndex: 4, Chunks: series4},
storepb.CustomStreamingChunksBatch{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 3, Chunks: series3},
{SeriesIndex: 4, Chunks: series4},
},
},
},
),
Expand All @@ -291,21 +305,25 @@ func TestStoreGatewayStreamReader_HappyPaths(t *testing.T) {
"empty batches": {
messages: batchesToMessages(
40,
storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 0, Chunks: series0},
{SeriesIndex: 1, Chunks: series1},
{SeriesIndex: 2, Chunks: series2},
storepb.CustomStreamingChunksBatch{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 0, Chunks: series0},
{SeriesIndex: 1, Chunks: series1},
{SeriesIndex: 2, Chunks: series2},
},
},
},
storepb.StreamingChunksBatch{},
storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 3, Chunks: series3},
{SeriesIndex: 4, Chunks: series4},
storepb.CustomStreamingChunksBatch{},
storepb.CustomStreamingChunksBatch{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 3, Chunks: series3},
{SeriesIndex: 4, Chunks: series4},
},
},
},
storepb.StreamingChunksBatch{},
storepb.CustomStreamingChunksBatch{},
),
expectedChunksEstimate: 40,
},
Expand Down Expand Up @@ -375,10 +393,19 @@ func TestStoreGatewayStreamReader_AbortsWhenParentContextCancelled(t *testing.T)
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
// Create multiple batches to ensure that the buffering goroutine becomes blocked waiting to send further chunks to GetChunks().
batches := []storepb.StreamingChunksBatch{
{Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}}},
{Series: []*storepb.StreamingChunks{{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}}},
{Series: []*storepb.StreamingChunks{{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}}},
batches := []storepb.CustomStreamingChunksBatch{
{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}}},
},
{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}}},
},
{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}}},
},
}

streamCtx := context.Background()
Expand All @@ -402,10 +429,19 @@ func TestStoreGatewayStreamReader_DoesNotAbortWhenStreamContextCancelled(t *test
test.VerifyNoLeak(t)

// Create multiple batches to ensure that the buffering goroutine becomes blocked waiting to send further chunks to GetChunks().
batches := []storepb.StreamingChunksBatch{
{Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}}},
{Series: []*storepb.StreamingChunks{{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}}},
{Series: []*storepb.StreamingChunks{{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}}},
batches := []storepb.CustomStreamingChunksBatch{
{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}}},
},
{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}}},
},
{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}}},
},
}

streamCtx, cancel := context.WithCancel(context.Background())
Expand All @@ -430,8 +466,11 @@ func TestStoreGatewayStreamReader_DoesNotAbortWhenStreamContextCancelled(t *test
}

func TestStoreGatewayStreamReader_ReadingSeriesOutOfOrder(t *testing.T) {
batches := []storepb.StreamingChunksBatch{
{Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}}},
batches := []storepb.CustomStreamingChunksBatch{
{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}}},
},
}

ctx := context.Background()
Expand All @@ -447,8 +486,11 @@ func TestStoreGatewayStreamReader_ReadingSeriesOutOfOrder(t *testing.T) {

func TestStoreGatewayStreamReader_ReadingMoreSeriesThanAvailable(t *testing.T) {
firstSeries := []storepb.AggrChunk{createChunk(t, 1000, 1.23)}
batches := []storepb.StreamingChunksBatch{
{Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: firstSeries}}},
batches := []storepb.CustomStreamingChunksBatch{
{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: firstSeries}}},
},
}

ctx := context.Background()
Expand All @@ -475,8 +517,11 @@ func TestStoreGatewayStreamReader_ReadingMoreSeriesThanAvailable(t *testing.T) {

func TestStoreGatewayStreamReader_ReceivedFewerSeriesThanExpected(t *testing.T) {
firstSeries := []storepb.AggrChunk{createChunk(t, 1000, 1.23)}
batches := []storepb.StreamingChunksBatch{
{Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: firstSeries}}},
batches := []storepb.CustomStreamingChunksBatch{
{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: firstSeries}}},
},
}

ctx := context.Background()
Expand Down Expand Up @@ -505,26 +550,32 @@ func TestStoreGatewayStreamReader_ReceivedFewerSeriesThanExpected(t *testing.T)
}

func TestStoreGatewayStreamReader_ReceivedMoreSeriesThanExpected(t *testing.T) {
testCases := map[string][]storepb.StreamingChunksBatch{
testCases := map[string][]storepb.CustomStreamingChunksBatch{
"extra series received as part of batch for last expected series": {
{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}},
{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}},
{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}},
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}},
{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}},
{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}},
},
},
},
},
"extra series received as part of batch after batch containing last expected series": {
{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}},
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}},
},
},
},
{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}},
{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}},
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{
{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}},
{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}},
},
},
},
},
Expand Down Expand Up @@ -581,12 +632,15 @@ func TestStoreGatewayStreamReader_ChunksLimits(t *testing.T) {

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
batches := []storepb.StreamingChunksBatch{
{Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{
createChunk(t, 1000, 1.23),
createChunk(t, 1100, 1.23),
createChunk(t, 1200, 1.23),
}}}},
batches := []storepb.CustomStreamingChunksBatch{
{
StreamingChunksBatch: &storepb.StreamingChunksBatch{
Series: []*storepb.StreamingChunks{{SeriesIndex: 0, Chunks: []storepb.AggrChunk{
createChunk(t, 1000, 1.23),
createChunk(t, 1100, 1.23),
createChunk(t, 1200, 1.23),
}}}},
},
}

ctx := context.Background()
Expand Down Expand Up @@ -636,13 +690,13 @@ func createChunk(t *testing.T, time int64, value float64) storepb.AggrChunk {
}
}

func batchesToMessages(estimatedChunks uint64, batches ...storepb.StreamingChunksBatch) []*storepb.SeriesResponse {
func batchesToMessages(estimatedChunks uint64, batches ...storepb.CustomStreamingChunksBatch) []*storepb.SeriesResponse {
messages := make([]*storepb.SeriesResponse, len(batches)+1)

messages[0] = storepb.NewStreamingChunksEstimate(estimatedChunks)

for i, b := range batches {
messages[i+1] = storepb.NewStreamingChunksResponse(&b)
messages[i+1] = storepb.NewStreamingChunksResponse(b)
}

return messages
Expand Down
Loading

0 comments on commit 0d90028

Please sign in to comment.