flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-5296] Expose the old AlignedWindowOperators through special assigners
Date Thu, 12 Jan 2017 15:41:13 GMT
Repository: flink
Updated Branches:
  refs/heads/master d7e862afe -> fc343e0c3


[FLINK-5296] Expose the old AlignedWindowOperators through special assigners

The user can use the deprecated AccumulatingProcessingTimeWindowOperator
and AggregatingProcessingTimeWindowOperator by using the
TumblingAlignedProcessingTimeWindows and the
SlidingAlignedProcessingTimeWindows introduced by this
commit. These operators are neither backwards compatibility
nor rescalable.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc343e0c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc343e0c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc343e0c

Branch: refs/heads/master
Commit: fc343e0c390991d1d647df89041b4d872e29b6d3
Parents: d7e862a
Author: kl0u <kkloudas@gmail.com>
Authored: Tue Dec 20 13:51:45 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Jan 12 16:34:17 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedStream.java          | 106 +++++++++++++++++++
 .../assigners/BaseAlignedWindowAssigner.java    |  72 +++++++++++++
 .../SlidingAlignedProcessingTimeWindows.java    |  63 +++++++++++
 .../TumblingAlignedProcessingTimeWindows.java   |  55 ++++++++++
 .../TumblingProcessingTimeWindows.java          |   1 +
 .../operators/windowing/WindowOperator.java     |   6 ++
 .../windowing/TimeWindowTranslationTest.java    |  54 ++++++++--
 .../api/scala/TimeWindowTranslationTest.scala   |  56 ++++++++--
 8 files changed, 400 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fc343e0c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index e2f930a..704875b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -41,15 +41,21 @@ import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunct
 import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingAlignedProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
@@ -117,6 +123,10 @@ public class WindowedStream<T, K, W extends Window> {
 			throw new UnsupportedOperationException("A merging window assigner cannot be used with
a trigger that does not support merging.");
 		}
 
+		if (windowAssigner instanceof BaseAlignedWindowAssigner) {
+			throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName()
+ " with a custom trigger.");
+		}
+
 		this.trigger = trigger;
 		return this;
 	}
@@ -153,6 +163,10 @@ public class WindowedStream<T, K, W extends Window> {
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			throw new UnsupportedOperationException("Cannot use a merging WindowAssigner with an Evictor.");
 		}
+
+		if (windowAssigner instanceof BaseAlignedWindowAssigner) {
+			throw new UnsupportedOperationException("Cannot use a " + windowAssigner.getClass().getSimpleName()
+ " with an Evictor.");
+		}
 		this.evictor = evictor;
 		return this;
 	}
@@ -187,6 +201,15 @@ public class WindowedStream<T, K, W extends Window> {
 
 		//clean the closure
 		function = input.getExecutionEnvironment().clean(function);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowedStream." + callLocation;
+
+		SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(),
udfName);
+		if (result != null) {
+			return result;
+		}
+
 		LegacyWindowOperatorType legacyOpType = getLegacyWindowType(function);
 		return reduce(function, new PassThroughWindowFunction<K, W, T>(), legacyOpType);
 	}
@@ -421,6 +444,11 @@ public class WindowedStream<T, K, W extends Window> {
 			throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
 		}
 
+		if (windowAssigner instanceof BaseAlignedWindowAssigner) {
+			throw new UnsupportedOperationException("Fold cannot be used with a " +
+				windowAssigner.getClass().getSimpleName() + " assigner.");
+		}
+
 		//clean the closures
 		function = input.getExecutionEnvironment().clean(function);
 		foldFunction = input.getExecutionEnvironment().clean(foldFunction);
@@ -512,6 +540,11 @@ public class WindowedStream<T, K, W extends Window> {
 		String callLocation = Utils.getCallLocationName();
 		String udfName = "WindowedStream." + callLocation;
 
+		SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType,
udfName);
+		if (result != null) {
+			return result;
+		}
+
 		LegacyWindowOperatorType legacyWindowOpType = getLegacyWindowType(function);
 		String opName;
 		KeySelector<T, K> keySel = input.getKeySelector();
