flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [2/6] flink git commit: [FLINK-3209] Remove Unused ProcessingTime, EventTime and AbstractTime
Date Mon, 25 Jan 2016 17:35:27 GMT
[FLINK-3209] Remove Unused ProcessingTime, EventTime and AbstractTime

Only keep Time for specifying time durations/intervals.

This closes #1512


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf75f424
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf75f424
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf75f424

Branch: refs/heads/master
Commit: cf75f424a2cc788cedbc0137f73d12e758a6c084
Parents: 4280e1f
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Dec 8 11:17:08 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Jan 25 18:28:11 2016 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  6 +-
 .../streaming/api/datastream/KeyedStream.java   |  6 +-
 .../windowing/assigners/SlidingTimeWindows.java |  4 +-
 .../assigners/TumblingTimeWindows.java          |  4 +-
 .../api/windowing/evictors/TimeEvictor.java     |  4 +-
 .../api/windowing/time/AbstractTime.java        | 98 --------------------
 .../streaming/api/windowing/time/EventTime.java | 62 -------------
 .../api/windowing/time/ProcessingTime.java      | 63 -------------
 .../streaming/api/windowing/time/Time.java      | 53 ++++++++---
 .../triggers/ContinuousEventTimeTrigger.java    |  4 +-
 .../ContinuousProcessingTimeTrigger.java        |  4 +-
 .../flink/streaming/api/scala/DataStream.scala  |  6 +-
 .../flink/streaming/api/scala/KeyedStream.scala |  6 +-
 13 files changed, 60 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 7ab3e2b..254af19 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -64,7 +64,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
@@ -640,7 +640,7 @@ public class DataStream<T> {
 	 *
 	 * @param size The size of the window.
 	 */
-	public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size) {
+	public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
 		return windowAll(TumblingTimeWindows.of(size));
 	}
 
@@ -660,7 +660,7 @@ public class DataStream<T> {
 	 *
 	 * @param size The size of the window.
 	 */
-	public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) {
+	public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
 		return windowAll(SlidingTimeWindows.of(size, slide));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index d4a3a77..9b567f8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
@@ -162,7 +162,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 *
 	 * @param size The size of the window.
 	 */
-	public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
+	public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
 		return window(TumblingTimeWindows.of(size));
 	}
 
@@ -177,7 +177,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 *
 	 * @param size The size of the window.
 	 */
-	public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
+	public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
 		return window(SlidingTimeWindows.of(size, slide));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 4077452..d517f6a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
@@ -98,7 +98,7 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	 * @param slide The slide interval of the generated windows.
 	 * @return The time policy.
 	 */
