flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-5033] [cep] Advance time with incoming watermarks at CEP operator
Date Thu, 10 Nov 2016 13:21:16 GMT
Repository: flink
Updated Branches:
  refs/heads/master 616c4f5e4 -> 6516938b9


[FLINK-5033] [cep] Advance time with incoming watermarks at CEP operator

Before the time was only advanced if the CEP had some events buffered. If the priority queue
was empty, then an incoming watermark did not advance the time. This led to missing timeouts
and pruning possibilities. The PR fixes this problem.

This closes #2771.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/029fda24
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/029fda24
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/029fda24

Branch: refs/heads/master
Commit: 029fda24583c6655cd97d02773b5ca628cf9e8f1
Parents: 616c4f5
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Nov 8 15:14:36 2016 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Nov 10 14:18:01 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  10 +-
 .../AbstractCEPBasePatternOperator.java         |   9 ++
 .../operator/AbstractCEPPatternOperator.java    |  12 +-
 .../AbstractKeyedCEPPatternOperator.java        |  11 +-
 .../flink/cep/operator/CEPPatternOperator.java  |  22 ++-
 .../cep/operator/KeyedCEPPatternOperator.java   |  22 ++-
 .../cep/operator/TimeoutCEPPatternOperator.java |  35 ++--
 .../TimeoutKeyedCEPPatternOperator.java         |  39 +++--
 .../flink/cep/operator/CEPOperatorTest.java     | 161 ++++++++++++++++++-
 .../util/AbstractStreamOperatorTestHarness.java |  20 ++-
 10 files changed, 296 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 5ac638e..aefddb2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -123,7 +123,7 @@ public class NFA<T> implements Serializable {
 	 * resulting event sequences are returned. If computations time out and timeout handling
is
 	 * activated, then the timed out event patterns are returned.
 	 *
-	 * @param event The current event to be processed
+	 * @param event The current event to be processed or null if only pruning shall be done
 	 * @param timestamp The timestamp of the current event
 	 * @return Tuple of the collection of matched patterns (e.g. the result of computations
which have
 	 * reached a final state) and the collection of timed out patterns (if timeout handling
is
@@ -141,7 +141,7 @@ public class NFA<T> implements Serializable {
 			final Collection<ComputationState<T>> newComputationStates;
 
 			if (!computationState.isStartState() &&
-				windowTime > 0 &&
+				windowTime > 0L &&
 				timestamp - computationState.getStartTimestamp() >= windowTime) {
 
 				if (handleTimeout) {
@@ -158,8 +158,10 @@ public class NFA<T> implements Serializable {
 				sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
 
 				newComputationStates = Collections.emptyList();
-			} else {
+			} else if (event != null) {
 				newComputationStates = computeNextStates(computationState, event, timestamp);
+			} else {
+				newComputationStates = Collections.singleton(computationState);
 			}
 
 			for (ComputationState<T> newComputationState: newComputationStates) {
@@ -179,7 +181,7 @@ public class NFA<T> implements Serializable {
 		}
 
 		// prune shared buffer based on window length
-		if(windowTime > 0) {
+		if(windowTime > 0L) {
 			long pruningTimestamp = timestamp - windowTime;
 
 			// sanity check to guard against underflows

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
index 2f21346..a3497a6 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
@@ -96,4 +96,13 @@ public abstract class AbstractCEPBasePatternOperator<IN, OUT>
 	 * @param timestamp The timestamp of the event
 	 */
 	protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
+
+	/**
+	 * Advances the time for the given NFA to the given timestamp. This can lead to pruning
and
+	 * timeouts.
+	 *
+	 * @param nfa to advance the time for
+	 * @param timestamp to advance the time to
+	 */
+	protected abstract void advanceTime(NFA<IN> nfa, long timestamp);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 1c494ef..fe9aced 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -97,10 +97,14 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends
AbstractCEPBas
 		// we do our own watermark handling, no super call. we will never be able to use
 		// the timer service like this, however.
 
-		while(!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp())
{
-			StreamRecord<IN> streamRecord = priorityQueue.poll();
-
-			processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+		if (priorityQueue.isEmpty()) {
+			advanceTime(nfa, mark.getTimestamp());
+		} else {
+			while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp())
{
+				StreamRecord<IN> streamRecord = priorityQueue.poll();
+
+				processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+			}
 		}
 
 		output.emitWatermark(mark);

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7541d8f..b5601ef 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -176,11 +176,16 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
extends Abst
 			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
 			NFA<IN> nfa = getNFA();
 
-			while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp())
{
-				StreamRecord<IN> streamRecord = priorityQueue.poll();
+			if (priorityQueue.isEmpty()) {
+					advanceTime(nfa, mark.getTimestamp());
+			} else {
+				while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <=
mark.getTimestamp()) {
+					StreamRecord<IN> streamRecord = priorityQueue.poll();
 
-				processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+					processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+				}
 			}
+
 			updateNFA(nfa);
 			updatePriorityQueue(priorityQueue);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
index 561697d..57f27c2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -48,15 +49,28 @@ public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN,
Map<S
 
 		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
 
-		if (!matchedPatterns.isEmpty()) {
+		emitMatchedSequences(matchedPatterns, timestamp);
+	}
+
+	@Override
+	protected void advanceTime(NFA<IN> nfa, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String,
IN>, Long>>> patterns = nfa.process(null, timestamp);
+
+		emitMatchedSequences(patterns.f0, timestamp);
+	}
+
+	private void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences,
long timestamp) {
+		Iterator<Map<String, IN>> iterator = matchedSequences.iterator();
+
+		if (iterator.hasNext()) {
 			StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String,
IN>>(
 				null,
 				timestamp);
 
-			for (Map<String, IN> pattern: matchedPatterns) {
-				streamRecord.replace(pattern);
+			do {
+				streamRecord.replace(iterator.next());
 				output.collect(streamRecord);
-			}
+			} while (iterator.hasNext());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 62d82d9..4d8a907 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -51,15 +52,28 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
 
 		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
 
-		if (!matchedPatterns.isEmpty()) {
+		emitMatchedSequences(matchedPatterns, timestamp);
+	}
+
+	@Override
+	protected void advanceTime(NFA<IN> nfa, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String,
IN>, Long>>> patterns = nfa.process(null, timestamp);
+
+		emitMatchedSequences(patterns.f0, timestamp);
+	}
+
+	private void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences,
long timestamp) {
+		Iterator<Map<String, IN>> iterator = matchedSequences.iterator();
+
+		if (iterator.hasNext()) {
 			StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String,
IN>>(
 				null,
 				timestamp);
 
-			for (Map<String, IN> pattern: matchedPatterns) {
-				streamRecord.replace(pattern);
+			do {
+				streamRecord.replace(iterator.next());
 				output.collect(streamRecord);
-			}
+			} while (iterator.hasNext());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
index 9b0c951..9a04468 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
@@ -58,22 +58,37 @@ public class TimeoutCEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN
 		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
 		Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = patterns.f1;
 
+		emitMatchedSequences(matchedPatterns, timestamp);
+		emitTimedOutSequences(partialPatterns, timestamp);
+	}
+
+	@Override
+	protected void advanceTime(NFA<IN> nfa, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String,
IN>, Long>>> patterns = nfa.process(null, timestamp);
+
+		emitMatchedSequences(patterns.f0, timestamp);
+		emitTimedOutSequences(patterns.f1, timestamp);
+	}
+
+	private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, Long>>
timedOutSequences, long timestamp) {
 		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>
streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String,
IN>>>(
 			null,
 			timestamp);
 
-		if (!matchedPatterns.isEmpty()) {
-			for (Map<String, IN> matchedPattern : matchedPatterns) {
-				streamRecord.replace(Either.Right(matchedPattern));
-				output.collect(streamRecord);
-			}
+		for (Tuple2<Map<String, IN>, Long> partialPattern: timedOutSequences) {
+			streamRecord.replace(Either.Left(partialPattern));
+			output.collect(streamRecord);
 		}
+	}
+
+	protected void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences,
long timestamp) {
+		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>
streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String,
IN>>>(
+			null,
+			timestamp);
 
-		if (!partialPatterns.isEmpty()) {
-			for (Tuple2<Map<String, IN>, Long> partialPattern: partialPatterns) {
-				streamRecord.replace(Either.Left(partialPattern));
-				output.collect(streamRecord);
-			}
+		for (Map<String, IN> matchedPattern : matchedSequences) {
+			streamRecord.replace(Either.Right(matchedPattern));
+			output.collect(streamRecord);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 23b1a91..4d33435 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -48,25 +48,40 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
 			event,
 			timestamp);
 
-		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
-		Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = patterns.f1;
+		Collection<Map<String, IN>> matchedSequences = patterns.f0;
+		Collection<Tuple2<Map<String, IN>, Long>> timedOutSequences = patterns.f1;
 
+		emitMatchedSequences(matchedSequences, timestamp);
+		emitTimedOutSequences(timedOutSequences, timestamp);
+	}
+
+	@Override
+	protected void advanceTime(NFA<IN> nfa, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String,
IN>, Long>>> patterns = nfa.process(null, timestamp);
+
+		emitMatchedSequences(patterns.f0, timestamp);
+		emitTimedOutSequences(patterns.f1, timestamp);
+	}
+
+	private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, Long>>
timedOutSequences, long timestamp) {
 		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>
streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String,
IN>>>(
 			null,
 			timestamp);
 
-		if (!matchedPatterns.isEmpty()) {
-			for (Map<String, IN> matchedPattern : matchedPatterns) {
-				streamRecord.replace(Either.Right(matchedPattern));
-				output.collect(streamRecord);
-			}
+		for (Tuple2<Map<String, IN>, Long> partialPattern: timedOutSequences) {
+			streamRecord.replace(Either.Left(partialPattern));
+			output.collect(streamRecord);
 		}
+	}
+
+	protected void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences,
long timestamp) {
+		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>
streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String,
IN>>>(
+			null,
+			timestamp);
 
-		if (!partialPatterns.isEmpty()) {
-			for (Tuple2<Map<String, IN>, Long> partialPattern: partialPatterns) {
-				streamRecord.replace(Either.Left(partialPattern));
-				output.collect(streamRecord);
-			}
+		for (Map<String, IN> matchedPattern : matchedSequences) {
+			streamRecord.replace(Either.Right(matchedPattern));
+			output.collect(streamRecord);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 6cffd9c..db17f6d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.cep.operator;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
@@ -35,6 +38,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
@@ -42,7 +46,9 @@ import org.junit.rules.TemporaryFolder;
 
 import static org.junit.Assert.*;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class CEPOperatorTest extends TestLogger {
@@ -411,10 +417,161 @@ public class CEPOperatorTest extends TestLogger {
 		harness.close();
 	}
 
+	/**
+	 * Tests that the internal time of a CEP operator advances only given watermarks. See FLINK-5033
+	 */
+	@Test
+	public void testKeyedAdvancingTimeWithoutElements() throws Exception {
+		final KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>()
{
+			private static final long serialVersionUID = -4873366487571254798L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		};
+		final Event startEvent = new Event(42, "start", 1.0);
+		final long watermarkTimestamp1 = 5L;
+		final long watermarkTimestamp2 = 13L;
+
+		final Map<String, Event> expectedSequence = new HashMap<>(2);
+		expectedSequence.put("start", startEvent);
+
+		OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, Event>,
Long>, Map<String, Event>>> harness = new KeyedOneInputStreamOperatorTestHarness<>(
+			new TimeoutKeyedCEPPatternOperator<>(
+				Event.createTypeSerializer(),
+				false,
+				keySelector,
+				IntSerializer.INSTANCE,
+				new NFAFactory(true)),
+			keySelector,
+			BasicTypeInfo.INT_TYPE_INFO);
+
+		try {
+			harness.setup(
+				new KryoSerializer<>(
+					(Class<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>>)
(Object) Either.class,
+					new ExecutionConfig()));
+			harness.open();
+
+			harness.processElement(new StreamRecord<>(startEvent, 3L));
+			harness.processWatermark(new Watermark(watermarkTimestamp1));
+			harness.processWatermark(new Watermark(watermarkTimestamp2));
+
+			Queue<Object> result = harness.getOutput();
+
+			assertEquals(3, result.size());
+
+			Object watermark1 = result.poll();
+
+			assertTrue(watermark1 instanceof Watermark);
+
+			assertEquals(watermarkTimestamp1, ((Watermark) watermark1).getTimestamp());
+
+			Object resultObject = result.poll();
+
+			assertTrue(resultObject instanceof StreamRecord);
+
+			StreamRecord<Either<Tuple2<Map<String, Event>, Long>, Map<String,
Event>>> streamRecord = (StreamRecord<Either<Tuple2<Map<String,Event>,Long>,Map<String,Event>>>)
resultObject;
+
+			assertTrue(streamRecord.getValue() instanceof Either.Left);
+
+			Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>>
left = (Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>>)
streamRecord.getValue();
+
+			Tuple2<Map<String, Event>, Long> leftResult = left.left();
+
+			assertEquals(watermarkTimestamp2, (long) leftResult.f1);
+			assertEquals(expectedSequence, leftResult.f0);
+
+			Object watermark2 = result.poll();
+
+			assertTrue(watermark2 instanceof Watermark);
+
+			assertEquals(watermarkTimestamp2, ((Watermark) watermark2).getTimestamp());
+		} finally {
+			harness.close();
+		}
+	}
+
+	/**
+	 * Tests that the internal time of a CEP operator advances only given watermarks. See FLINK-5033
+	 */
+	@Test
+	public void testAdvancingTimeWithoutElements() throws Exception {
+		final Event startEvent = new Event(42, "start", 1.0);
+		final long watermarkTimestamp1 = 5L;
+		final long watermarkTimestamp2 = 13L;
+
+		final Map<String, Event> expectedSequence = new HashMap<>(2);
+		expectedSequence.put("start", startEvent);
+
+		OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, Event>,
Long>, Map<String, Event>>> harness = new OneInputStreamOperatorTestHarness<>(
+			new TimeoutCEPPatternOperator<>(
+				Event.createTypeSerializer(),
+				false,
+				new NFAFactory(true))
+		);
+
+		try {
+			harness.setup(
+				new KryoSerializer<>(
+					(Class<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>>)
(Object) Either.class,
+					new ExecutionConfig()));
+			harness.open();
+
+			harness.processElement(new StreamRecord<>(startEvent, 3L));
+			harness.processWatermark(new Watermark(watermarkTimestamp1));
+			harness.processWatermark(new Watermark(watermarkTimestamp2));
+
+			Queue<Object> result = harness.getOutput();
+
+			assertEquals(3, result.size());
+
+			Object watermark1 = result.poll();
+
+			assertTrue(watermark1 instanceof Watermark);
+
+			assertEquals(watermarkTimestamp1, ((Watermark) watermark1).getTimestamp());
+
+			Object resultObject = result.poll();
+
+			assertTrue(resultObject instanceof StreamRecord);
+
+			StreamRecord<Either<Tuple2<Map<String, Event>, Long>, Map<String,
Event>>> streamRecord = (StreamRecord<Either<Tuple2<Map<String,Event>,Long>,Map<String,Event>>>)
resultObject;
+
+			assertTrue(streamRecord.getValue() instanceof Either.Left);
+
+			Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>>
left = (Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>>)
streamRecord.getValue();
+
+			Tuple2<Map<String, Event>, Long> leftResult = left.left();
+
+			assertEquals(watermarkTimestamp2, (long) leftResult.f1);
+			assertEquals(expectedSequence, leftResult.f0);
+
+			Object watermark2 = result.poll();
+
+			assertTrue(watermark2 instanceof Watermark);
+
+			assertEquals(watermarkTimestamp2, ((Watermark) watermark2).getTimestamp());
+		} finally {
+			harness.close();
+		}
+	}
+
 	private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
 
 		private static final long serialVersionUID = 1173020762472766713L;
 
+		private final boolean handleTimeout;
+
+		private NFAFactory() {
+			this(false);
+		}
+
+		private NFAFactory(boolean handleTimeout) {
+			this.handleTimeout = handleTimeout;
+		}
+
 		@Override
 		public NFA<Event> createNFA() {
 
@@ -444,9 +601,9 @@ public class CEPOperatorTest extends TestLogger {
 					})
 					// add a window timeout to test whether timestamps of elements in the
 					// priority queue in CEP operator are correctly checkpointed/restored
-					.within(Time.milliseconds(10));
+					.within(Time.milliseconds(10L));
 
-			return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/029fda24/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index c923b17..23a31d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -232,8 +232,16 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	 * Calls
 	 * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
 	 */
-	public void setup() throws Exception {
-		operator.setup(mockTask, config, new MockOutput());
+	public void setup() {
+		setup(null);
+	}
+
+	/**
+	 * Calls
+	 * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
+	 */
+	public void setup(TypeSerializer<OUT> outputSerializer) {
+		operator.setup(mockTask, config, new MockOutput(outputSerializer));
 		setupCalled = true;
 	}
 
@@ -493,6 +501,14 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
 		private TypeSerializer<OUT> outputSerializer;
 
+		MockOutput() {
+			this(null);
+		}
+
+		MockOutput(TypeSerializer<OUT> outputSerializer) {
+			this.outputSerializer = outputSerializer;
+		}
+
 		@Override
 		public void emitWatermark(Watermark mark) {
 			outputList.add(mark);


Mime
View raw message