flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [17/34] incubator-flink git commit: [streaming] Windowing API update + package refactor
Date Fri, 05 Dec 2014 17:26:22 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
new file mode 100644
index 0000000..953759c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
{
+
+	/**
+	 * Auto-generated serial version UID
+	 */
+	private static final long serialVersionUID = -8038984294071650730L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(WindowInvokable.class);
+
+	private LinkedList<TriggerPolicy<IN>> triggerPolicies;
+	private LinkedList<EvictionPolicy<IN>> evictionPolicies;
+	private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
+	private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies;
+	private LinkedList<Thread> activePolicyTreads = new LinkedList<Thread>();
+	protected LinkedList<IN> buffer = new LinkedList<IN>();
+	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+
+	/**
+	 * This constructor creates a windowing invokable using trigger and eviction
+	 * policies.
+	 * 
+	 * @param userFunction
+	 *            The user defined {@link ReduceFunction}
+	 * @param triggerPolicies
+	 *            A list of {@link TriggerPolicy}s and/or
+	 *            {@link ActiveTriggerPolicy}s
+	 * @param evictionPolicies
+	 *            A list of {@link EvictionPolicy}s and/or
+	 *            {@link ActiveEvictionPolicy}s
+	 */
+	public WindowInvokable(Function userFunction, LinkedList<TriggerPolicy<IN>>
triggerPolicies,
+			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
+		super(userFunction);
+
+		this.triggerPolicies = triggerPolicies;
+		this.evictionPolicies = evictionPolicies;
+
+		activeTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
+		for (TriggerPolicy<IN> tp : triggerPolicies) {
+			if (tp instanceof ActiveTriggerPolicy) {
+				activeTriggerPolicies.add((ActiveTriggerPolicy<IN>) tp);
+			}
+		}
+
+		activeEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
+		for (EvictionPolicy<IN> ep : evictionPolicies) {
+			if (ep instanceof ActiveEvictionPolicy) {
+				activeEvictionPolicies.add((ActiveEvictionPolicy<IN>) ep);
+			}
+		}
+	}
+
+	@Override
+	public void open(org.apache.flink.configuration.Configuration parameters) throws Exception
{
+		super.open(parameters);
+		for (ActiveTriggerPolicy<IN> tp : activeTriggerPolicies) {
+			Runnable target = tp.createActiveTriggerRunnable(new WindowingCallback(tp));
+			if (target != null) {
+				Thread thread = new Thread(target);
+				activePolicyTreads.add(thread);
+				thread.start();
+			}
+		}
+	};
+
+	/**
+	 * This class allows the active trigger threads to call back and push fake
+	 * elements at any time.
+	 */
+	private class WindowingCallback implements ActiveTriggerCallback {
+		private ActiveTriggerPolicy<IN> policy;
+
+		public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
+			this.policy = policy;
+		}
+
+		@Override
+		public void sendFakeElement(Object datapoint) {
+			processFakeElement(datapoint, this.policy);
+		}
+
+	}
+
+	@Override
+	protected void immutableInvoke() throws Exception {
+
+		// Prevent empty data streams
+		if ((reuse = recordIterator.next(reuse)) == null) {
+			throw new RuntimeException("DataStream must not be empty");
+		}
+
+		// Continuously run
+		while (reuse != null) {
+			processRealElement(reuse.getObject());
+
+			// Recreate the reuse-StreamRecord object and load next StreamRecord
+			resetReuse();
+			reuse = recordIterator.next(reuse);
+		}
+
+		// Stop all remaining threads from policies
+		for (Thread t : activePolicyTreads) {
+			t.interrupt();
+		}
+
+		// finally trigger the buffer.
+		emitFinalWindow(null);
+
+	}
+
+	@Override
+	protected void mutableInvoke() throws Exception {
+		if (LOG.isInfoEnabled()) {
+			LOG.info("There is currently no mutable implementation of this operator. Immutable version
is used.");
+		}
+		immutableInvoke();
+	}
+
+	/**
+	 * This method gets called in case of a grouped windowing when a central
+	 * trigger occurred and the arriving element causing the trigger is not part
+	 * of this group.
+	 * 
+	 * Remark: This is NOT the same as
+	 * {@link WindowInvokable#processFakeElement(Object, TriggerPolicy)}!
+	 * Here the eviction using active policies takes place after the call to the
+	 * UDF. Usually it is done before when fake elements get submitted. This
+	 * special behaviour is needed to allow the
+	 * {@link GroupedWindowInvokable} to send central triggers to all groups,
+	 * even if the current element does not belong to the group.
+	 * 
+	 * @param input
+	 *            a fake input element
+	 * @param policies
+	 *            the list of policies which caused the call with this fake
+	 *            element
+	 */
+	protected synchronized void externalTriggerFakeElement(IN input,
+			List<TriggerPolicy<IN>> policies) {
+
+		// Set the current triggers
+		currentTriggerPolicies.addAll(policies);
+
+		// emit
+		callUserFunctionAndLogException();
+
+		// clear the flag collection
+		currentTriggerPolicies.clear();
+
+		// Process the evictions and take care of double evictions
+		// In case there are multiple eviction policies present,
+		// only the one with the highest return value is recognized.
+		int currentMaxEviction = 0;
+		for (ActiveEvictionPolicy<IN> evictionPolicy : activeEvictionPolicies) {
+			// use temporary variable to prevent multiple calls to
+			// notifyEviction
+			int tmp = evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size());
+			if (tmp > currentMaxEviction) {
+				currentMaxEviction = tmp;
+			}
+		}
+
+		for (int i = 0; i < currentMaxEviction; i++) {
+			try {
+				buffer.removeFirst();
+			} catch (NoSuchElementException e) {
+				// In case no more elements are in the buffer:
+				// Prevent failure and stop deleting.
+				break;
+			}
+		}
+	}
+
+	/**
+	 * This method processes an arrived fake element. The method is synchronized
+	 * to ensure that it cannot interleave with
+	 * {@link WindowInvokable#processRealElement(Object)}
+	 * 
+	 * @param input
+	 *            a fake input element
+	 * @param currentPolicy
+	 *            the policy which produced this fake element
+	 */
+	protected synchronized void processFakeElement(Object input, TriggerPolicy<IN> currentPolicy)
{
+
+		// Process the evictions and take care of double evictions
+		// In case there are multiple eviction policies present,
+		// only the one with the highest return value is recognized.
+		int currentMaxEviction = 0;
+		for (ActiveEvictionPolicy<IN> evictionPolicy : activeEvictionPolicies) {
+			// use temporary variable to prevent multiple calls to
+			// notifyEviction
+			int tmp = evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size());
+			if (tmp > currentMaxEviction) {
+				currentMaxEviction = tmp;
+			}
+		}
+
+		for (int i = 0; i < currentMaxEviction; i++) {
+			try {
+				buffer.removeFirst();
+			} catch (NoSuchElementException e) {
+				// In case no more elements are in the buffer:
+				// Prevent failure and stop deleting.
+				break;
+			}
+		}
+
+		// Set the current trigger
+		currentTriggerPolicies.add(currentPolicy);
+
+		// emit
+		callUserFunctionAndLogException();
+
+		// clear the flag collection
+		currentTriggerPolicies.clear();
+	}
+
+	/**
+	 * This method processes an arrived real element. The method is synchronized
+	 * to ensure that it cannot interleave with
+	 * {@link WindowInvokable#processFakeElement(Object, TriggerPolicy)}.
+	 * 
+	 * @param input
+	 *            a real input element
+	 * @param triggerPolicies
+	 *            Allows to set trigger policies which are maintained
+	 *            externally. This is the case for central policies in
+	 *            {@link GroupedWindowInvokable}.
+	 */
+	protected synchronized void processRealElement(IN input, List<TriggerPolicy<IN>>
triggerPolicies) {
+		this.currentTriggerPolicies.addAll(triggerPolicies);
+		processRealElement(input);
+	}
+
+	/**
+	 * This method processes an arrived real element. The method is synchronized
+	 * to ensure that it cannot interleave with
+	 * {@link WindowInvokable#processFakeElement(Object, TriggerPolicy)}
+	 * 
+	 * @param input
+	 *            a real input element
+	 */
+	protected synchronized void processRealElement(IN input) {
+
+		// Run the precalls to detect missed windows
+		for (ActiveTriggerPolicy<IN> trigger : activeTriggerPolicies) {
+			// Remark: In case multiple active triggers are present the ordering
+			// of the different fake elements returned by these triggers becomes
+			// a problem. This might lead to unexpected results...
+			// Should we limit the number of active triggers to 0 or 1?
+			Object[] result = trigger.preNotifyTrigger(input);
+			for (Object in : result) {
+				processFakeElement(in, trigger);
+			}
+		}
+
+		// Remember if a trigger occurred
+		boolean isTriggered = false;
+
+		// Process the triggers
+		for (TriggerPolicy<IN> triggerPolicy : triggerPolicies) {
+			if (triggerPolicy.notifyTrigger(input)) {
+				currentTriggerPolicies.add(triggerPolicy);
+			}
+		}
+
+		// call user function
+		if (!currentTriggerPolicies.isEmpty()) {
+			// emit
+			callUserFunctionAndLogException();
+
+			// clear the flag collection
+			currentTriggerPolicies.clear();
+
+			// remember trigger
+			isTriggered = true;
+		}
+
+		// Process the evictions and take care of double evictions
+		// In case there are multiple eviction policies present,
+		// only the one with the highest return value is recognized.
+		int currentMaxEviction = 0;
+
+		for (EvictionPolicy<IN> evictionPolicy : evictionPolicies) {
+			// use temporary variable to prevent multiple calls to
+			// notifyEviction
+			int tmp = evictionPolicy.notifyEviction(input, isTriggered, buffer.size());
+			if (tmp > currentMaxEviction) {
+				currentMaxEviction = tmp;
+			}
+		}
+
+		for (int i = 0; i < currentMaxEviction; i++) {
+			try {
+				buffer.removeFirst();
+			} catch (NoSuchElementException e) {
+				// In case no more elements are in the buffer:
+				// Prevent failure and stop deleting.
+				break;
+			}
+		}
+
+		// Add the current element to the buffer
+		buffer.add(input);
+
+	}
+
+	/**
+	 * This method does the final reduce at the end of the stream and emits the
+	 * result.
+	 * 
+	 * @param centralTriggerPolicies
+	 *            Allows to set trigger policies which are maintained
+	 *            externally. This is the case for central policies in
+	 *            {@link GroupedWindowInvokable}.
+	 */
+	protected void emitFinalWindow(List<TriggerPolicy<IN>> centralTriggerPolicies)
{
+		if (!buffer.isEmpty()) {
+			currentTriggerPolicies.clear();
+
+			if (centralTriggerPolicies != null) {
+				currentTriggerPolicies.addAll(centralTriggerPolicies);
+			}
+
+			for (TriggerPolicy<IN> policy : triggerPolicies) {
+				currentTriggerPolicies.add(policy);
+			}
+
+			callUserFunctionAndLogException();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
new file mode 100644
index 0000000..b6456e1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+public class WindowReduceInvokable<IN> extends WindowInvokable<IN, IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	ReduceFunction<IN> reducer;
+
+	public WindowReduceInvokable(ReduceFunction<IN> userFunction,
+			LinkedList<TriggerPolicy<IN>> triggerPolicies,
+			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
+		super(userFunction, triggerPolicies, evictionPolicies);
+		this.reducer = userFunction;
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		Iterator<IN> reducedIterator = buffer.iterator();
+		IN reduced = null;
+
+		while (reducedIterator.hasNext() && reduced == null) {
+			reduced = reducedIterator.next();
+		}
+
+		while (reducedIterator.hasNext()) {
+			IN next = reducedIterator.next();
+			if (next != null) {
+				reduced = reducer.reduce(reduced, next);
+			}
+		}
+		if (reduced != null) {
+			collector.collect(reduced);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingGroupInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingGroupInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingGroupInvokable.java
deleted file mode 100644
index 2752fd1..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingGroupInvokable.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.LinkedList;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-public class WindowingGroupInvokable<IN, OUT> extends WindowingInvokable<IN, OUT>
{
-
-	private static final long serialVersionUID = 1L;
-	GroupReduceFunction<IN, OUT> reducer;
-
-	public WindowingGroupInvokable(GroupReduceFunction<IN, OUT> userFunction,
-			LinkedList<TriggerPolicy<IN>> triggerPolicies,
-			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
-		super(userFunction, triggerPolicies, evictionPolicies);
-		this.reducer = userFunction;
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		reducer.reduce(buffer, collector);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
deleted file mode 100644
index 95a999c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class WindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
{
-
-	/**
-	 * Auto-generated serial version UID
-	 */
-	private static final long serialVersionUID = -8038984294071650730L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(WindowingInvokable.class);
-
-	private LinkedList<TriggerPolicy<IN>> triggerPolicies;
-	private LinkedList<EvictionPolicy<IN>> evictionPolicies;
-	private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
-	private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies;
-	private LinkedList<Thread> activePolicyTreads = new LinkedList<Thread>();
-	protected LinkedList<IN> buffer = new LinkedList<IN>();
-	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
-
-	/**
-	 * This constructor created a windowing invokable using trigger and eviction
-	 * policies.
-	 * 
-	 * @param userFunction
-	 *            The user defined {@link ReduceFunction}
-	 * @param triggerPolicies
-	 *            A list of {@link TriggerPolicy}s and/or
-	 *            {@link ActiveTriggerPolicy}s
-	 * @param evictionPolicies
-	 *            A list of {@link EvictionPolicy}s and/or
-	 *            {@link ActiveEvictionPolicy}s
-	 */
-	public WindowingInvokable(Function userFunction, LinkedList<TriggerPolicy<IN>>
triggerPolicies,
-			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
-		super(userFunction);
-
-		this.triggerPolicies = triggerPolicies;
-		this.evictionPolicies = evictionPolicies;
-
-		activeTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
-		for (TriggerPolicy<IN> tp : triggerPolicies) {
-			if (tp instanceof ActiveTriggerPolicy) {
-				activeTriggerPolicies.add((ActiveTriggerPolicy<IN>) tp);
-			}
-		}
-
-		activeEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
-		for (EvictionPolicy<IN> ep : evictionPolicies) {
-			if (ep instanceof ActiveEvictionPolicy) {
-				activeEvictionPolicies.add((ActiveEvictionPolicy<IN>) ep);
-			}
-		}
-	}
-
-	@Override
-	public void open(org.apache.flink.configuration.Configuration parameters) throws Exception
{
-		super.open(parameters);
-		for (ActiveTriggerPolicy<IN> tp : activeTriggerPolicies) {
-			Runnable target = tp.createActiveTriggerRunnable(new WindowingCallback(tp));
-			if (target != null) {
-				Thread thread = new Thread(target);
-				activePolicyTreads.add(thread);
-				thread.start();
-			}
-		}
-	};
-
-	/**
-	 * This class allows the active trigger threads to call back and push fake
-	 * elements at any time.
-	 */
-	private class WindowingCallback implements ActiveTriggerCallback {
-		private ActiveTriggerPolicy<IN> policy;
-
-		public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
-			this.policy = policy;
-		}
-
-		@Override
-		public void sendFakeElement(Object datapoint) {
-			processFakeElement(datapoint, this.policy);
-		}
-
-	}
-
-	@Override
-	protected void immutableInvoke() throws Exception {
-
-		// Prevent empty data streams
-		if ((reuse = recordIterator.next(reuse)) == null) {
-			throw new RuntimeException("DataStream must not be empty");
-		}
-
-		// Continuously run
-		while (reuse != null) {
-			processRealElement(reuse.getObject());
-
-			// Recreate the reuse-StremRecord object and load next StreamRecord
-			resetReuse();
-			reuse = recordIterator.next(reuse);
-		}
-
-		// Stop all remaining threads from policies
-		for (Thread t : activePolicyTreads) {
-			t.interrupt();
-		}
-
-		// finally trigger the buffer.
-		emitFinalWindow(null);
-
-	}
-
-	@Override
-	protected void mutableInvoke() throws Exception {
-		if (LOG.isInfoEnabled()) {
-			LOG.info("There is currently no mutable implementation of this operator. Immutable version
is used.");
-		}
-		immutableInvoke();
-	}
-
-	/**
-	 * This method gets called in case of an grouped windowing in case central
-	 * trigger occurred and the arriving element causing the trigger is not part
-	 * of this group.
-	 * 
-	 * Remark: This is NOT the same as
-	 * {@link WindowingInvokable#processFakeElement(Object, TriggerPolicy)}!
-	 * Here the eviction using active policies takes place after the call to the
-	 * UDF. Usually it is done before when fake elements get submitted. This
-	 * special behaviour is needed to allow the
-	 * {@link GroupedWindowingInvokable} to send central triggers to all groups,
-	 * even if the current element does not belong to the group.
-	 * 
-	 * @param input
-	 *            a fake input element
-	 * @param policies
-	 *            the list of policies which caused the call with this fake
-	 *            element
-	 */
-	protected synchronized void externalTriggerFakeElement(IN input,
-			List<TriggerPolicy<IN>> policies) {
-
-		// Set the current triggers
-		currentTriggerPolicies.addAll(policies);
-
-		// emit
-		callUserFunctionAndLogException();
-
-		// clear the flag collection
-		currentTriggerPolicies.clear();
-
-		// Process the evictions and take care of double evictions
-		// In case there are multiple eviction policies present,
-		// only the one with the highest return value is recognized.
-		int currentMaxEviction = 0;
-		for (ActiveEvictionPolicy<IN> evictionPolicy : activeEvictionPolicies) {
-			// use temporary variable to prevent multiple calls to
-			// notifyEviction
-			int tmp = evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size());
-			if (tmp > currentMaxEviction) {
-				currentMaxEviction = tmp;
-			}
-		}
-
-		for (int i = 0; i < currentMaxEviction; i++) {
-			try {
-				buffer.removeFirst();
-			} catch (NoSuchElementException e) {
-				// In case no more elements are in the buffer:
-				// Prevent failure and stop deleting.
-				break;
-			}
-		}
-	}
-
-	/**
-	 * This method processed an arrived fake element The method is synchronized
-	 * to ensure that it cannot interleave with
-	 * {@link WindowingInvokable#processRealElement(Object)}
-	 * 
-	 * @param input
-	 *            a fake input element
-	 * @param currentPolicy
-	 *            the policy which produced this fake element
-	 */
-	protected synchronized void processFakeElement(Object input, TriggerPolicy<IN> currentPolicy)
{
-
-		// Process the evictions and take care of double evictions
-		// In case there are multiple eviction policies present,
-		// only the one with the highest return value is recognized.
-		int currentMaxEviction = 0;
-		for (ActiveEvictionPolicy<IN> evictionPolicy : activeEvictionPolicies) {
-			// use temporary variable to prevent multiple calls to
-			// notifyEviction
-			int tmp = evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size());
-			if (tmp > currentMaxEviction) {
-				currentMaxEviction = tmp;
-			}
-		}
-
-		for (int i = 0; i < currentMaxEviction; i++) {
-			try {
-				buffer.removeFirst();
-			} catch (NoSuchElementException e) {
-				// In case no more elements are in the buffer:
-				// Prevent failure and stop deleting.
-				break;
-			}
-		}
-
-		// Set the current trigger
-		currentTriggerPolicies.add(currentPolicy);
-
-		// emit
-		callUserFunctionAndLogException();
-
-		// clear the flag collection
-		currentTriggerPolicies.clear();
-	}
-
-	/**
-	 * This method processed an arrived real element The method is synchronized
-	 * to ensure that it cannot interleave with
-	 * {@link WindowingInvokable#processFakeElement(Object)}.
-	 * 
-	 * @param input
-	 *            a real input element
-	 * @param triggerPolicies
-	 *            Allows to set trigger policies which are maintained
-	 *            externally. This is the case for central policies in
-	 *            {@link GroupedWindowingInvokable}.
-	 */
-	protected synchronized void processRealElement(IN input, List<TriggerPolicy<IN>>
triggerPolicies) {
-		this.currentTriggerPolicies.addAll(triggerPolicies);
-		processRealElement(input);
-	}
-
-	/**
-	 * This method processed an arrived real element The method is synchronized
-	 * to ensure that it cannot interleave with
-	 * {@link WindowingInvokable#processFakeElement(Object)}
-	 * 
-	 * @param input
-	 *            a real input element
-	 */
-	protected synchronized void processRealElement(IN input) {
-
-		// Run the precalls to detect missed windows
-		for (ActiveTriggerPolicy<IN> trigger : activeTriggerPolicies) {
-			// Remark: In case multiple active triggers are present the ordering
-			// of the different fake elements returned by this triggers becomes
-			// a problem. This might lead to unexpected results...
-			// Should we limit the number of active triggers to 0 or 1?
-			Object[] result = trigger.preNotifyTrigger(input);
-			for (Object in : result) {
-				processFakeElement(in, trigger);
-			}
-		}
-
-		// Remember if a trigger occurred
-		boolean isTriggered = false;
-
-		// Process the triggers
-		for (TriggerPolicy<IN> triggerPolicy : triggerPolicies) {
-			if (triggerPolicy.notifyTrigger(input)) {
-				currentTriggerPolicies.add(triggerPolicy);
-			}
-		}
-
-		// call user function
-		if (!currentTriggerPolicies.isEmpty()) {
-			// emit
-			callUserFunctionAndLogException();
-
-			// clear the flag collection
-			currentTriggerPolicies.clear();
-
-			// remember trigger
-			isTriggered = true;
-		}
-
-		// Process the evictions and take care of double evictions
-		// In case there are multiple eviction policies present,
-		// only the one with the highest return value is recognized.
-		int currentMaxEviction = 0;
-
-		for (EvictionPolicy<IN> evictionPolicy : evictionPolicies) {
-			// use temporary variable to prevent multiple calls to
-			// notifyEviction
-			int tmp = evictionPolicy.notifyEviction(input, isTriggered, buffer.size());
-			if (tmp > currentMaxEviction) {
-				currentMaxEviction = tmp;
-			}
-		}
-
-		for (int i = 0; i < currentMaxEviction; i++) {
-			try {
-				buffer.removeFirst();
-			} catch (NoSuchElementException e) {
-				// In case no more elements are in the buffer:
-				// Prevent failure and stop deleting.
-				break;
-			}
-		}
-
-		// Add the current element to the buffer
-		buffer.add(input);
-
-	}
-
-	/**
-	 * This method does the final reduce at the end of the stream and emits the
-	 * result.
-	 * 
-	 * @param centralTriggerPolicies
-	 *            Allows to set trigger policies which are maintained
-	 *            externally. This is the case for central policies in
-	 *            {@link GroupedWindowingInvokable}.
-	 */
-	protected void emitFinalWindow(List<TriggerPolicy<IN>> centralTriggerPolicies)
{
-		if (!buffer.isEmpty()) {
-			currentTriggerPolicies.clear();
-
-			if (centralTriggerPolicies != null) {
-				currentTriggerPolicies.addAll(centralTriggerPolicies);
-			}
-
-			for (TriggerPolicy<IN> policy : triggerPolicies) {
-				currentTriggerPolicies.add(policy);
-			}
-
-			callUserFunctionAndLogException();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingReduceInvokable.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingReduceInvokable.java
deleted file mode 100644
index 89f98e0..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingReduceInvokable.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-public class WindowingReduceInvokable<IN> extends WindowingInvokable<IN, IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	ReduceFunction<IN> reducer;
-
-	public WindowingReduceInvokable(ReduceFunction<IN> userFunction,
-			LinkedList<TriggerPolicy<IN>> triggerPolicies,
-			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
-		super(userFunction, triggerPolicies, evictionPolicies);
-		this.reducer = userFunction;
-	}
-
-	@Override
-	protected void callUserFunction() throws Exception {
-		Iterator<IN> reducedIterator = buffer.iterator();
-		IN reduced = null;
-
-		while (reducedIterator.hasNext() && reduced == null) {
-			reduced = reducedIterator.next();
-		}
-
-		while (reducedIterator.hasNext()) {
-			IN next = reducedIterator.next();
-			if (next != null) {
-				reduced = reducer.reduce(reduced, next);
-			}
-		}
-		if (reduced != null) {
-			collector.collect(reduced);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
index 0c2d691..132b495 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.streaming.api.windowing.policy;
 
-import org.apache.flink.streaming.api.invokable.operator.GroupedWindowingInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
 
 /**
- * When used in {@link GroupedWindowingInvokable}, eviction policies must
+ * When used in {@link GroupedWindowInvokable}, eviction policies must
  * provide a clone method. Eviction policies get cloned to provide an own
  * instance for each group and respectively each individual element buffer as
  * groups maintain their own buffers with the elements belonging to the

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
index 64f8bf3..f5772a1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.streaming.api.windowing.policy;
 
-import org.apache.flink.streaming.api.invokable.operator.GroupedWindowingInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
 
 /**
- * When used in {@link GroupedWindowingInvokable}, trigger policies can provide
+ * When used in {@link GroupedWindowInvokable}, trigger policies can provide
  * a clone method. Cloneable triggers can can be used in a distributed manner,
  * which means they get cloned to provide an own instance for each group. This
  * allows each group to trigger individually and only based on the elements

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
index 79288d1..9d5744b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
@@ -77,7 +77,7 @@ public class GroupedWindowingInvokableTest {
 
 		LinkedList<TriggerPolicy<Integer>> centralTriggers = new LinkedList<TriggerPolicy<Integer>>();
 
-		GroupedWindowingInvokable<Integer, Integer> invokable = new GroupedWindowingInvokable<Integer, Integer>(
+		GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>(
 				new ReduceFunction<Integer>() {
 					private static final long serialVersionUID = 1L;
 
@@ -136,7 +136,7 @@ public class GroupedWindowingInvokableTest {
 
 		LinkedList<TriggerPolicy<Tuple2<Integer, String>>> centralTriggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
 
-		GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable2 = new GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
+		GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable2 = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
 				new ReduceFunction<Tuple2<Integer, String>>() {
 					private static final long serialVersionUID = 1L;
 
@@ -256,7 +256,7 @@ public class GroupedWindowingInvokableTest {
 
 		LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>();
 
-		GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable = new GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
+		GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
 				myReduceFunction, new TupleKeySelector<Tuple2<Integer, String>>(1),
 				distributedTriggers, evictions, triggers);
 
@@ -319,7 +319,7 @@ public class GroupedWindowingInvokableTest {
 			}
 		};
 
-		GroupedWindowingInvokable<Integer, Integer> invokable = new GroupedWindowingInvokable<Integer, Integer>(
+		GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>(
 				myReduceFunction, new KeySelector<Integer, Integer>() {
 					private static final long serialVersionUID = 1L;
 
@@ -395,7 +395,7 @@ public class GroupedWindowingInvokableTest {
 			}
 		};
 
-		GroupedWindowingInvokable<Integer, Integer> invokable = new GroupedWindowingInvokable<Integer, Integer>(
+		GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>(
 				myReduceFunction, new KeySelector<Integer, Integer>() {
 					private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d73c3f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
index 32e71ba..4c6b187 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
@@ -94,7 +94,7 @@ public class WindowingInvokableTest {
 		// Always delete all elements older then 4
 		evictions.add(new TimeEvictionPolicy<Integer>(4L, myTimeStamp));
 
-		WindowingInvokable<Integer, Integer> invokable = new WindowingReduceInvokable<Integer>(
+		WindowInvokable<Integer, Integer> invokable = new WindowReduceInvokable<Integer>(
 				myReduceFunction, triggers, evictions);
 
 		ArrayList<Integer> result = new ArrayList<Integer>();
@@ -138,7 +138,7 @@ public class WindowingInvokableTest {
 		// time after the 3rd element
 		evictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
 
-		WindowingInvokable<Integer, Integer> invokable = new WindowingReduceInvokable<Integer>(
+		WindowInvokable<Integer, Integer> invokable = new WindowReduceInvokable<Integer>(
 				myReduceFunction, triggers, evictions);
 
 		List<Integer> expected = new ArrayList<Integer>();
@@ -191,7 +191,7 @@ public class WindowingInvokableTest {
 		// time after on the 5th element
 		evictions.add(new CountEvictionPolicy<Integer>(3, 3, -1));
 
-		WindowingInvokable<Integer, Integer> invokable2 = new WindowingReduceInvokable<Integer>(
+		WindowInvokable<Integer, Integer> invokable2 = new WindowReduceInvokable<Integer>(
 				myReduceFunction, triggers, evictions);
 
 		List<Integer> expected2 = new ArrayList<Integer>();
@@ -249,7 +249,7 @@ public class WindowingInvokableTest {
 			}
 		};
 
-		WindowingInvokable<Integer, Integer> invokable = new WindowingReduceInvokable<Integer>(
+		WindowInvokable<Integer, Integer> invokable = new WindowReduceInvokable<Integer>(
 				myReduceFunction, triggers, evictions);
 
 		ArrayList<Integer> result = new ArrayList<Integer>();


Mime
View raw message