flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [05/34] incubator-flink git commit: [streaming] Introduced a new invokable to allow flexible windowing based on trigger and eviction policies. Additionally created all required policy interfaces.
Date Fri, 05 Dec 2014 17:26:10 GMT
[streaming] Introduced a new invokable to allow flexible windowing based on trigger and eviction
policies. Additionally created all required policy interfaces.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/36f1ee88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/36f1ee88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/36f1ee88

Branch: refs/heads/master
Commit: 36f1ee88cbadc21b185782026e6a6fc77d41b42a
Parents: a677c77
Author: Jonas Traub (powibol) <jon@s-traub.com>
Authored: Mon Oct 27 13:08:23 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Fri Dec 5 16:45:08 2014 +0100

----------------------------------------------------------------------
 .../invokable/operator/WindowingInvokable.java  | 284 +++++++++++++++++++
 .../windowing/policy/ActiveEvictionPolicy.java  |  52 ++++
 .../windowing/policy/ActiveTriggerCallback.java |  48 ++++
 .../windowing/policy/ActiveTriggerPolicy.java   |  81 ++++++
 .../api/windowing/policy/EvictionPolicy.java    |  47 +++
 .../api/windowing/policy/TriggerPolicy.java     |  54 ++++
 6 files changed, 566 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/36f1ee88/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
