flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [25/34] incubator-flink git commit: [streaming] Make all predefined policies cloneable and introduce an activating wrapper for cloneable eviction policies
Date Fri, 05 Dec 2014 17:26:30 GMT
[streaming] Make all predefined policies cloneable and introduce an activating wrapper for
cloneable eviction policies


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/70079c1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/70079c1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/70079c1d

Branch: refs/heads/master
Commit: 70079c1d49fb81336c8846e4058301bd1b6b6b8e
Parents: cd7bfd3
Author: Jonas Traub (powibol) <jon@s-traub.com>
Authored: Thu Nov 20 14:45:04 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Fri Dec 5 16:45:09 2014 +0100

----------------------------------------------------------------------
 .../ActiveCloneableEvictionPolicyWrapper.java   | 62 ++++++++++++++++++++
 .../windowing/policy/CountEvictionPolicy.java   |  6 +-
 .../windowing/policy/CountTriggerPolicy.java    |  6 +-
 .../api/windowing/policy/DeltaPolicy.java       |  7 ++-
 .../api/windowing/policy/PunctuationPolicy.java | 10 +++-
 .../windowing/policy/TimeEvictionPolicy.java    |  8 ++-
 .../api/windowing/policy/TimeTriggerPolicy.java | 12 +++-
 .../policy/TumblingEvictionPolicy.java          |  7 ++-
 8 files changed, 108 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70079c1d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
