flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [02/34] incubator-flink git commit: [streaming] Created tumbling eviction policy. This policy evicts always all tuples after a trigger occurred. It can be used to prevent doubled computation between trigger and eviction policies whenever tumbling windows
Date Fri, 05 Dec 2014 17:26:07 GMT
[streaming] Created tumbling eviction policy. This policy evicts always all tuples after a
trigger occurred. It can be used to prevent doubled computation between trigger and eviction
policies whenever tumbling windows are used. (includes test case)


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fb4a8396
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fb4a8396
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fb4a8396

Branch: refs/heads/master
Commit: fb4a8396b113ee2e6b2543614f6210c44e0587d9
Parents: a6cc8af
Author: Jonas Traub (powibol) <jon@s-traub.com>
Authored: Mon Oct 27 13:49:16 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Fri Dec 5 16:45:08 2014 +0100

----------------------------------------------------------------------
 .../policy/TumblingEvictionPolicy.java          | 85 ++++++++++++++++++++
 .../policy/TumblingEvictionPolicyTest.java      | 41 ++++++++++
 2 files changed, 126 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb4a8396/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
new file mode 100644
index 0000000..dc7f5b0
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+/**
+ * This eviction policy deletes all elements from the buffer in case a trigger
+ * occurred. Therefore, it is the default eviction policy to be used for any
+ * tumbling window.
+ * 
+ * By default this policy does not react on fake elements. Wrap it in an
+ * {@link ActiveEvictionPolicyWrapper} to make it clearing the buffer even on
+ * fake elements.
+ * 
+ * @param <DATA>
+ *            The type of the data points which is handled by this policy
+ */
+public class TumblingEvictionPolicy<DATA> implements EvictionPolicy<DATA> {
+
+	/**
+	 * Auto generated version ID
+	 */
+	private static final long serialVersionUID = -4018019069267281155L;
+
+	/**
+	 * Counter for the current number of elements in the buffer
+	 */
+	private int counter = 0;
+
+	/**
+	 * This is the default constructor providing no special functionality. This
+	 * eviction policy deletes all elements from the buffer in case a trigger
+	 * occurred. Therefore, it is the default eviction policy to be used for any
+	 * tumbling window.
+	 */
+	public TumblingEvictionPolicy() {
+		// default constructor, no further logic needed
+	}
+
+	/**
+	 * This constructor allows to set a custom start value for the element
+	 * counter.
+	 * 
+	 * This eviction policy deletes all elements from the buffer in case a
+	 * trigger occurred. Therefore, it is the default eviction policy to be used
+	 * for any tumbling window.
+	 * 
+	 * @param startValue
+	 *            A start value for the element counter
+	 */
+	public TumblingEvictionPolicy(int startValue) {
+		this.counter = startValue;
+	}
+
+	/**
+	 * Deletes all elements from the buffer in case the trigger occurred.
+	 */
+	@Override
+	public int notifyEviction(Object datapoint, boolean triggered, int bufferSize) {
+		if (triggered) {
+			// The current data point will be part of the next window!
+			// Therefore the counter needs to be set to one already.
+			int tmpCounter = counter;
+			counter = 1;
+			return tmpCounter;
+		} else {
+			counter++;
+			return 0;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb4a8396/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
new file mode 100644
index 0000000..8cb03a7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicyTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class TumblingEvictionPolicyTest {
+
+	@Test
+	public void testTumblingEviction() {
+		EvictionPolicy<Integer> policy = new TumblingEvictionPolicy<Integer>();
+
+		int counter = 0;
+
+		for (int i = 0; i < 10; i++) {
+			for (int j = 0; j < i; j++) {
+				assertEquals(0, policy.notifyEviction(0, false, counter++));
+			}
+			assertEquals(counter, policy.notifyEviction(0, true, counter));
+			counter = 1;
+		}
+	}
+
+}


Mime
View raw message