Skip to content

Commit

Permalink
[FLINK-36938][Runtime] Provide hooks before and after the watermark p…
Browse files Browse the repository at this point in the history
…rocessing (#25824)
  • Loading branch information
Zakelly authored Dec 20, 2024
1 parent 1523f2c commit 7fa4f78
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public RecordContext<K> buildContext(Object record, K key, boolean inheritEpoch)
key,
this::disposeContext,
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
inheritEpoch
inheritEpoch && currentContext != null
? epochManager.onEpoch(currentContext.getEpoch())
: epochManager.onRecord());
}
Expand All @@ -225,7 +225,7 @@ public RecordContext<K> buildContext(Object record, K key, boolean inheritEpoch)
key,
this::disposeContext,
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
inheritEpoch
inheritEpoch && currentContext != null
? epochManager.onEpoch(currentContext.getEpoch())
: epochManager.onRecord());
}
Expand Down Expand Up @@ -456,7 +456,11 @@ public void processNonRecord(
? null
: () -> {
try {
// We clear the current context since this is a non-record context.
RecordContext<K> previousContext = currentContext;
currentContext = null;
triggerAction.run();
currentContext = previousContext;
} catch (Exception e) {
exceptionHandler.handleException(
"Failed to process non-record.", e);
Expand All @@ -466,7 +470,10 @@ public void processNonRecord(
? null
: () -> {
try {
RecordContext<K> previousContext = currentContext;
currentContext = null;
finalAction.run();
currentContext = previousContext;
} catch (Exception e) {
exceptionHandler.handleException(
"Failed to process non-record.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,16 +327,65 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
// Watermark handling
// ------------------------------------------------------------------------

/**
* A hook that will be triggered when receiving a watermark. Some async state can safely go
* within this method. Return the watermark that should be normally processed.
*
* @param watermark the receiving watermark.
* @return the watermark that should be processed. Null if there is no need for following
* processing.
*/
public Watermark preProcessWatermark(Watermark watermark) throws Exception {
return watermark;
}

/**
* A hook that will be invoked after finishing advancing the watermark. It is not recommended to
* perform async state here. Only some synchronous logic is suggested.
*
* @param watermark the advanced watermark.
*/
public void postProcessWatermark(Watermark watermark) throws Exception {}

/**
* Process a watermark when receiving it. Do not override this method since the async processing
* is difficult to write. Please override the hooks, see {@link #preProcessWatermark(Watermark)}
* and {@link #postProcessWatermark(Watermark)}. The basic logic of processWatermark with hooks
* in sync form would be:
*
* <pre>
* Watermark watermark = preProcessWatermark(mark);
* if (watermark != null) {
* super.processWatermark(watermark);
* postProcessWatermark(watermark);
* }
* </pre>
*/
@Override
public void processWatermark(Watermark mark) throws Exception {
public final void processWatermark(Watermark mark) throws Exception {
if (!isAsyncStateProcessingEnabled()) {
// If async state processing is disabled, fallback to the super class.
super.processWatermark(mark);
Watermark watermark = preProcessWatermark(mark);
if (watermark != null) {
super.processWatermark(watermark);
postProcessWatermark(watermark);
}
return;
}
AtomicReference<Watermark> watermarkRef = new AtomicReference<>(null);
asyncExecutionController.processNonRecord(
timeServiceManager == null ? null : () -> timeServiceManager.advanceWatermark(mark),
() -> output.emitWatermark(mark));
() -> {
watermarkRef.set(preProcessWatermark(mark));
if (timeServiceManager != null && watermarkRef.get() != null) {
timeServiceManager.advanceWatermark(watermarkRef.get());
}
},
() -> {
if (watermarkRef.get() != null) {
output.emitWatermark(watermarkRef.get());
postProcessWatermark(watermarkRef.get());
}
});
}

@Override
Expand Down Expand Up @@ -364,15 +413,18 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index
wasIdle.set(combinedWatermark.isIdle());
// index is 0-based
if (combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) {
watermarkRef.set(new Watermark(combinedWatermark.getCombinedWatermark()));
if (timeServiceManager != null) {
watermarkRef.set(
preProcessWatermark(
new Watermark(combinedWatermark.getCombinedWatermark())));
if (timeServiceManager != null && watermarkRef.get() != null) {
timeServiceManager.advanceWatermark(watermarkRef.get());
}
}
},
() -> {
if (watermarkRef.get() != null) {
output.emitWatermark(watermarkRef.get());
postProcessWatermark(watermarkRef.get());
}
if (wasIdle.get() != combinedWatermark.isIdle()) {
output.emitWatermarkStatus(watermarkStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,66 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
// ------------------------------------------------------------------------
// Watermark handling
// ------------------------------------------------------------------------

/**
* A hook that will be triggered when receiving a watermark. Some async state can safely go
* within this method. Return the watermark that should be normally processed.
*
* @param watermark the receiving watermark.
* @return the watermark that should be processed. Null if there is no need for following
* processing.
*/
public Watermark preProcessWatermark(Watermark watermark) throws Exception {
return watermark;
}

/**
* A hook that will be invoked after finishing advancing the watermark. It is not recommended to
* perform async state here. Only some synchronous logic is suggested.
*
* @param watermark the advanced watermark.
*/
public void postProcessWatermark(Watermark watermark) throws Exception {}

/**
* Process a watermark when receiving it. Do not override this method since the async processing
* is difficult to write. Please override the hooks, see {@link #preProcessWatermark(Watermark)}
* and {@link #postProcessWatermark(Watermark)}. The basic logic of processWatermark with hooks
* in sync form would be:
*
* <pre>
* Watermark watermark = preProcessWatermark(mark);
* if (watermark != null) {
* super.processWatermark(watermark);
* postProcessWatermark(watermark);
* }
* </pre>
*/
@Override
public void processWatermark(Watermark mark) throws Exception {
public final void processWatermark(Watermark mark) throws Exception {
if (!isAsyncStateProcessingEnabled()) {
super.processWatermark(mark);
// If async state processing is disabled, fallback to the super class.
Watermark watermark = preProcessWatermark(mark);
if (watermark != null) {
super.processWatermark(watermark);
postProcessWatermark(watermark);
}
return;
}
AtomicReference<Watermark> watermarkRef = new AtomicReference<>(null);
asyncExecutionController.processNonRecord(
timeServiceManager == null ? null : () -> timeServiceManager.advanceWatermark(mark),
() -> output.emitWatermark(mark));
() -> {
watermarkRef.set(preProcessWatermark(mark));
if (timeServiceManager != null && watermarkRef.get() != null) {
timeServiceManager.advanceWatermark(watermarkRef.get());
}
},
() -> {
if (watermarkRef.get() != null) {
output.emitWatermark(watermarkRef.get());
postProcessWatermark(watermarkRef.get());
}
});
}

@Override
Expand All @@ -284,8 +335,10 @@ public void processWatermarkStatus(WatermarkStatus watermarkStatus, int inputId)
() -> {
wasIdle.set(combinedWatermark.isIdle());
if (combinedWatermark.updateStatus(inputId - 1, watermarkStatus.isIdle())) {
watermarkRef.set(new Watermark(combinedWatermark.getCombinedWatermark()));
if (timeServiceManager != null) {
watermarkRef.set(
preProcessWatermark(
new Watermark(combinedWatermark.getCombinedWatermark())));
if (timeServiceManager != null && watermarkRef.get() != null) {
timeServiceManager.advanceWatermark(watermarkRef.get());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -282,6 +283,68 @@ void testWatermark() throws Exception {
}
}

@Test
void testWatermarkHooks() throws Exception {
final WatermarkTestingOperator testOperator = new WatermarkTestingOperator();

AtomicInteger counter = new AtomicInteger(0);
testOperator.setPreProcessFunction(
(watermark) -> {
testOperator.asyncProcessWithKey(
1L,
() -> {
testOperator.output(watermark.getTimestamp() + 1000L);
});
if (counter.incrementAndGet() % 2 == 0) {
return null;
} else {
return new Watermark(watermark.getTimestamp() + 1L);
}
});

testOperator.setPostProcessFunction(
(watermark) -> {
testOperator.output(watermark.getTimestamp() + 100L);
});

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
KeySelector<Long, Integer> dummyKeySelector = l -> 0;
try (AsyncKeyedTwoInputStreamOperatorTestHarness<Integer, Long, Long, Long> testHarness =
AsyncKeyedTwoInputStreamOperatorTestHarness.create(
testOperator,
dummyKeySelector,
dummyKeySelector,
BasicTypeInfo.INT_TYPE_INFO,
1,
1,
0)) {
testHarness.setup();
testHarness.open();
testHarness.processElement1(1L, 1L);
testHarness.processElement1(3L, 3L);
testHarness.processElement1(4L, 4L);
testHarness.processWatermark1(new Watermark(2L));
testHarness.processWatermark2(new Watermark(2L));
expectedOutput.add(new StreamRecord<>(1002L));
expectedOutput.add(new StreamRecord<>(1L));
expectedOutput.add(new StreamRecord<>(3L));
expectedOutput.add(new Watermark(3L));
expectedOutput.add(new StreamRecord<>(103L));
testHarness.processWatermark1(new Watermark(4L));
testHarness.processWatermark2(new Watermark(4L));
expectedOutput.add(new StreamRecord<>(1004L));
testHarness.processWatermark1(new Watermark(5L));
testHarness.processWatermark2(new Watermark(5L));
expectedOutput.add(new StreamRecord<>(1005L));
expectedOutput.add(new StreamRecord<>(4L));
expectedOutput.add(new Watermark(6L));
expectedOutput.add(new StreamRecord<>(106L));

TestHarnessUtil.assertOutputEquals(
"Output was not correct", expectedOutput, testHarness.getOutput());
}
}

@Test
void testWatermarkStatus() throws Exception {
try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
Expand Down Expand Up @@ -498,6 +561,24 @@ private static class WatermarkTestingOperator extends AbstractAsyncStateStreamOp

private transient InternalTimerService<VoidNamespace> timerService;

private FunctionWithException<Watermark, Watermark, Exception> preProcessFunction;

private ThrowingConsumer<Watermark, Exception> postProcessFunction;

public void setPreProcessFunction(
FunctionWithException<Watermark, Watermark, Exception> preProcessFunction) {
this.preProcessFunction = preProcessFunction;
}

public void setPostProcessFunction(
ThrowingConsumer<Watermark, Exception> postProcessFunction) {
this.postProcessFunction = postProcessFunction;
}

public void output(Long o) {
output.collect(new StreamRecord<>(o));
}

@Override
public void open() throws Exception {
super.open();
Expand All @@ -506,6 +587,18 @@ public void open() throws Exception {
getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this);
}

@Override
public Watermark preProcessWatermark(Watermark watermark) throws Exception {
return preProcessFunction == null ? watermark : preProcessFunction.apply(watermark);
}

@Override
public void postProcessWatermark(Watermark watermark) throws Exception {
if (postProcessFunction != null) {
postProcessFunction.accept(watermark);
}
}

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
output.collect(new StreamRecord<>(timer.getTimestamp()));
Expand Down
Loading

0 comments on commit 7fa4f78

Please sign in to comment.