
search always watch terminal #5946

Open
niuyueyang1996 opened this issue Dec 13, 2024 · 2 comments
Labels
kind/bug Categorizes issue or PR as related to a bug.

Comments

@niuyueyang1996
Contributor

What happened:

When I use
https://xxxxxx:7559/apis/search.karmada.io/v1alpha1/proxying/karmada/proxy/api/v1/pods?timeoutSeconds=10000&watch=true
the watcher terminates.

We have 100000 pods.

I see kubernetes/kubernetes#13969.

Because the request carries no resourceVersion, it first returns events for all existing pods.

(c *cacheWatcher) processInterval only executes the process func after all initEvents have been sent to the result channel, and with 100000 pods that takes 5-6s. Until the process func runs, the watcher's input channel has no consumer.

The watcher therefore becomes a blockedWatcher; when a change arrives from etcd,
watcher.add(event, timer) times out and kills my watcher.

What you expected to happen:

How to reproduce it (as minimally and precisely as possible):

Anything else we need to know?:

Environment:

  • Karmada version:
  • kubectl-karmada or karmadactl version (the result of kubectl-karmada version or karmadactl version):
  • Others:
@niuyueyang1996 niuyueyang1996 added the kind/bug Categorizes issue or PR as related to a bug. label Dec 13, 2024
@XiShanYongYe-Chang
Member

Hi @niuyueyang1996, is the watch request terminated early?

From your analysis, have you worked out why the problem occurs? I'm sorry, I didn't quite follow it.

@niuyueyang1996
Contributor Author

niuyueyang1996 commented Dec 13, 2024

When I watch a resource without a resourceVersion,
k8s.io/apiserver/pkg/storage/cacher/cacher.go
(in k8s 1.30; karmada's go.mod pins k8s.io/apiserver v0.30.2)
sends all existing objects as events to the watcher:

func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
	// ...

	// this func
	go watcher.processInterval(ctx, cacheInterval, requiredResourceVersion)

	// ...
}




func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
	defer utilruntime.HandleCrash()
	defer close(c.result)
	defer c.Stop()

	// Check how long we are processing initEvents.
	// As long as these are not processed, we are not processing
	// any incoming events, so if it takes long, we may actually
	// block all watchers for some time.
	// TODO: From the logs it seems that there happens processing
	// times even up to 1s which is very long. However, this doesn't
	// depend that much on the number of initEvents. E.g. from the
	// 2000-node Kubemark run we have logs like this, e.g.:
	// ... processing 13862 initEvents took 66.808689ms
	// ... processing 14040 initEvents took 993.532539ms
	// We should understand what is blocking us in those cases (e.g.
	// is it lack of CPU, network, or sth else) and potentially
	// consider increase size of result buffer in those cases.
	const initProcessThreshold = 500 * time.Millisecond
	startTime := time.Now()

	initEventCount := 0
	for {
		event, err := cacheInterval.Next()
		if err != nil {
			// An error indicates that the cache interval
			// has been invalidated and can no longer serve
			// events.
			//
			// Initially we considered sending an "out-of-history"
			// Error event in this case, but because historically
			// such events weren't sent out of the watchCache, we
			// decided not to. This is still ok, because on watch
			// closure, the watcher will try to re-instantiate the
			// watch and then will get an explicit "out-of-history"
			// window. There is potential for optimization, but for
			// now, in order to be on the safe side and not break
			// custom clients, the cost of it is something that we
			// are fully accepting.
			klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
			return
		}
		if event == nil {
			break
		}
		c.sendWatchCacheEvent(event)

		// With some events already sent, update resourceVersion so that
		// events that were buffered and not yet processed won't be delivered
		// to this watcher second time causing going back in time.
		//
		// There is one case where events are not necessary ordered by
		// resourceVersion, being a case of watching from resourceVersion=0,
		// which at the beginning returns the state of each objects.
		// For the purpose of it, we need to max it with the resource version
		// that we have so far.
		if event.ResourceVersion > resourceVersion {
			resourceVersion = event.ResourceVersion
		}
		initEventCount++
	}

	if initEventCount > 0 {
		metrics.InitCounter.WithLabelValues(c.groupResource.String()).Add(float64(initEventCount))
	}
	processingTime := time.Since(startTime)
	if processingTime > initProcessThreshold {
		klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", initEventCount, c.groupResource, c.identifier, processingTime)
	}



	c.process(ctx, resourceVersion)
}

c.process consumes c.input.
If the init events have not all been sent yet, c.process has not started, so the watcher's input channel has no consumer.

Meanwhile, (c *Cacher) dispatchEvent(event *watchCacheEvent) sends events produced by member-cluster changes to the watcher. The input channel then has only a producer and blocks:

startTime := time.Now()
timeout := c.dispatchTimeoutBudget.takeAvailable()
c.timer.Reset(timeout)

// Send event to all blocked watchers. As long as timer is running,
// `add` will wait for the watcher to unblock. After timeout,
// `add` will not wait, but immediately close a still blocked watcher.
// Hence, every watcher gets the chance to unblock itself while timer
// is running, not only the first ones in the list.
timer := c.timer
for _, watcher := range c.blockedWatchers {
	if !watcher.add(event, timer) {
		// fired, clean the timer by set it to nil.
		timer = nil
	}
}

watcher.add waits on a timer of at most 100ms (taken from the dispatch timeout budget).
If the timer fires before the event can be delivered, the watcher is closed.