new file mode 100644
index 0000000..29ba9eb
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+/**
+ * The {@link ActiveEvictionPolicy} wraps around a non active
+ * {@link EvictionPolicy}. It forwards all calls to
+ * {@link ActiveEvictionPolicy#notifyEvictionWithFakeElement(Object, int)} to
+ * {@link EvictionPolicy#notifyEviction(Object, boolean, int)} while the
+ * triggered parameter will be set to true.
+ * 
+ * This class additionally implements the clone method and can wrap around
+ * {@link CloneableEvictionPolicy} to make it active.
+ * 
+ * @param <DATA>
+ *            The data type handled by this policy
+ */
+public class ActiveCloneableEvictionPolicyWrapper<DATA> extends ActiveEvictionPolicyWrapper<DATA>
+		implements CloneableEvictionPolicy<DATA> {
+
+	/**
+	 * Auto generated version ID
+	 */
+	private static final long serialVersionUID = 1520261575300622769L;
+	CloneableEvictionPolicy<DATA> nestedPolicy;
+
+	/**
+	 * Creates a wrapper which activates the eviction policy which is wrapped
+	 * in. This means that the nested policy will get called on fake elements as
+	 * well as on real elements.
+	 * 
+	 * This specialized version of the {@link ActiveEvictionPolicyWrapper} works
+	 * with {@link CloneableEvictionPolicy} and is thereby cloneable as well.
+	 * 
+	 * @param nestedPolicy
+	 *            The policy which should be activated/wrapped in.
+	 */
+	public ActiveCloneableEvictionPolicyWrapper(CloneableEvictionPolicy<DATA> nestedPolicy)
{
+		super(nestedPolicy);
+		this.nestedPolicy = nestedPolicy;
+	}
+
+	@Override
+	public ActiveCloneableEvictionPolicyWrapper<DATA> clone() {
+		return new ActiveCloneableEvictionPolicyWrapper<DATA>(nestedPolicy.clone());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70079c1d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
index 1d8175c..a9e7b3f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
@@ -27,7 +27,7 @@ package org.apache.flink.streaming.api.windowing.policy;
  * @param <IN>
  *            the type of the incoming data points
  */
-public class CountEvictionPolicy<IN> implements EvictionPolicy<IN> {
+public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
 
 	/**
 	 * Auto generated version id
@@ -112,4 +112,8 @@ public class CountEvictionPolicy<IN> implements EvictionPolicy<IN>
{
 		}
 	}
 
+	@Override
+	public CountEvictionPolicy<IN> clone() {
+		return new CountEvictionPolicy<IN>(maxElements, deleteOnEviction, counter);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70079c1d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
index ca0058e..d5ae932 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
@@ -23,7 +23,7 @@ package org.apache.flink.streaming.api.windowing.policy;
  * @param <IN>
  *            The type of the data points which are handled by this policy
  */
-public class CountTriggerPolicy<IN> implements TriggerPolicy<IN> {
+public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
 
 	/**
 	 * Auto generated version ID
@@ -80,4 +80,8 @@ public class CountTriggerPolicy<IN> implements TriggerPolicy<IN>
{
 		}
 	}
 
+	@Override
+	public CountTriggerPolicy<IN> clone() {
+		return new CountTriggerPolicy<IN>(max, counter);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70079c1d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
index 418f989..77bc692 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
@@ -39,7 +39,8 @@ import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
  * @param <DATA>
  *            The type of the data points which are handled by this policy
  */
-public class DeltaPolicy<DATA> implements TriggerPolicy<DATA>, EvictionPolicy<DATA>
{
+public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
+		CloneableEvictionPolicy<DATA> {
 
 	/**
 	 * Auto generated version ID
@@ -104,4 +105,8 @@ public class DeltaPolicy<DATA> implements TriggerPolicy<DATA>,
EvictionPolicy<DA
 		return evictCount;
 	}
 
+	@Override
+	public DeltaPolicy<DATA> clone() {
+		return new DeltaPolicy<DATA>(deltaFuntion, triggerDataPoint, threshold);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70079c1d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
index 0ec9a57..ee8a3cb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
@@ -28,7 +28,8 @@ import org.apache.flink.streaming.api.windowing.extractor.Extractor;
  * deleted in case the punctuation is detected.
  * 
  * By default this policy does not react on fake elements. Wrap it in an
- * {@link ActiveEvictionPolicyWrapper} to make it react on punctuation even in fake elements.
+ * {@link ActiveEvictionPolicyWrapper} to make it react on punctuation even in
+ * fake elements.
  * 
  * @param <IN>
  *            The type of the input data handled by this policy. An
@@ -37,7 +38,8 @@ import org.apache.flink.streaming.api.windowing.extractor.Extractor;
  *            The type of the punctuation. An {@link Extractor} can be used to
  *            extract DATA for IN.
  */
-public class PunctuationPolicy<IN, DATA> implements TriggerPolicy<IN>, EvictionPolicy<IN>
{
+public class PunctuationPolicy<IN, DATA> implements CloneableTriggerPolicy<IN>,
+		CloneableEvictionPolicy<IN> {
 
 	/**
 	 * auto generated version id
@@ -107,4 +109,8 @@ public class PunctuationPolicy<IN, DATA> implements TriggerPolicy<IN>,
EvictionP
 		}
 	}
 
+	@Override
+	public PunctuationPolicy<IN, DATA> clone() {
+		return new PunctuationPolicy<IN, DATA>(punctuation, extractor);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70079c1d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index a78cc4e..d9acc84 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -31,7 +31,8 @@ import org.apache.flink.streaming.api.invokable.util.TimeStamp;
  *            The type of the incoming data points which are processed by this
  *            policy.
  */
-public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA> {
+public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
+		CloneableEvictionPolicy<DATA> {
 
 	/**
 	 * auto generated version id
@@ -106,4 +107,9 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>
{
 		return counter;
 	}
 
+	@Override
+	public TimeEvictionPolicy<DATA> clone() {
+		return new TimeEvictionPolicy<DATA>(granularity, timestamp);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70079c1d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index d50e285..ff93ac9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -33,7 +33,8 @@ import org.apache.flink.streaming.api.windowing.extractor.Extractor;
  *            The type of the incoming data points which are processed by this
  *            policy.
  */
-public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA> {
+public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
+		CloneableTriggerPolicy<DATA> {
 
 	/**
 	 * auto generated version id
@@ -115,8 +116,8 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>
{
 	}
 
 	/**
-	 * This method checks if we missed a window end. If this is the
-	 * case we trigger the missed windows using fake elements.
+	 * This method checks if we missed a window end. If this is the case we
+	 * trigger the missed windows using fake elements.
 	 */
 	@SuppressWarnings("unchecked")
 	@Override
@@ -190,4 +191,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>
{
 		}
 	}
 
+	@Override
+	public TimeTriggerPolicy<DATA> clone() {
+		return new TimeTriggerPolicy<DATA>(granularity, timestamp, 0, longToDATAExtractor);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/70079c1d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
index dc7f5b0..9a2e3c9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
@@ -29,7 +29,7 @@ package org.apache.flink.streaming.api.windowing.policy;
  * @param <DATA>
  *            The type of the data points which is handled by this policy
  */
-public class TumblingEvictionPolicy<DATA> implements EvictionPolicy<DATA> {
+public class TumblingEvictionPolicy<DATA> implements CloneableEvictionPolicy<DATA>
{
 
 	/**
 	 * Auto generated version ID
@@ -82,4 +82,9 @@ public class TumblingEvictionPolicy<DATA> implements EvictionPolicy<DATA>
{
 			return 0;
 		}
 	}
+
+	@Override
+	public TumblingEvictionPolicy<DATA> clone() {
+		return new TumblingEvictionPolicy<DATA>(counter);
+	}
 }


Mime
View raw message