Skip to content

Commit

Permalink
update count for other add/remove methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Scottmitch committed Oct 3, 2022
1 parent 37df396 commit 6429556
Showing 1 changed file with 89 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private static final class SpscBlockingQueue<T> implements BlockingQueue<T> {
* it without parking.
*/
private static final int POLL_YIELD_COUNT =
getInteger("io.servicetalk.concurrent.internal.blockingIterableYieldCount", 1);
getInteger("io.servicetalk.concurrent.internal.blockingIterableYieldCount", 2);
/**
* Amount of nanoseconds to spin on {@link Thread#yield()} before calling {@link LockSupport#parkNanos(long)}.
* {@link LockSupport#parkNanos(long)} can be expensive and if the producer is generating data it is likely
Expand All @@ -300,7 +300,7 @@ private static final class SpscBlockingQueue<T> implements BlockingQueue<T> {
@Override
public boolean add(final T t) {
if (spscQueue.add(t)) {
producerSignalConsumer();
producerSignalAdded();
return true;
}
return false;
Expand All @@ -309,20 +309,26 @@ public boolean add(final T t) {
@Override
public boolean offer(final T t) {
if (spscQueue.offer(t)) {
producerSignalConsumer();
producerSignalAdded();
return true;
}
return false;
}

@Override
public T remove() {
return spscQueue.remove();
final T t = spscQueue.remove();
producerSignalRemoved(1);
return t;
}

@Override
public T poll() {
return spscQueue.poll();
final T t = spscQueue.poll();
if (t != null) {
producerSignalRemoved(1);
}
return t;
}

@Override
Expand Down Expand Up @@ -367,7 +373,7 @@ public int remainingCapacity() {
@Override
public boolean remove(final Object o) {
if (spscQueue.remove(o)) {
producerSignalConsumer();
producerSignalRemoved(1);
return true;
}
return false;
Expand All @@ -380,35 +386,59 @@ public boolean containsAll(final Collection<?> c) {

@Override
public boolean addAll(final Collection<? extends T> c) {
if (spscQueue.addAll(c)) {
producerSignalConsumer();
return true;
boolean added = false;
for (T t : c) {
if (add(t)) {
added = true;
}
}
return false;
return added;
}

@Override
public boolean removeAll(final Collection<?> c) {
if (spscQueue.removeAll(c)) {
producerSignalConsumer();
return true;
int removed = 0;
try {
for (Object t : c) {
if (spscQueue.remove(t)) {
++removed;
}
}
} finally {
producerSignalRemoved(removed);
}
return false;
return removed > 0;
}

@Override
public boolean retainAll(final Collection<?> c) {
if (spscQueue.retainAll(c)) {
producerSignalConsumer();
return true;
}
return false;
throw new UnsupportedOperationException();
}

@Override
public void clear() {
spscQueue.clear();
producerSignalConsumer();

ThreadStamp nextStamp = null;
for (;;) {
final ThreadStamp currStamp = threadStamp;
if (currStamp == null || currStamp.thread == PRODUCED_THREAD) {
if (threadStampUpdater.compareAndSet(this, currStamp, null)) {
break;
}
} else {
if (nextStamp == null) {
nextStamp = new ThreadStamp(currStamp.thread);
} else {
nextStamp.thread = currStamp.thread;
nextStamp.count = 0;
}

if (threadStampUpdater.compareAndSet(this, currStamp, nextStamp)) {
break;
}
}
}
}

@Override
Expand Down Expand Up @@ -445,10 +475,14 @@ public <T1> T1[] toArray(final T1[] a) {
public int drainTo(final Collection<? super T> c) {
int i = 0;
T item;
while ((item = poll()) != null) {
if (c.add(item)) {
++i;
try {
while ((item = spscQueue.poll()) != null) {
if (c.add(item)) {
++i;
}
}
} finally {
producerSignalRemoved(i);
}
return i;
}
Expand All @@ -457,10 +491,14 @@ public int drainTo(final Collection<? super T> c) {
public int drainTo(final Collection<? super T> c, final int maxElements) {
int i = 0;
T item;
while (i < maxElements && (item = poll()) != null) {
if (c.add(item)) {
++i;
try {
while (i < maxElements && (item = spscQueue.poll()) != null) {
if (c.add(item)) {
++i;
}
}
} finally {
producerSignalRemoved(i);
}
return i;
}
Expand All @@ -480,7 +518,30 @@ public String toString() {
return spscQueue.toString();
}

private void producerSignalConsumer() {
private void producerSignalRemoved(int i) {
ThreadStamp nextStamp = null;
for (;;) {
final ThreadStamp currStamp = threadStamp;
if (currStamp == null || currStamp.count <= i) {
if (threadStampUpdater.compareAndSet(this, currStamp, null)) {
break;
}
} else {
if (nextStamp == null) {
nextStamp = new ThreadStamp(currStamp.thread, currStamp.count - i);
} else {
nextStamp.thread = currStamp.thread;
nextStamp.count = currStamp.count - i;
}

if (threadStampUpdater.compareAndSet(this, currStamp, nextStamp)) {
break;
}
}
}
}

private void producerSignalAdded() {
ThreadStamp nextStamp = null;
for (;;) {
final ThreadStamp currStamp = threadStamp;
Expand Down Expand Up @@ -554,7 +615,7 @@ private T pollAndParkIgnoreTime(@SuppressWarnings("unused") final long timeout,
// Benchmarks show that park/unpark is expensive when producer is the EventLoop thread and
// unpark has to wakeup a thread that is parked. Yield has been shown to lower this cost
// on the EventLoop thread and increase throughput in these scenarios.
if (pollCount++ <= POLL_YIELD_COUNT) {
if (++pollCount <= POLL_YIELD_COUNT) {
Thread.yield();
} else {
LockSupport.park();
Expand Down

0 comments on commit 6429556

Please sign in to comment.