new file mode 100644
index 0000000..52b5edf
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+
+public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, String[]>> // windowing operator driven by trigger and eviction policies
{
+
+	/**
+	 * Auto-generated serial version UID
+	 */
+	private static final long serialVersionUID = -8038984294071650730L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(WindowingInvokable.class);
+
+	private LinkedList<TriggerPolicy<IN>> triggerPolicies;
+	private LinkedList<EvictionPolicy<IN>> evictionPolicies;
+	private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies; // the entries of triggerPolicies that are active triggers
+	private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies; // the entries of evictionPolicies that are active evictors
+	private LinkedList<Thread> activePolicyTreads = new LinkedList<Thread>(); // threads started in open(), interrupted at the end of immutableInvoke()
+	private LinkedList<IN> buffer = new LinkedList<IN>(); // current window content; only real elements are added
+	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>(); // policies reported as the cause of the next emission
+	private ReduceFunction<IN> reducer;
+
+	public WindowingInvokable(ReduceFunction<IN> userFunction,
+			LinkedList<TriggerPolicy<IN>> triggerPolicies,
+			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
+		super(userFunction);
+
+		this.reducer = userFunction;
+		this.triggerPolicies = triggerPolicies;
+		this.evictionPolicies = evictionPolicies;
+
+		activeTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
+		for (TriggerPolicy<IN> tp : triggerPolicies) {
+			if (tp instanceof ActiveTriggerPolicy) {
+				activeTriggerPolicies.add((ActiveTriggerPolicy<IN>) tp);
+			}
+		}
+
+		activeEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
+		for (EvictionPolicy<IN> ep : evictionPolicies) {
+			if (ep instanceof ActiveEvictionPolicy) {
+				activeEvictionPolicies.add((ActiveEvictionPolicy<IN>) ep);
+			}
+		}
+	}
+
+	@Override
+	public void open(org.apache.flink.configuration.Configuration parameters) throws Exception
{
+		super.open(parameters);
+		for (ActiveTriggerPolicy<IN> tp : activeTriggerPolicies) {
+			Runnable target=tp.createActiveTriggerRunnable(new WindowingCallback(tp));
+			if (target!=null){ // a policy may return null when it does not need its own thread
+				Thread thread = new Thread(target);
+				activePolicyTreads.add(thread);
+				thread.start();
+			}
+		}
+	};
+
+	private class WindowingCallback implements ActiveTriggerCallback<IN> { // forwards fake elements of one policy to processFakeElement
+		private ActiveTriggerPolicy<IN> policy;
+
+		public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
+			this.policy = policy;
+		}
+
+		@Override
+		public void sendFakeElement(IN datapoint) {
+			processFakeElement(datapoint, this.policy);
+		}
+
+	}
+
+	@Override
+	protected void immutableInvoke() throws Exception {
+		// Fail fast on an empty stream: the first read must return a record
+		if ((reuse = recordIterator.next(reuse)) == null) {
+			throw new RuntimeException("DataStream must not be empty");
+		}
+
+		// Process records until the iterator returns null
+		while (reuse != null) {
+			processRealElement(reuse.getObject());
+
+			// Recreate the reuse-StreamRecord object and load next StreamRecord
+			resetReuse();
+			reuse = recordIterator.next(reuse);
+		}
+
+		// Interrupt the threads that were started in open() for active trigger policies
+		for (Thread t : activePolicyTreads) {
+			t.interrupt();
+		}
+
+		// Final emission: remaining elements are emitted with every trigger policy listed as the cause
+		if (!buffer.isEmpty()) { // NOTE(review): not synchronized, unlike processFakeElement - confirm no policy thread can still call back here
+			currentTriggerPolicies.clear();
+			for (TriggerPolicy<IN> policy : triggerPolicies) {
+				currentTriggerPolicies.add(policy);
+			}
+			callUserFunctionAndLogException();
+		}
+
+	}
+
+	@Override
+	protected void mutableInvoke() throws Exception {
+		if (LOG.isInfoEnabled()) {
+			LOG.info("There is currently no mutable implementation of this operator. Immutable version
is used.");
+		}
+		immutableInvoke();
+	}
+
+	/**
+	 * Processes a fake element. The method is synchronized so that a call
+	 * coming from a policy thread cannot interleave with
+	 * {@link WindowingInvokable#processRealElement(Object)}.
+	 *
+	 * The fake element itself is never added to the buffer. The method
+	 * performs the following steps, in this order:
+	 *
+	 * 1) Every active eviction policy is notified through
+	 * notifyEvictionWithFakeElement; the highest returned count is the number
+	 * of elements removed from the head of the buffer.
+	 *
+	 * 2) The given policy is recorded as the current trigger and the user
+	 * function is called on the remaining buffer.
+	 *
+	 * 3) The list of current triggers is cleared again.
+	 *
+	 * @param input
+	 *            a fake input element
+	 * @param currentPolicy
+	 *            the policy which produced this fake element
+	 */
+	private synchronized void processFakeElement(IN input, TriggerPolicy<IN> currentPolicy)
{
+		// Ask every active eviction policy how many elements to evict.
+		// If several policies are present, only the highest returned count
+		// is applied, so no element is evicted twice.
+		int currentMaxEviction = 0;
+		for (ActiveEvictionPolicy<IN> evictionPolicy : activeEvictionPolicies) {
+			// Store the result in a temporary variable so that the policy is
+			// notified exactly once per fake element
+			int tmp = evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size());
+			if (tmp > currentMaxEviction) {
+				currentMaxEviction = tmp;
+			}
+		}
+
+		for (int i = 0; i < currentMaxEviction; i++) {
+			try {
+				buffer.removeFirst();
+			} catch (NoSuchElementException e) {
+				// The buffer is already empty although more evictions were
+				// requested: prevent a failure and stop deleting.
+				break;
+			}
+		}
+
+		// Record the policy which produced the fake element as the trigger
+		currentTriggerPolicies.add(currentPolicy);
+
+		// Emit the reduced buffer
+		callUserFunctionAndLogException();
+
+		// Reset the list of current triggers for the next emission
+		currentTriggerPolicies.clear();
+	}
+
+	/**
+	 * Processes a real element. The method is synchronized so that it cannot
+	 * interleave with a call to
+	 * {@link WindowingInvokable#processFakeElement(Object, TriggerPolicy)}
+	 * coming from a policy thread.
+	 *
+	 * The method performs the following steps, in this order:
+	 *
+	 * 1) preNotifyTrigger is called on every active trigger policy; each fake
+	 * element returned is handed to processFakeElement.
+	 *
+	 * 2) notifyTrigger is called on every trigger policy. If at least one
+	 * policy returns true, the user function is called on the buffer, which
+	 * does not yet contain the given element.
+	 *
+	 * 3) notifyEviction is called on every eviction policy; the highest
+	 * returned count is the number of elements removed from the head of the
+	 * buffer.
+	 *
+	 * 4) The given element is appended to the buffer.
+	 *
+	 * @param input
+	 *            a real input element
+	 */
+	private synchronized void processRealElement(IN input) {
+		// Run the pre-notifications so that active triggers can report missed windows
+		for (ActiveTriggerPolicy<IN> trigger : activeTriggerPolicies) {
+			// Remark: In case multiple active triggers are present, the ordering
+			// of the different fake elements returned by these triggers becomes
+			// a problem. This might lead to unexpected results...
+			// Should we limit the number of active triggers to 0 or 1?
+			IN[] result = trigger.preNotifyTrigger(input); // NOTE(review): a null return would fail in the loop below
+			for (IN in : result) {
+				processFakeElement(in, trigger);
+			}
+		}
+
+		// Remember whether a trigger occurred; passed on to the eviction policies
+		boolean isTriggered = false;
+
+		// Notify all trigger policies and collect those which fire
+		for (TriggerPolicy<IN> triggerPolicy : triggerPolicies) {
+			if (triggerPolicy.notifyTrigger(input)) {
+				currentTriggerPolicies.add(triggerPolicy);
+			}
+		}
+
+		// Call the user function if at least one policy fired
+		if (!currentTriggerPolicies.isEmpty()) {
+			// Emit the reduced buffer
+			callUserFunctionAndLogException();
+
+			// Reset the list of current triggers for the next emission
+			currentTriggerPolicies.clear();
+
+			// Remember the trigger
+			isTriggered = true;
+		}
+
+		// Ask every eviction policy how many elements to evict.
+		// If several policies are present, only the highest returned count
+		// is applied, so no element is evicted twice.
+		int currentMaxEviction = 0;
+
+		for (EvictionPolicy<IN> evictionPolicy : evictionPolicies) {
+			// Store the result in a temporary variable so that the policy is
+			// notified exactly once per element
+			int tmp = evictionPolicy.notifyEviction(input, isTriggered, buffer.size());
+			if (tmp > currentMaxEviction) {
+				currentMaxEviction = tmp;
+			}
+		}
+
+		for (int i = 0; i < currentMaxEviction; i++) {
+			try {
+				buffer.removeFirst();
+			} catch (NoSuchElementException e) {
+				// The buffer is already empty although more evictions were
+				// requested: prevent a failure and stop deleting.
+				break;
+			}
+		}
+
+		// Add the current element to the buffer (after trigger and eviction)
+		buffer.add(input);
+
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception { // reduces the buffer and emits the result with the names of the current triggers
+		Iterator<IN> reducedIterator = buffer.iterator();
+		IN reduced = null;
+
+		while (reducedIterator.hasNext() && reduced == null) { // skip leading null entries to find the first value
+			reduced = reducedIterator.next();
+		}
+
+		while (reducedIterator.hasNext()) {
+			IN next = reducedIterator.next();
+			if (next != null) { // null entries are not passed to the reducer
+				reduced = reducer.reduce(reduced, next);
+			}
+		}
+		if (reduced != null) { // nothing is emitted when the buffer holds no non-null element
+			String[] tmp = new String[currentTriggerPolicies.size()];
+			for (int i = 0; i < tmp.length; i++) {
+				tmp[i] = currentTriggerPolicies.get(i).toString();
+			}
+			collector.collect(new Tuple2<IN, String[]>(reduced, tmp));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/36f1ee88/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
new file mode 100644
index 0000000..17d4914
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+/**
+ * This interface is used for active eviction policies. Besides the
+ * functionality inherited from {@link EvictionPolicy}, it provides a method
+ * which gets called to notify the policy about fake elements.
+ *
+ * In case an eviction policy implements this interface instead of the
+ * {@link EvictionPolicy} interface, not only the real but also the fake data
+ * points will cause a notification of the eviction policy.
+ *
+ * Fake data points are mostly used in time-based windowing to trigger and
+ * evict even if no element arrives at all during a window's duration.
+ */
+public interface ActiveEvictionPolicy<DATA> extends EvictionPolicy<DATA> {
+
+	/**
+	 * Checks whether, and how many, elements should be deleted from the
+	 * element buffer. The eviction takes place after the trigger and after the
+	 * call to the UDF. This method is only called with fake elements.
+	 *
+	 * NOTE(review): {@code WindowingInvokable#processFakeElement} removes the
+	 * evicted elements before it calls the user function, which contradicts
+	 * the order stated above - confirm which order is intended.
+	 *
+	 * Note: Fake elements are always considered as triggered. Therefore this
+	 * method does not have a triggered parameter.
+	 *
+	 * @param datapoint
+	 *            the current fake data point
+	 * @param bufferSize
+	 *            the current size of the buffer (only real elements are
+	 *            counted)
+	 * @return the number of elements to delete from the buffer (only real
+	 *         elements are counted)
+	 */
+	public int notifyEvictionWithFakeElement(DATA datapoint, int bufferSize);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/36f1ee88/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
new file mode 100644
index 0000000..d74a3ae
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+/**
+ * In case an {@link ActiveTriggerPolicy} is used, it can implement its own
+ * {@link Runnable} classes. Such {@link Runnable} classes will be executed in
+ * their own thread and can submit fake elements to the element buffer at any
+ * time.
+ *
+ * The factory method for runnables of the {@link ActiveTriggerPolicy} gets an
+ * instance of this interface as parameter. The described adding of elements
+ * can be done by the runnable using the methods provided in this interface.
+ *
+ * @param <DATA>
+ *            The data type which can be consumed by the methods provided in
+ *            this callback implementation.
+ */
+public interface ActiveTriggerCallback<DATA> {
+
+	/**
+	 * Submits a new fake data point to the element buffer. Such a fake element
+	 * might be used to trigger at any time, but will never be included in the
+	 * result of the reduce function. The submission of a fake element causes
+	 * notifications only at the {@link ActiveTriggerPolicy} and
+	 * {@link ActiveEvictionPolicy} implementations.
+	 *
+	 * NOTE(review): {@code WindowingInvokable#processFakeElement} only notifies
+	 * the {@link ActiveEvictionPolicy} instances; the submitting trigger policy
+	 * is recorded as the trigger without being notified - confirm the wording
+	 * above.
+	 *
+	 * @param datapoint
+	 *            the fake data point to be added
+	 */
+	public void sendFakeElement(DATA datapoint);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/36f1ee88/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
new file mode 100644
index 0000000..f549766
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+
+/**
+ * This interface extends the {@link TriggerPolicy} interface with functionality
+ * for active triggers. Active triggers can act in two ways:
+ *
+ * 1) Whenever an element arrives at the invokable, the
+ * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called first.
+ * It can return zero or more fake data points which will be processed before
+ * the currently arrived real element gets processed. This allows handling
+ * empty windows in time-based windowing with a user-defined {@link TimeStamp}.
+ * Triggers are not called on fake data points. A fake data point is always
+ * considered as triggered.
+ *
+ * 2) An active trigger has a factory method for a runnable. This factory method
+ * gets called at the start-up of the invokable. The returned runnable will be
+ * executed in its own thread and can submit fake elements at any time through
+ * an {@link ActiveTriggerCallback}. This allows having time-based triggers
+ * based on any system-internal time measure. Triggers are not called on fake
+ * data points. A fake data point is always considered as triggered.
+ *
+ * @param <DATA>
+ *            The data type which can be handled by this policy
+ */
+public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
+
+	/**
+	 * Whenever an element arrives at the invokable, the
+	 * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
+	 * first. It can return zero or more fake data points which will be
+	 * processed before the currently arrived real element gets processed. This
+	 * allows handling empty windows in time-based windowing with a
+	 * user-defined {@link TimeStamp}. Triggers are not called on fake data
+	 * points. A fake data point is always considered as triggered.
+	 *
+	 * @param datapoint
+	 *            the data point which arrived at the invokable
+	 * @return zero or more fake data points which will be processed before the
+	 *         currently arrived real element gets processed.
+	 */
+	public DATA[] preNotifyTrigger(DATA datapoint);
+
+	/**
+	 * This is the factory method for a runnable. This factory method gets
+	 * called at the start-up of the invokable. The returned runnable will be
+	 * executed in its own thread and can submit fake elements at any time
+	 * through an {@link ActiveTriggerCallback}. This allows having time-based
+	 * triggers based on any system-internal time measure. Triggers are not
+	 * called on fake data points. A fake data point is always considered as
+	 * triggered.
+	 *
+	 * @param callback
+	 *            A callback object which allows adding fake elements from
+	 *            within the returned {@link Runnable}.
+	 * @return The runnable implementation, or null if there is none. In case
+	 *         an {@link ActiveTriggerPolicy} is used, it can implement its own
+	 *         {@link Runnable} classes. Such {@link Runnable} classes will be
+	 *         executed in their own thread and can submit fake elements to the
+	 *         element buffer at any time.
+	 */
+	public Runnable createActiveTriggerRunnable(ActiveTriggerCallback<DATA> callback);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/36f1ee88/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
new file mode 100644
index 0000000..c224ad4
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import java.io.Serializable;
+
+/**
+ * An eviction policy specifies under which condition data points should be
+ * deleted from the buffer. Deletions must be done only in the order in which
+ * the elements arrived. Therefore, the policy only returns the number of
+ * elements to evict on each element arrival.
+ *
+ * @param <DATA>
+ *            the type of the data handled by this policy
+ */
+public interface EvictionPolicy<DATA> extends Serializable {
+
+	/**
+	 * Checks whether, and how many, elements should be deleted from the
+	 * element buffer. The eviction takes place after the trigger and after the
+	 * call to the UDF, but before the new data point is added to the buffer.
+	 *
+	 * @param datapoint
+	 *            the data point which arrived
+	 * @param triggered
+	 *            Information whether the UDF was triggered or not
+	 * @param bufferSize
+	 *            The current size of the element buffer at the invokable
+	 * @return The number of elements to be deleted from the buffer
+	 */
+	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/36f1ee88/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TriggerPolicy.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TriggerPolicy.java
new file mode 100644
index 0000000..c212df6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TriggerPolicy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import java.io.Serializable;
+
+/**
+ * Checks whether a new window should be started. In case the trigger occurs
+ * (return value true), the UDF will be executed on the current element buffer
+ * without the newly arrived element, which is provided as parameter. This
+ * element will be added to the buffer after the execution of the UDF.
+ *
+ * @param <DATA>
+ *            The data type which can be handled by this policy
+ */
+public interface TriggerPolicy<DATA> extends Serializable {
+
+	/**
+	 * Checks whether a new window should be started. In case the trigger
+	 * occurs (return value true), the UDF will be executed on the current
+	 * element buffer without the newly arrived element, which is provided as
+	 * parameter. This element will be added to the buffer after the execution
+	 * of the UDF.
+	 *
+	 * There are different possible strategies for eviction and triggering:
+	 * 1) including the last data point: better/faster for count eviction;
+	 * 2) excluding the last data point: essentially required for time-based
+	 * eviction and delta rules. As 2) is required for some policies and the
+	 * benefit of using 1) is small for the others, policies are implemented
+	 * according to 2).
+	 *
+	 * @param datapoint
+	 *            the data point which arrived
+	 * @return true if the current window should be closed, otherwise false. In
+	 *         the true case, the given data point will be part of the next
+	 *         window and will not be included in the current one.
+	 */
+	public boolean notifyTrigger(DATA datapoint);
+
+}


Mime
View raw message