flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [5/6] flink git commit: [streaming] Added cloneable versions of MultiEvictionPolicy and MultiTriggerPolicy
Date Thu, 19 Feb 2015 16:41:26 GMT
[streaming] Added cloneable versions of MultiEvictionPolicy and MultiTriggerPolicy


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/962db92a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/962db92a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/962db92a

Branch: refs/heads/master
Commit: 962db92afee9eb8e5efce46b5c3f9f57e017cb1d
Parents: 09490c4
Author: Jonas Traub (powibol) <jon@s-traub.com>
Authored: Thu Feb 19 14:54:15 2015 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Thu Feb 19 17:03:00 2015 +0100

----------------------------------------------------------------------
 .../policy/CloneableMultiEvictionPolicy.java    | 84 ++++++++++++++++++++
 .../policy/CloneableMultiTriggerPolicy.java     | 63 +++++++++++++++
 2 files changed, 147 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/962db92a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
new file mode 100644
index 0000000..d86b174
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import java.util.LinkedList;
+
+/**
+ * This policy does the same as {@link MultiEvictionPolicy}. Additionally it is
+ * cloneable and only cloneable policies can be passed to the constructor.
+ * 
+ * @param <DATA>
+ *            The type of data handled by this policy
+ */
+public class CloneableMultiEvictionPolicy<DATA> extends MultiEvictionPolicy<DATA>
implements
+		CloneableEvictionPolicy<DATA> {
+
+	/**
+	 * Default version id.
+	 */
+	private static final long serialVersionUID = 1L;
+
+	private CloneableEvictionPolicy<DATA>[] allPolicies;
+	private EvictionStrategy strategy;
+
+	/**
+	 * This policy does the same as {@link MultiTriggerPolicy}. Additionally it
+	 * is cloneable and only cloneable policies can be passed to the
+	 * constructor.
+	 * 
+	 * When using this constructor the MAX strategy is used by default. You can
+	 * select other strategies using
+	 * {@link CloneableMultiEvictionPolicy#CloneableMultiEvictionPolicy(org.apache.flink.streaming.api.windowing.policy.MultiEvictionPolicy.EvictionStrategy,
CloneableEvictionPolicy...)}
+	 * .
+	 * 
+	 * @param evictionPolicies
+	 *            some cloneable policies to be tied together.
+	 */
+	public CloneableMultiEvictionPolicy(CloneableEvictionPolicy<DATA>... evictionPolicies)
{
+		this(EvictionStrategy.MAX, evictionPolicies);
+	}
+
+	/**
+	 * This policy does the same as {@link MultiTriggerPolicy}. Additionally it
+	 * is cloneable and only cloneable policies can be passed to the
+	 * constructor.
+	 * 
+	 * @param strategy
+	 *            the strategy to be used. See {@link EvictionStrategy} for a
+	 *            list of possible options.
+	 * @param evictionPolicies
+	 *            some cloneable policies to be tied together.
+	 */
+	public CloneableMultiEvictionPolicy(EvictionStrategy strategy,
+			CloneableEvictionPolicy<DATA>... evictionPolicies) {
+		super(strategy, evictionPolicies);
+		this.allPolicies = evictionPolicies;
+		this.strategy = strategy;
+	}
+
+	@SuppressWarnings("unchecked")
+	public CloneableEvictionPolicy<DATA> clone() {
+		LinkedList<CloneableEvictionPolicy<DATA>> clonedPolicies = new LinkedList<CloneableEvictionPolicy<DATA>>();
+		for (int i = 0; i < allPolicies.length; i++) {
+			clonedPolicies.add(allPolicies[i].clone());
+		}
+		return new CloneableMultiEvictionPolicy<DATA>(strategy,
+				clonedPolicies.toArray(new CloneableEvictionPolicy[allPolicies.length]));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/962db92a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiTriggerPolicy.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiTriggerPolicy.java
new file mode 100644
index 0000000..aaecefb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiTriggerPolicy.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import java.util.LinkedList;
+
+/**
+ * This policy does the same as {@link MultiTriggerPolicy}. Additionally it is
+ * cloneable and only cloneable policies can be passed to the constructor.
+ * 
+ * @param <DATA>
+ *            The type of data handled by this policy
+ */
+public class CloneableMultiTriggerPolicy<DATA> extends MultiTriggerPolicy<DATA>
implements
+		CloneableTriggerPolicy<DATA>, Cloneable {
+
+	/**
+	 * Default version id.
+	 */
+	private static final long serialVersionUID = 1L;
+
+	private CloneableTriggerPolicy<DATA>[] allPolicies;
+
+	/**
+	 * This policy does the same as {@link MultiTriggerPolicy}. Additionally it
+	 * is cloneable and only cloneable policies can be passed to the
+	 * constructor.
+	 * 
+	 * @param policies
+	 *            some cloneable policies to be tied together.
+	 */
+	public CloneableMultiTriggerPolicy(CloneableTriggerPolicy<DATA>... policies) {
+		super(policies);
+		this.allPolicies = policies;
+	}
+
+	@SuppressWarnings("unchecked")
+	public CloneableTriggerPolicy<DATA> clone() {
+		LinkedList<CloneableTriggerPolicy<DATA>> clonedPolicies = new LinkedList<CloneableTriggerPolicy<DATA>>();
+		for (int i = 0; i < allPolicies.length; i++) {
+			clonedPolicies.add(allPolicies[i].clone());
+		}
+		return new CloneableMultiTriggerPolicy<DATA>(
+				clonedPolicies.toArray(new CloneableTriggerPolicy[allPolicies.length]));
+
+	}
+
+}


Mime
View raw message