@@ -977,6 +1010,79 @@ public class WindowedStream<T, K, W extends Window> {
 		return LegacyWindowOperatorType.NONE;
 	}
 
+	private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid(
+			Function function,
+			TypeInformation<R> resultType,
+			String functionName) {
+
+		if (windowAssigner instanceof SlidingAlignedProcessingTimeWindows && trigger ==
null && evictor == null) {
+			SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows)
windowAssigner;
+			final long windowLength = timeWindows.getSize();
+			final long windowSlide = timeWindows.getSlide();
+
+			String opName = "Fast " + timeWindows + " of " + functionName;
+
+			if (function instanceof ReduceFunction) {
+				@SuppressWarnings("unchecked")
+				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+				@SuppressWarnings("unchecked")
+				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+						new AggregatingProcessingTimeWindowOperator<>(
+								reducer, input.getKeySelector(), 
+								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+								windowLength, windowSlide);
+				return input.transform(opName, resultType, op);
+			}
+			else if (function instanceof WindowFunction) {
+				@SuppressWarnings("unchecked")
+				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>)
function;
+
+				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+						wf, input.getKeySelector(),
+						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+						windowLength, windowSlide);
+				return input.transform(opName, resultType, op);
+			}
+		} else if (windowAssigner instanceof TumblingAlignedProcessingTimeWindows && trigger
== null && evictor == null) {
+			TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows)
windowAssigner;
+			final long windowLength = timeWindows.getSize();
+			final long windowSlide = timeWindows.getSize();
+
+			String opName = "Fast " + timeWindows + " of " + functionName;
+
+			if (function instanceof ReduceFunction) {
+				@SuppressWarnings("unchecked")
+				ReduceFunction<T> reducer = (ReduceFunction<T>) function;
+
+				@SuppressWarnings("unchecked")
+				OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>)
+						new AggregatingProcessingTimeWindowOperator<>(
+								reducer,
+								input.getKeySelector(),
+								input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+								input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+								windowLength, windowSlide);
+				return input.transform(opName, resultType, op);
+			}
+			else if (function instanceof WindowFunction) {
+				@SuppressWarnings("unchecked")
+				WindowFunction<T, R, K, TimeWindow> wf = (WindowFunction<T, R, K, TimeWindow>)
function;
+
+				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
+						wf, input.getKeySelector(),
+						input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+						input.getType().createSerializer(getExecutionEnvironment().getConfig()),
+						windowLength, windowSlide);
+				return input.transform(opName, resultType, op);
+			}
+		}
+
+		return null;
+	}
+
 	public StreamExecutionEnvironment getExecutionEnvironment() {
 		return input.getExecutionEnvironment();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc343e0c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/BaseAlignedWindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/BaseAlignedWindowAssigner.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/BaseAlignedWindowAssigner.java
new file mode 100644
index 0000000..6ea4b8e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/BaseAlignedWindowAssigner.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+
+/**
+ * A base {@link WindowAssigner} used to instantiate one of the deprecated
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator
+ * AccumulatingProcessingTimeWindowOperator} and
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator
+ * AggregatingProcessingTimeWindowOperator}.
+ *
+ * <p>
+ * For assigner that extend this one, the user can check the {@link TumblingAlignedProcessingTimeWindows}
+ * and the {@link SlidingAlignedProcessingTimeWindows}.
+ * */
+public class BaseAlignedWindowAssigner extends WindowAssigner<Object, TimeWindow> {
+
+	private static final long serialVersionUID = -6214980179706960234L;
+
+	private final long size;
+
+	protected BaseAlignedWindowAssigner(long size) {
+		this.size = size;
+	}
+
+	public long getSize() {
+		return size;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext
context) {
+		throw new UnsupportedOperationException("This assigner should not be used with the WindowOperator.");
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env)
{
+		return null;
+	}
+
+	@Override
+	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig)
{
+		throw new UnsupportedOperationException("This assigner should not be used with the WindowOperator.");
+	}
+
+	@Override
+	public boolean isEventTime() {
+		throw new UnsupportedOperationException("This assigner should not be used with the WindowOperator.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc343e0c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
new file mode 100644
index 0000000..743ee0b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+/**
+ * A processing time sliding {@link WindowAssigner window assigner} used to perform windowing
using the
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator
+ * AccumulatingProcessingTimeWindowOperator} and the
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator
+ * AggregatingProcessingTimeWindowOperator}.
+ *
+ * <p>
+ * With this assigner, the {@code trigger} used is a
+ * {@link org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
+ * ProcessingTimeTrigger} and no {@code evictor} can be specified.
+ *
+ * <p>
+ * <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility
is supported.
+ * */
+public class SlidingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
+
+	private static final long serialVersionUID = 3695562702662473688L;
+
+	private final long slide;
+
+	public SlidingAlignedProcessingTimeWindows(long size, long slide) {
+		super(size);
+		this.slide = slide;
+	}
+
+	public long getSlide() {
+		return slide;
+	}
+
+	/**
+	 * Creates a new {@code SlidingAlignedProcessingTimeWindows} {@link WindowAssigner} that
assigns
+	 * elements to sliding time windows based on the element timestamp.
+	 *
+	 * @param size The size of the generated windows.
+	 * @param slide The slide interval of the generated windows.
+	 */
+	public static SlidingAlignedProcessingTimeWindows of(Time size, Time slide) {
+		return new SlidingAlignedProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc343e0c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
new file mode 100644
index 0000000..007fae9
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+/**
+ * A processing time tumbling {@link WindowAssigner window assigner} used to perform windowing
using the
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator
+ * AccumulatingProcessingTimeWindowOperator} and the
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator
+ * AggregatingProcessingTimeWindowOperator}.
+ *
+ * <p>
+ * With this assigner, the {@code trigger} used is a
+ * {@link org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
+ * ProcessingTimeTrigger} and no {@code evictor} can be specified.
+ *
+ * <p>
+ * <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility
is supported.
+ * */
+public class TumblingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
+
+	private static final long serialVersionUID = -6217477609512299842L;
+
+	protected TumblingAlignedProcessingTimeWindows(long size) {
+		super(size);
+	}
+
+	/**
+	 * Creates a new {@code TumblingAlignedProcessingTimeWindows} {@link WindowAssigner} that
assigns
+	 * elements to time windows based on the element timestamp.
+	 *
+	 * @param size The size of the generated windows.
+	 */
+	public static TumblingAlignedProcessingTimeWindows of(Time size) {
+		return new TumblingAlignedProcessingTimeWindows(size.toMilliseconds());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc343e0c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index f1e9e11..f4fb620 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -108,6 +108,7 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object,
TimeWi
 	public static TumblingProcessingTimeWindows of(Time size, Time offset) {
 		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds()
% size.toMilliseconds());
 	}
+
 	@Override
 	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig)
{
 		return new TimeWindow.Serializer();

http://git-wip-us.apache.org/repos/asf/flink/blob/fc343e0c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 990162e..628d663 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -50,6 +50,7 @@ import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
@@ -214,6 +215,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 		super(windowFunction);
 
+		checkArgument(!(windowAssigner instanceof BaseAlignedWindowAssigner),
+			"The " + windowAssigner.getClass().getSimpleName() + " cannot be used with a WindowOperator.
" +
+				"This assigner is only used with the AccumulatingProcessingTimeWindowOperator and " +
+				"the AggregatingProcessingTimeWindowOperator");
+
 		checkArgument(allowedLateness >= 0);
 
 		checkArgument(windowStateDescriptor == null || windowStateDescriptor.isSerializerInitialized(),

http://git-wip-us.apache.org/repos/asf/flink/blob/fc343e0c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index 8e37021..df65ca2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -32,7 +32,9 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingAlignedProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
@@ -52,12 +54,53 @@ import java.util.concurrent.TimeUnit;
 public class TimeWindowTranslationTest {
 
 	/**
+	 * Verifies that calls to timeWindow() instantiate a regular
+	 * windowOperator instead of an aligned one.
+	 */
+	@Test
+	public void testAlignedWindowDeprecation() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello",
1), Tuple2.of("hello", 2));
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(0)
+				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
+				.reduce(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>
transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>)
window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>>
operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof WindowOperator);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.keyBy(0)
+				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,
Tuple, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(Tuple tuple,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>
transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>)
window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>>
operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof WindowOperator);
+	}
+
+	/**
 	 * These tests ensure that the fast aligned time windows operator is used if the
 	 * conditions are right.
 	 */
 	@Test
-	@Ignore
-	public void testReduceFastTimeWindows() throws Exception {
+	public void testReduceAlignedTimeWindows() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
@@ -67,7 +110,7 @@ public class TimeWindowTranslationTest {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
-				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS), Time.of(100, TimeUnit.MILLISECONDS))
+				.window(SlidingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
 				.reduce(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>
transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>)
window1.getTransformation();
@@ -80,8 +123,7 @@ public class TimeWindowTranslationTest {
 	 * conditions are right.
 	 */
 	@Test
-	@Ignore
-	public void testApplyFastTimeWindows() throws Exception {
+	public void testApplyAlignedTimeWindows() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
@@ -89,7 +131,7 @@ public class TimeWindowTranslationTest {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
-				.timeWindow(Time.of(1000, TimeUnit.MILLISECONDS))
+				.window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
 				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,
Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fc343e0c/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
index ff97656..104400f 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/TimeWindowTranslationTest.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
+import org.apache.flink.streaming.api.windowing.assigners.{SlidingAlignedProcessingTimeWindows,
SlidingEventTimeWindows, TumblingAlignedProcessingTimeWindows}
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
@@ -42,19 +42,62 @@ import org.junit.{Ignore, Test}
 class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
   /**
+    * Verifies that calls to timeWindow() instantiate a regular
+    * windowOperator instead of an aligned one.
+    */
+  @Test
+  def testAlignedWindowDeprecation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    val source = env.fromElements(("hello", 1), ("hello", 2))
+
+    val reducer = new DummyReducer
+
+    val window1 = source
+      .keyBy(0)
+      .timeWindow(Time.seconds(1), Time.milliseconds(100))
+      .reduce(reducer)
+
+    val transform1 = window1.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator1 = transform1.getOperator
+
+    assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _, _]])
+
+    val window2 = source
+      .keyBy(0)
+      .timeWindow(Time.minutes(1))
+      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+        def apply(
+                   key: Tuple,
+                   window: TimeWindow,
+                   values: Iterable[(String, Int)],
+                   out: Collector[(String, Int)]) { }
+      })
+
+    val transform2 = window2.javaStream.getTransformation
+      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+    val operator2 = transform2.getOperator
+
+    assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _, _]])
+  }
+
+  /**
     * These tests ensure that the fast aligned time windows operator is used if the
     * conditions are right.
     */
   @Test
-  @Ignore
-  def testReduceFastTimeWindows(): Unit = {
+  def testReduceAlignedTimeWindows(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
     
     val window1 = source
       .keyBy(0)
-      .timeWindow(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))
+      .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
       .reduce(new DummyReducer())
 
     val transform1 = window1.javaStream.getTransformation
@@ -70,8 +113,7 @@ class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
{
     * conditions are right.
     */
   @Test
-  @Ignore
-  def testApplyFastTimeWindows(): Unit = {
+  def testApplyAlignedTimeWindows(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
@@ -79,7 +121,7 @@ class TimeWindowTranslationTest extends StreamingMultipleProgramsTestBase
{
 
     val window1 = source
       .keyBy(0)
-      .timeWindow(Time.minutes(1))
+      .window(TumblingAlignedProcessingTimeWindows.of(Time.minutes(1)))
       .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
         def apply(
                    key: Tuple,


Mime
View raw message