flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/4] flink git commit: Fix test formatting and move WindowAssignerContext to WindowAssigner
Date Mon, 27 Jun 2016 15:50:34 GMT
Repository: flink
Updated Branches:
  refs/heads/master cb2b76dc4 -> 90cfe0a7b


Fix test formatting and move WindowAssignerContext to WindowAssigner


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90cfe0a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90cfe0a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90cfe0a7

Branch: refs/heads/master
Commit: 90cfe0a7b499832cebc2a53f7b066f83dde17de5
Parents: 8803d15
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Jun 27 13:56:27 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Jun 27 17:16:12 2016 +0200

----------------------------------------------------------------------
 .../api/windowing/assigners/WindowAssigner.java | 21 +++++++
 .../assigners/WindowAssignerContext.java        | 37 ------------
 .../operators/windowing/WindowOperator.java     |  5 +-
 .../operators/windowing/WindowOperatorTest.java | 61 +++++++++++++-------
 .../util/OneInputStreamOperatorTestHarness.java | 33 ++---------
 5 files changed, 67 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90cfe0a7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index c25d6d9..9f487af 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
 import java.io.Serializable;
 
 import java.util.Collection;
@@ -70,4 +72,23 @@ public abstract class WindowAssigner<T, W extends Window> implements
Serializabl
 	 * {@code false} otherwise.
 	 */
 	public abstract boolean isEventTime();
+
+	/**
+	 * A context provided to the {@link WindowAssigner} that allows it to query the
+	 * current processing time.
+	 *
+	 * <p>This is provided to the assigner by its containing
+	 * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
+	 * which, in turn, gets it from the containing
+	 * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
+	 */
+	public abstract static class WindowAssignerContext {
+
+		/**
+		 * Returns the current processing time, as returned by
+		 * the {@link StreamTask#getCurrentProcessingTime()}.
+		 */
+		public abstract long getCurrentProcessingTime();
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/90cfe0a7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java
deleted file mode 100644
index e3f51a2..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssignerContext.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
-/**
- * A context provided to the {@link WindowAssigner} that allows it to query the
- * current processing time. This is provided to the assigner by its containing
- * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
- * which, in turn, gets it from the containing
- * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
- */
-public abstract class WindowAssignerContext {
-
-	/**
-	 * Returns the current processing time, as returned by
-	 * the {@link StreamTask#getCurrentProcessingTime()}.
-	 */
-	public abstract long getCurrentProcessingTime();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/90cfe0a7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index ad01a5a..f06fd33 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -50,7 +50,6 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssignerContext;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
@@ -161,7 +160,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	protected transient Context context = new Context(null, null);
 
-	protected transient WindowAssignerContext windowAssignerContext;
+	protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
 
 	// ------------------------------------------------------------------------
 	// State that needs to be checkpointed
@@ -248,7 +247,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 		context = new Context(null, null);
 
-		windowAssignerContext = new WindowAssignerContext() {
+		windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
 			@Override
 			public long getCurrentProcessingTime() {
 				return WindowOperator.this.getCurrentProcessingTime();

http://git-wip-us.apache.org/repos/asf/flink/blob/90cfe0a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 4f3ff63..8ba6da2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -43,7 +43,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindow
 import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssignerContext;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
@@ -958,7 +958,7 @@ public class WindowOperatorTest {
 
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
-		long initialTime = 0L;
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
 
@@ -966,23 +966,25 @@ public class WindowOperatorTest {
 
 		// timestamp is ignored in processing time
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime
+ 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime
+ 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime
+ 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime
+ 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
 
 		testTimeProvider.setCurrentTime(5000);
 
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime
+ 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime
+ 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime
+ 7000));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
 
 		testTimeProvider.setCurrentTime(7000);
 
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
@@ -1018,6 +1020,8 @@ public class WindowOperatorTest {
 
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
 		testHarness.open();
 
 		// timestamp is ignored in processing time
@@ -1025,27 +1029,36 @@ public class WindowOperatorTest {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
 
 		testTimeProvider.setCurrentTime(1000);
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
+
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
 
 		testTimeProvider.setCurrentTime(2000);
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 1999));
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
+
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
 
 		testTimeProvider.setCurrentTime(3000);
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
+
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), Long.MAX_VALUE));
 
 		testTimeProvider.setCurrentTime(7000);
 
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 1999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 5), 3999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 5), 4999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
@@ -1082,6 +1095,8 @@ public class WindowOperatorTest {
 
 		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
 		testHarness.open();
 
 		// timestamp is ignored in processing time
@@ -1092,6 +1107,11 @@ public class WindowOperatorTest {
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1002));//Long.MAX_VALUE));
 
 		testTimeProvider.setCurrentTime(5000);
+
+		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
+
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 5000));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 5000));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 5000));
@@ -1100,14 +1120,11 @@ public class WindowOperatorTest {
 
 		testTimeProvider.setCurrentTime(10000);
 
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 3999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 7999));
 		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 7999));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
 
-
 		assertEquals(expectedOutput.size(), testHarness.getOutput().size());
 		for (Object elem : testHarness.getOutput()) {
 			if (elem instanceof StreamRecord) {
@@ -1214,7 +1231,7 @@ public class WindowOperatorTest {
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 
 		long timestamp = Long.MAX_VALUE - 1750;
-		Collection<TimeWindow> windows = windowAssigner.assignWindows(new Tuple2<>("key2",
1), timestamp, new WindowAssignerContext() {
+		Collection<TimeWindow> windows = windowAssigner.assignWindows(new Tuple2<>("key2",
1), timestamp, new WindowAssigner.WindowAssignerContext() {
 			@Override
 			public long getCurrentProcessingTime() {
 				return operator.windowAssignerContext.getCurrentProcessingTime();

http://git-wip-us.apache.org/repos/asf/flink/blob/90cfe0a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 86137d4..63d22a5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
@@ -43,6 +44,7 @@ import org.mockito.stubbing.Answer;
 
 import java.util.Collection;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
@@ -85,7 +87,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	}
 
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
ExecutionConfig executionConfig) {
-		this(operator, executionConfig, null);
+		this(operator, executionConfig, DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
 	}
 
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
ExecutionConfig executionConfig,
@@ -127,31 +129,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 				final long execTime = (Long) invocation.getArguments()[0];
 				final Triggerable target = (Triggerable) invocation.getArguments()[1];
 
-				if (timeServiceProvider == null) {
-					Thread caller = new Thread() {
-						@Override
-						public void run() {
-							final long delay = execTime - mockTask.getCurrentProcessingTime();
-							if (delay > 0) {
-								try {
-									Thread.sleep(delay);
-								} catch (InterruptedException ignored) {
-								}
-							}
-
-							synchronized (checkpointLock) {
-								try {
-									target.trigger(execTime);
-								} catch (Exception ignored) {
-								}
-							}
-						}
-					};
-					caller.start();
-				} else {
-					timeServiceProvider.registerTimer(
+				timeServiceProvider.registerTimer(
 						execTime, new TriggerTask(checkpointLock, target, execTime));
-				}
 				return null;
 			}
 		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
@@ -159,9 +138,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		doAnswer(new Answer<Long>() {
 			@Override
 			public Long answer(InvocationOnMock invocation) throws Throwable {
-				return timeServiceProvider == null ?
-					System.currentTimeMillis() :
-					timeServiceProvider.getCurrentProcessingTime();
+				return timeServiceProvider.getCurrentProcessingTime();
 			}
 		}).when(mockTask).getCurrentProcessingTime();
 	}


Mime
View raw message