From 55e83b43ebce875da5a4da0c7a19cbe919fa833b Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Thu, 26 Dec 2024 15:41:31 +0800 Subject: [PATCH 1/2] fix: Fix index meta mutex contention Signed-off-by: bigsheeper --- internal/datacoord/index_meta.go | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index bc12f4f88da1c..7d903dd914f90 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -27,6 +27,7 @@ import ( "github.com/hashicorp/golang-lru/v2/expirable" "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" + "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -64,6 +65,8 @@ type indexMeta struct { // segmentID -> indexID -> segmentIndex segmentIndexes map[UniqueID]map[UniqueID]*model.SegmentIndex + + lastUpdateMetricTime atomic.Time } func newIndexTaskStats(s *model.SegmentIndex) *metricsinfo.IndexTaskStats { @@ -201,6 +204,10 @@ func (m *indexMeta) updateSegIndexMeta(segIdx *model.SegmentIndex, updateFunc fu } func (m *indexMeta) updateIndexTasksMetrics() { + if time.Since(m.lastUpdateMetricTime.Load()) < 120*time.Second { + return + } + defer m.lastUpdateMetricTime.Store(time.Now()) taskMetrics := make(map[UniqueID]map[commonpb.IndexState]int) for _, segIdx := range m.segmentBuildInfo.List() { if segIdx.IsDeleted { @@ -229,6 +236,7 @@ func (m *indexMeta) updateIndexTasksMetrics() { } } } + log.Ctx(m.ctx).Info("update index metric", zap.Int("collectionNum", len(taskMetrics))) } func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool { @@ -868,7 +876,7 @@ func (m *indexMeta) GetAllSegIndexes() map[int64]*model.SegmentIndex { tasks := m.segmentBuildInfo.List() segIndexes := make(map[int64]*model.SegmentIndex, len(tasks)) for buildID, segIndex := range tasks { - segIndexes[buildID] = model.CloneSegmentIndex(segIndex) + segIndexes[buildID] = segIndex } return segIndexes } @@ -965,22 +973,6 @@ func (m *indexMeta) CheckCleanSegmentIndex(buildID UniqueID) (bool, *model.Segme return true, nil } -func (m *indexMeta) GetMetasByNodeID(nodeID UniqueID) []*model.SegmentIndex { - m.RLock() - defer m.RUnlock() - - metas := make([]*model.SegmentIndex, 0) - for _, segIndex := range m.segmentBuildInfo.List() { - if segIndex.IsDeleted { - continue - } - if nodeID == segIndex.NodeID { - metas = append(metas, model.CloneSegmentIndex(segIndex)) - } - } - return metas -} - func (m *indexMeta) getSegmentsIndexStates(collectionID UniqueID, segmentIDs []UniqueID) map[int64]map[int64]*indexpb.SegmentIndexState { m.RLock() defer m.RUnlock() From 825c0ec61dc80685c83eb2d6931a8c0cf9ec1f93 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 27 Dec 2024 23:53:39 +0800 Subject: [PATCH 2/2] enhance: Prevent frequently updating metric Signed-off-by: bigsheeper --- internal/querycoordv2/task/scheduler.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 316f1a552be71..bb2c862b3ca38 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -170,7 +170,9 @@ type taskScheduler struct { channelTasks map[replicaChannelIndex]Task processQueue *taskQueue waitQueue *taskQueue - taskStats *expirable.LRU[UniqueID, Task] + + taskStats *expirable.LRU[UniqueID, Task] + lastUpdateMetricTime time.Time } func NewScheduler(ctx context.Context, @@ -285,13 +287,15 @@ func (scheduler *taskScheduler) Add(task Task) error { } scheduler.taskStats.Add(task.ID(), task) - scheduler.updateTaskMetrics() log.Ctx(task.Context()).Info("task added", zap.String("task", task.String())) task.RecordStartTs() return nil } func (scheduler *taskScheduler) updateTaskMetrics() { + if time.Since(scheduler.lastUpdateMetricTime) < 30*time.Second { + return + } segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0 channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0 for _, task := range scheduler.segmentTasks { @@ -324,6 +328,7 @@ func (scheduler *taskScheduler) updateTaskMetrics() { metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelGrowTaskLabel).Set(float64(channelGrowNum)) metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelReduceTaskLabel).Set(float64(channelReduceNum)) metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelMoveTaskLabel).Set(float64(channelMoveNum)) + scheduler.lastUpdateMetricTime = time.Now() } // check whether the task is valid to add,