-	public static SlidingTimeWindows of(AbstractTime size, AbstractTime slide) {
+	public static SlidingTimeWindows of(Time size, Time slide) {
 		return new SlidingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index 7e5a11f..0efc940 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
@@ -83,7 +83,7 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	 * @param size The size of the generated windows.
 	 * @return The time policy.
 	 */
-	public static TumblingTimeWindows of(AbstractTime size) {
+	public static TumblingTimeWindows of(Time size) {
 		return new TumblingTimeWindows(size.toMilliseconds());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index 5776d8d..49d7786 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.evictors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -67,7 +67,7 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
 	 *
 	 * @param windowSize The amount of time for which to keep elements.
 	 */
-	public static <W extends Window> TimeEvictor<W> of(AbstractTime windowSize) {
+	public static <W extends Window> TimeEvictor<W> of(Time windowSize) {
 		return new TimeEvictor<>(windowSize.toMilliseconds());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
deleted file mode 100644
index 3f8fb60..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Base class for {@link Time} implementations.
- */
-public abstract class AbstractTime {
-
-	/** The time unit for this policy's time interval */
-	private final TimeUnit unit;
-	
-	/** The size of the windows generated by this policy */
-	private final long size;
-
-
-	protected AbstractTime(long size, TimeUnit unit) {
-		this.unit = checkNotNull(unit, "time unit may not be null");
-		this.size = size;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Properties
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the time unit for this policy's time interval.
-	 * @return The time unit for this policy's time interval.
-	 */
-	public TimeUnit getUnit() {
-		return unit;
-	}
-
-	/**
-	 * Gets the length of this policy's time interval.
-	 * @return The length of this policy's time interval.
-	 */
-	public long getSize() {
-		return size;
-	}
-
-	/**
-	 * Converts the time interval to milliseconds.
-	 * @return The time interval in milliseconds.
-	 */
-	public long toMilliseconds() {
-		return unit.toMillis(size);
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	public abstract AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic);
-
-	@Override
-	public int hashCode() {
-		return 31 * (int) (size ^ (size >>> 32)) + unit.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj != null && obj.getClass() == getClass()) {
-			AbstractTime that = (AbstractTime) obj;
-			return this.size == that.size && this.unit.equals(that.unit);
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return getClass().getSimpleName() + " (" + size + ' ' + unit.name() + ')';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
deleted file mode 100644
index 6a4349c..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of an event time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a definition
- * of event time.
- */
-public final class EventTime extends AbstractTime {
-
-	/** Instantiation only via factory method */
-	private EventTime(long size, TimeUnit unit) {
-		super(size, unit);
-	}
-
-	@Override
-	public EventTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
-		if (characteristic == TimeCharacteristic.EventTime || characteristic == TimeCharacteristic.IngestionTime) {
-			return this;
-		}
-		else {
-			throw new InvalidProgramException(
-					"Cannot use EventTime policy in a dataflow that runs on " + characteristic);
-		}
-	}
-	// ------------------------------------------------------------------------
-	//  Factory
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates an event time policy describing an event time interval.
-	 *
-	 * @param size The size of the generated windows.
-	 * @param unit The init (seconds, milliseconds) of the time interval.
-	 * @return The event time policy.
-	 */
-	public static EventTime of(long size, TimeUnit unit) {
-		return new EventTime(size, unit);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
deleted file mode 100644
index 4be6ed0..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of a processing time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} for a definition
- * of processing time.
- */
-public final class ProcessingTime extends AbstractTime {
-
-	/** Instantiation only via factory method */
-	private ProcessingTime(long size, TimeUnit unit) {
-		super(size, unit);
-	}
-
-	@Override
-	public ProcessingTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
-		if (characteristic == TimeCharacteristic.ProcessingTime) {
-			return this;
-		}
-		else {
-			throw new InvalidProgramException(
-					"Cannot use ProcessingTime policy in a dataflow that runs on " + characteristic);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Factory
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a processing time policy describing a processing time interval.
-	 *
-	 * @param size The size of the generated windows.
-	 * @param unit The init (seconds, milliseconds) of the time interval.
-	 * @return The processing time policy.
-	 */
-	public static ProcessingTime of(long size, TimeUnit unit) {
-		return new ProcessingTime(size, unit);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
index e0e9202..c30fdf4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
@@ -18,32 +18,55 @@
 
 package org.apache.flink.streaming.api.windowing.time;
 
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * The definition of a time interval for windowing. The time characteristic referred
  * to is the default time characteristic set on the execution environment.
  */
-public final class Time extends AbstractTime {
+public final class Time {
+
+	/** The time unit for this policy's time interval */
+	private final TimeUnit unit;
+
+	/** The size of the windows generated by this policy */
+	private final long size;
 
 	/** Instantiation only via factory method */
 	private Time(long size, TimeUnit unit) {
-		super(size, unit);
+		this.unit = checkNotNull(unit, "time unit may not be null");
+		this.size = size;
+
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the time unit for this policy's time interval.
+	 * @return The time unit for this policy's time interval.
+	 */
+	public TimeUnit getUnit() {
+		return unit;
 	}
 
-	@Override
-	public AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
-		switch (timeCharacteristic) {
-			case ProcessingTime:
-				return ProcessingTime.of(getSize(), getUnit());
-			case IngestionTime:
-			case EventTime:
-				return EventTime.of(getSize(), getUnit());
-			default:
-				throw new IllegalArgumentException("Unknown time characteristic");
-		}
+	/**
+	 * Gets the length of this policy's time interval.
+	 * @return The length of this policy's time interval.
+	 */
+	public long getSize() {
+		return size;
+	}
+
+	/**
+	 * Converts the time interval to milliseconds.
+	 * @return The time interval in milliseconds.
+	 */
+	public long toMilliseconds() {
+		return unit.toMillis(size);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 4b6af8f..0454e85 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 /**
@@ -83,7 +83,7 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj
 	 * @param interval The time interval at which to fire.
 	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 	 */
-	public static <W extends Window> ContinuousEventTimeTrigger<W> of(AbstractTime interval) {
+	public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {
 		return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 66f9bda..3576394 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 /**
@@ -99,7 +99,7 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
 	 * @param interval The time interval at which to fire.
 	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 	 */
-	public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(AbstractTime interval) {
+	public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {
 		return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index fcfbfe8..28edc2d 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWi
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
 import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.AbstractTime
+import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
@@ -532,7 +532,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    * @param size The size of the window.
    */
-  def timeWindowAll(size: AbstractTime): AllWindowedStream[T, TimeWindow] = {
+  def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow] = {
     val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]]
     windowAll(assigner)
   }
@@ -551,7 +551,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    * @param size The size of the window.
    */
-  def timeWindowAll(size: AbstractTime, slide: AbstractTime): AllWindowedStream[T, TimeWindow] = {
+  def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow] = {
     val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
     windowAll(assigner)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 9f5c069..59c5693 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregato
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce
 import org.apache.flink.streaming.api.scala.function.StatefulFunction
 import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.AbstractTime
+import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
 import org.apache.flink.util.Collector
 
@@ -58,7 +58,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    *
    * @param size The size of the window.
    */
-  def timeWindow(size: AbstractTime): WindowedStream[T, K, TimeWindow] = {
+  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow] = {
     val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]]
     window(assigner)
   }
@@ -92,7 +92,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    *
    * @param size The size of the window.
    */
-  def timeWindow(size: AbstractTime, slide: AbstractTime): WindowedStream[T, K, TimeWindow] = {
+  def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow] = {
     val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
     window(assigner)
   }


Mime
View raw message