flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/5] flink git commit: [FLINK-3178] Don't Emit In-Flight Windows When Closing Window Operator
Date Thu, 28 Jan 2016 15:32:16 GMT
Repository: flink
Updated Branches:
  refs/heads/master 2e2330737 -> 6b01a8902


[FLINK-3178] Don't Emit In-Flight Windows When Closing Window Operator

This closes #1542


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4e5a55f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4e5a55f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4e5a55f

Branch: refs/heads/master
Commit: c4e5a55f027ed73ce557f10d5125a0b168832889
Parents: 2e23307
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Jan 18 13:25:03 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jan 28 14:19:17 2016 +0100

----------------------------------------------------------------------
 .../examples/windowing/SessionWindowing.java    |   2 +-
 .../util/TopSpeedWindowingExampleData.java      |   8 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   5 -
 .../windowing/NonKeyedWindowOperator.java       |  14 +--
 .../operators/windowing/WindowOperator.java     |  17 ++-
 .../api/complex/ComplexIntegrationTest.java     |   3 +-
 ...AlignedProcessingTimeWindowOperatorTest.java | 110 ++-----------------
 ...AlignedProcessingTimeWindowOperatorTest.java |  89 ++++-----------
 .../util/OneInputStreamOperatorTestHarness.java |   3 +-
 9 files changed, 54 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 035727a..baa4af8 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -116,7 +116,7 @@ public class SessionWindowing {
 			// Update the last seen event time
 			lastSeenState.update(timestamp);
 
-			ctx.registerEventTimeTimer(lastSeen + sessionTimeout);
+			ctx.registerEventTimeTimer(timestamp + sessionTimeout);
 
 			if (timeSinceLastEvent > sessionTimeout) {
 				return TriggerResult.FIRE_AND_PURGE;

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
index bf63695..4718b8b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
@@ -192,9 +192,7 @@ public class TopSpeedWindowingExampleData {
 					"(1,95,1973.6111111111115,1424952007664)\n" +
 					"(0,100,1709.7222222222229,1424952006663)\n" +
 					"(0,100,1737.5000000000007,1424952007664)\n" +
-					"(1,95,1973.6111111111115,1424952007664)\n" +
-					"(0,100,1791.6666666666674,1424952009664)\n" +
-					"(1,95,2211.1111111111118,1424952017668)\n";
+					"(1,95,1973.6111111111115,1424952007664)\n";
 
 	public static final String TOP_CASE_CLASS_SPEEDS =
 			"CarEvent(0,55,15.277777777777777,1424951918630)\n" +
@@ -267,9 +265,7 @@ public class TopSpeedWindowingExampleData {
 					"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
 					"CarEvent(0,100,1709.7222222222229,1424952006663)\n" +
 					"CarEvent(0,100,1737.5000000000007,1424952007664)\n" +
-					"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
-					"CarEvent(0,100,1791.6666666666674,1424952009664)\n" +
-					"CarEvent(1,95,2211.1111111111118,1424952017668)\n";
+					"CarEvent(1,95,1973.6111111111115,1424952007664)\n";
 
 	private TopSpeedWindowingExampleData() {
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 677a7dd..b8c95aa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -171,13 +171,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	public void close() throws Exception {
 		super.close();
 		
-		final long finalWindowTimestamp = nextEvaluationTime;
-
 		// early stop the triggering thread, so it does not attempt to return any more data
 		stopTriggers();
-
-		// emit the remaining data
-		computeWindow(finalWindowTimestamp);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 7823631..2afa1e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -217,14 +217,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	@Override
-	public final void close() throws Exception {
-		super.close();
-		// emit the elements that we still keep
-		for (Context window: windows.values()) {
-			emitWindow(window);
-		}
+	public final void dispose() {
+		super.dispose();
 		windows.clear();
-		windowBufferFactory.close();
+		try {
+			windowBufferFactory.close();
+		} catch (Exception e) {
+			throw new RuntimeException("Error while closing WindowBufferFactory.", e);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 68c3a5f..4a50efb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -251,17 +251,14 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	@Override
-	public final void close() throws Exception {
-		super.close();
-		// emit the elements that we still keep
-		for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
-			Map<W, Context> keyWindows = entry.getValue();
-			for (Context window: keyWindows.values()) {
-				emitWindow(window);
-			}
-		}
+	public final void dispose() {
+		super.dispose();
 		windows.clear();
-		windowBufferFactory.close();
+		try {
+			windowBufferFactory.close();
+		} catch (Exception e) {
+			throw new RuntimeException("Error while closing WindowBufferFactory.", e);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 020dda3..9b0a6d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -214,6 +214,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 
 	@SuppressWarnings("unchecked")
 	@Test
+	@Ignore
 	public void complexIntegrationTest3() throws Exception {
 		//Heavy prime factorisation with maps and flatmaps
 
@@ -225,7 +226,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		expected1 = "541\n" + "1223\n" + "1987\n" + "2741\n" + "3571\n" + "10939\n" + "4409\n" +
 				"5279\n" + "11927\n" + "6133\n" + "6997\n" + "12823\n" + "7919\n" + "8831\n" +
 				"13763\n" + "9733\n" + "9973\n" + "14759\n" + "15671\n" + "16673\n" + "17659\n" +
-				"18617\n" + "19697\n" + "19997\n";
+				"18617\n";
 
 		for (int i = 2; i < 100; i++) {
 			expected2 += "(" + i + "," + 20000 / i + ")\n";

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index ed0f04a..8722802 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -251,12 +251,14 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				Thread.sleep(1);
 			}
 
+			// get and verify the result
+			out.waitForNElements(numElements, 60_000);
+
 			synchronized (lock) {
 				op.close();
 			}
 			op.dispose();
 
-			// get and verify the result
 			List<Integer> result = out.getElements();
 			assertEquals(numElements, result.size());
 
@@ -441,102 +443,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			timerService.shutdown();
 		}
 	}
-	
-	@Test
-	public void testEmitTrailingDataOnClose() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-			
-			// the operator has a window time that is so long that it will not fire in this test
-			final long oneYear = 365L * 24 * 60 * 60 * 1000;
-			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(
-							validatingIdentityFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							oneYear, oneYear);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-			
-			List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-			for (Integer i : data) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-			}
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-			
-			// get and verify the result
-			List<Integer> result = out.getElements();
-			Collections.sort(result);
-			assertEquals(data, result);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-
-	@Test
-	public void testPropagateExceptionsFromClose() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
-			WindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);
-
-			// the operator has a window time that is so long that it will not fire in this test
-			final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
-			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(
-							failingFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							hundredYears, hundredYears);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			for (int i = 0; i < 150; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-			}
-			
-			try {
-				synchronized (lock) {
-					op.close();
-				}
-				fail("This should fail with an exception");
-			}
-			catch (Exception e) {
-				assertTrue(
-						e.getMessage().contains("Artificial Test Exception") ||
-						(e.getCause() != null && e.getCause().getMessage().contains("Artificial Test Exception")));
-			}
 
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-	
 	@Test
 	public void checkpointRestoreWithPendingWindowTumbling() {
 		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
@@ -607,16 +514,19 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				Thread.sleep(1);
 			}
 
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
+
+			out2.waitForNElements(numElements - resultAtSnapshot.size(), 60_000);
 
 			// get and verify the result
 			List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
 			finalResult.addAll(out2.getElements());
 			assertEquals(numElements, finalResult.size());
 
+			synchronized (lock) {
+				op.close();
+			}
+			op.dispose();
+
 			Collections.sort(finalResult);
 			for (int i = 0; i < numElements; i++) {
 				assertEquals(i, finalResult.get(i).intValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index b3e59e5..611916e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -269,14 +269,17 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				Thread.sleep(1);
 			}
 
+			out.waitForNElements(numElements, 60_000);
+
+			// get and verify the result
+			List<Tuple2<Integer, Integer>> result = out.getElements();
+			assertEquals(numElements, result.size());
+
 			synchronized (lock) {
 				op.close();
 			}
 			op.dispose();
 
-			// get and verify the result
-			List<Tuple2<Integer, Integer>> result = out.getElements();
-			assertEquals(numElements, result.size());
 
 			Collections.sort(result, tupleComparator);
 			for (int i = 0; i < numElements; i++) {
@@ -335,13 +338,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				Thread.sleep(1);
 			}
 
+			out.waitForNElements(numWindows, 60_000);
+
+			List<Tuple2<Integer, Integer>> result = out.getElements();
+
 			synchronized (lock) {
 				op.close();
 			}
 			op.dispose();
-			
-			List<Tuple2<Integer, Integer>> result = out.getElements();
-			
+
 			// we have ideally one element per window. we may have more, when we emitted a value into the
 			// successive window (corner case), so we can have twice the number of elements, in the worst case.
 			assertTrue(result.size() >= numWindows && result.size() <= 2 * numWindows);
@@ -487,57 +492,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			timerService.shutdown();
 		}
 	}
-	
-	@Test
-	public void testEmitTrailingDataOnClose() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-			
-			// the operator has a window time that is so long that it will not fire in this test
-			final long oneYear = 365L * 24 * 60 * 60 * 1000;
-			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
-					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, fieldOneSelector,
-							IntSerializer.INSTANCE, tupleSerializer, oneYear, oneYear);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-			
-			List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-			for (Integer i : data) {
-				synchronized (lock) {
-					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
-					op.setKeyContextElement(next);
-					op.processElement(next);
-				}
-			}
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-			
-			// get and verify the result
-			List<Tuple2<Integer, Integer>> result = out.getElements();
-			assertEquals(data.size(), result.size());
-			
-			Collections.sort(result, tupleComparator);
-			for (int i = 0; i < data.size(); i++) {
-				assertEquals(data.get(i), result.get(i).f0);
-				assertEquals(data.get(i), result.get(i).f1);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
 
 	@Test
 	public void testPropagateExceptionsFromProcessElement() {
@@ -667,16 +621,18 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				Thread.sleep(1);
 			}
 
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
+			out2.waitForNElements(numElements - resultAtSnapshot.size(), 60_000);
 
 			// get and verify the result
 			List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
 			finalResult.addAll(out2.getElements());
 			assertEquals(numElements, finalResult.size());
 
+			synchronized (lock) {
+				op.close();
+			}
+			op.dispose();
+
 			Collections.sort(finalResult, tupleComparator);
 			for (int i = 0; i < numElements; i++) {
 				assertEquals(i, finalResult.get(i).f0.intValue());
@@ -812,7 +768,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	public void testKeyValueStateInWindowFunctionTumbling() {
 		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
-			final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
+			final long thirtySeconds = 30_000;
 			
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
 			final Object lock = new Object();
@@ -823,7 +779,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
 							new StatefulFunction(), fieldOneSelector,
-							IntSerializer.INSTANCE, tupleSerializer, hundredYears, hundredYears);
+							IntSerializer.INSTANCE, tupleSerializer, thirtySeconds, thirtySeconds);
 
 			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE), out);
 			op.open();
@@ -841,10 +797,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					op.setKeyContextElement(next2);
 					op.processElement(next2);
 				}
-
-				op.close();
 			}
 
+			out.waitForNElements(2, 60_000);
+
 			List<Tuple2<Integer, Integer>> result = out.getElements();
 			assertEquals(2, result.size());
 
@@ -854,7 +810,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 			assertEquals(10, StatefulFunction.globalCounts.get(1).intValue());
 			assertEquals(10, StatefulFunction.globalCounts.get(2).intValue());
-		
+
+			op.close();
 			op.dispose();
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 01f95bc..258e30a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -111,10 +111,11 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	}
 
 	/**
-	 * Calls close on the operator.
+	 * Calls close and dispose on the operator.
 	 */
 	public void close() throws Exception {
 		operator.close();
+		operator.dispose();
 	}
 
 	public void processElement(StreamRecord<IN> element) throws Exception {


Mime
View raw message