[FLINK-3209] Remove Unused ProcessingTime, EventTime and AbstractTime
Only keep Time for specifying time durations/intervals.
This closes #1512
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf75f424
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf75f424
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf75f424
Branch: refs/heads/master
Commit: cf75f424a2cc788cedbc0137f73d12e758a6c084
Parents: 4280e1f
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Dec 8 11:17:08 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Jan 25 18:28:11 2016 +0100
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 6 +-
.../streaming/api/datastream/KeyedStream.java | 6 +-
.../windowing/assigners/SlidingTimeWindows.java | 4 +-
.../assigners/TumblingTimeWindows.java | 4 +-
.../api/windowing/evictors/TimeEvictor.java | 4 +-
.../api/windowing/time/AbstractTime.java | 98 --------------------
.../streaming/api/windowing/time/EventTime.java | 62 -------------
.../api/windowing/time/ProcessingTime.java | 63 -------------
.../streaming/api/windowing/time/Time.java | 53 ++++++++---
.../triggers/ContinuousEventTimeTrigger.java | 4 +-
.../ContinuousProcessingTimeTrigger.java | 4 +-
.../flink/streaming/api/scala/DataStream.scala | 6 +-
.../flink/streaming/api/scala/KeyedStream.scala | 6 +-
13 files changed, 60 insertions(+), 260 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 7ab3e2b..254af19 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -64,7 +64,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
@@ -640,7 +640,7 @@ public class DataStream<T> {
*
* @param size The size of the window.
*/
- public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size) {
+ public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
return windowAll(TumblingTimeWindows.of(size));
}
@@ -660,7 +660,7 @@ public class DataStream<T> {
*
* @param size The size of the window.
*/
- public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime
slide) {
+ public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
return windowAll(SlidingTimeWindows.of(size, slide));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index d4a3a77..9b567f8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
@@ -162,7 +162,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
*
* @param size The size of the window.
*/
- public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
+ public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
return window(TumblingTimeWindows.of(size));
}
@@ -177,7 +177,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
*
* @param size The size of the window.
*/
- public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime
slide) {
+ public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
return window(SlidingTimeWindows.of(size, slide));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 4077452..d517f6a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
@@ -98,7 +98,7 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow>
{
* @param slide The slide interval of the generated windows.
* @return The time policy.
*/
- public static SlidingTimeWindows of(AbstractTime size, AbstractTime slide) {
+ public static SlidingTimeWindows of(Time size, Time slide) {
return new SlidingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index 7e5a11f..0efc940 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
@@ -83,7 +83,7 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow>
{
* @param size The size of the generated windows.
* @return The time policy.
*/
- public static TumblingTimeWindows of(AbstractTime size) {
+ public static TumblingTimeWindows of(Time size) {
return new TumblingTimeWindows(size.toMilliseconds());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index 5776d8d..49d7786 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.evictors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -67,7 +67,7 @@ public class TimeEvictor<W extends Window> implements Evictor<Object,
W> {
*
* @param windowSize The amount of time for which to keep elements.
*/
- public static <W extends Window> TimeEvictor<W> of(AbstractTime windowSize)
{
+ public static <W extends Window> TimeEvictor<W> of(Time windowSize) {
return new TimeEvictor<>(windowSize.toMilliseconds());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
deleted file mode 100644
index 3f8fb60..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Base class for {@link Time} implementations.
- */
-public abstract class AbstractTime {
-
- /** The time unit for this policy's time interval */
- private final TimeUnit unit;
-
- /** The size of the windows generated by this policy */
- private final long size;
-
-
- protected AbstractTime(long size, TimeUnit unit) {
- this.unit = checkNotNull(unit, "time unit may not be null");
- this.size = size;
- }
-
- // ------------------------------------------------------------------------
- // Properties
- // ------------------------------------------------------------------------
-
- /**
- * Gets the time unit for this policy's time interval.
- * @return The time unit for this policy's time interval.
- */
- public TimeUnit getUnit() {
- return unit;
- }
-
- /**
- * Gets the length of this policy's time interval.
- * @return The length of this policy's time interval.
- */
- public long getSize() {
- return size;
- }
-
- /**
- * Converts the time interval to milliseconds.
- * @return The time interval in milliseconds.
- */
- public long toMilliseconds() {
- return unit.toMillis(size);
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- public abstract AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic);
-
- @Override
- public int hashCode() {
- return 31 * (int) (size ^ (size >>> 32)) + unit.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj != null && obj.getClass() == getClass()) {
- AbstractTime that = (AbstractTime) obj;
- return this.size == that.size && this.unit.equals(that.unit);
- }
- else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + " (" + size + ' ' + unit.name() + ')';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
deleted file mode 100644
index 6a4349c..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of an event time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a definition
- * of event time.
- */
-public final class EventTime extends AbstractTime {
-
- /** Instantiation only via factory method */
- private EventTime(long size, TimeUnit unit) {
- super(size, unit);
- }
-
- @Override
- public EventTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic)
{
- if (characteristic == TimeCharacteristic.EventTime || characteristic == TimeCharacteristic.IngestionTime)
{
- return this;
- }
- else {
- throw new InvalidProgramException(
- "Cannot use EventTime policy in a dataflow that runs on " + characteristic);
- }
- }
- // ------------------------------------------------------------------------
- // Factory
- // ------------------------------------------------------------------------
-
- /**
- * Creates an event time policy describing an event time interval.
- *
- * @param size The size of the generated windows.
- * @param unit The init (seconds, milliseconds) of the time interval.
- * @return The event time policy.
- */
- public static EventTime of(long size, TimeUnit unit) {
- return new EventTime(size, unit);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
deleted file mode 100644
index 4be6ed0..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of a processing time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} for a definition
- * of processing time.
- */
-public final class ProcessingTime extends AbstractTime {
-
- /** Instantiation only via factory method */
- private ProcessingTime(long size, TimeUnit unit) {
- super(size, unit);
- }
-
- @Override
- public ProcessingTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic)
{
- if (characteristic == TimeCharacteristic.ProcessingTime) {
- return this;
- }
- else {
- throw new InvalidProgramException(
- "Cannot use ProcessingTime policy in a dataflow that runs on " + characteristic);
- }
- }
-
- // ------------------------------------------------------------------------
- // Factory
- // ------------------------------------------------------------------------
-
- /**
- * Creates a processing time policy describing a processing time interval.
- *
- * @param size The size of the generated windows.
- * @param unit The init (seconds, milliseconds) of the time interval.
- * @return The processing time policy.
- */
- public static ProcessingTime of(long size, TimeUnit unit) {
- return new ProcessingTime(size, unit);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
index e0e9202..c30fdf4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
@@ -18,32 +18,55 @@
package org.apache.flink.streaming.api.windowing.time;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
import java.util.concurrent.TimeUnit;
+import static com.google.common.base.Preconditions.checkNotNull;
+
/**
* The definition of a time interval for windowing. The time characteristic referred
* to is the default time characteristic set on the execution environment.
*/
-public final class Time extends AbstractTime {
+public final class Time {
+
+ /** The time unit for this policy's time interval */
+ private final TimeUnit unit;
+
+ /** The size of the windows generated by this policy */
+ private final long size;
/** Instantiation only via factory method */
private Time(long size, TimeUnit unit) {
- super(size, unit);
+ this.unit = checkNotNull(unit, "time unit may not be null");
+ this.size = size;
+
+ }
+
+ // ------------------------------------------------------------------------
+ // Properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the time unit for this policy's time interval.
+ * @return The time unit for this policy's time interval.
+ */
+ public TimeUnit getUnit() {
+ return unit;
}
- @Override
- public AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic)
{
- switch (timeCharacteristic) {
- case ProcessingTime:
- return ProcessingTime.of(getSize(), getUnit());
- case IngestionTime:
- case EventTime:
- return EventTime.of(getSize(), getUnit());
- default:
- throw new IllegalArgumentException("Unknown time characteristic");
- }
+ /**
+ * Gets the length of this policy's time interval.
+ * @return The length of this policy's time interval.
+ */
+ public long getSize() {
+ return size;
+ }
+
+ /**
+ * Converts the time interval to milliseconds.
+ * @return The time interval in milliseconds.
+ */
+ public long toMilliseconds() {
+ return unit.toMillis(size);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 4b6af8f..0454e85 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.triggers;
import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
/**
@@ -83,7 +83,7 @@ public class ContinuousEventTimeTrigger<W extends Window> implements
Trigger<Obj
* @param interval The time interval at which to fire.
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/
- public static <W extends Window> ContinuousEventTimeTrigger<W> of(AbstractTime
interval) {
+ public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval)
{
return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 66f9bda..3576394 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.triggers;
import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
/**
@@ -99,7 +99,7 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements
Trigge
* @param interval The time interval at which to fire.
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/
- public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(AbstractTime
interval) {
+ public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time
interval) {
return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index fcfbfe8..28edc2d 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.datastream.{AllWindowedStream =>
JavaAllWi
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.{AscendingTimestampExtractor, TimestampExtractor}
import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.AbstractTime
+import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
import org.apache.flink.streaming.util.serialization.SerializationSchema
import org.apache.flink.util.Collector
@@ -532,7 +532,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
* @param size The size of the window.
*/
- def timeWindowAll(size: AbstractTime): AllWindowedStream[T, TimeWindow] = {
+ def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow] = {
val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]]
windowAll(assigner)
}
@@ -551,7 +551,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
* @param size The size of the window.
*/
- def timeWindowAll(size: AbstractTime, slide: AbstractTime): AllWindowedStream[T, TimeWindow]
= {
+ def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow] = {
val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
windowAll(assigner)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf75f424/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 9f5c069..59c5693 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregato
import org.apache.flink.streaming.api.operators.StreamGroupedReduce
import org.apache.flink.streaming.api.scala.function.StatefulFunction
import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.AbstractTime
+import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
import org.apache.flink.util.Collector
@@ -58,7 +58,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
*
* @param size The size of the window.
*/
- def timeWindow(size: AbstractTime): WindowedStream[T, K, TimeWindow] = {
+ def timeWindow(size: Time): WindowedStream[T, K, TimeWindow] = {
val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]]
window(assigner)
}
@@ -92,7 +92,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
*
* @param size The size of the window.
*/
- def timeWindow(size: AbstractTime, slide: AbstractTime): WindowedStream[T, K, TimeWindow]
= {
+ def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow] = {
val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
window(assigner)
}
|