flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [10/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:41:48 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
new file mode 100644
index 0000000..9d055d5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.deltafunction;
+
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * This delta function calculates the euclidean distance between two given
+ * points.
+ * 
+ * Euclidean distance: http://en.wikipedia.org/wiki/Euclidean_distance
+ * 
+ * @param <DATA>
+ *            The input data type. This delta function works with a double[],
+ *            but can extract/convert to it from any other given object in case
+ *            the respective extractor has been set. See
+ *            {@link ExtractionAwareDeltaFunction} for more information.
+ */
+public class EuclideanDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> {
+
+	public EuclideanDistance() {
+		super(null);
+	}
+
+	public EuclideanDistance(Extractor<DATA, double[]> converter) {
+		super(converter);
+	}
+
+	/**
+	 * auto-generated version id
+	 */
+	private static final long serialVersionUID = 3119432599634512359L;
+
+	@Override
+	public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
+		double result = 0;
+		for (int i = 0; i < oldDataPoint.length; i++) {
+			result += (oldDataPoint[i] - newDataPoint[i]) * (oldDataPoint[i] - newDataPoint[i]);
+		}
+		return Math.sqrt(result);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
new file mode 100644
index 0000000..81d01a4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.deltafunction;
+
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * Extend this abstract class to implement a delta function which is aware of
+ * extracting the data on which the delta is calculated from a more complex data
+ * structure. For example in case you want to be able to run a delta only on one
+ * field of a Tuple type or only on some fields from an array.
+ * 
+ * @param <DATA>
+ *            The input data type. The input of this type will be passed to the
+ *            extractor which will transform into a TO-object. The delta
+ *            function then runs on this TO-object.
+ * @param <TO>
+ *            The type on which the delta function runs. (The type of the delta
+ *            function)
+ */
+public abstract class ExtractionAwareDeltaFunction<DATA, TO> implements DeltaFunction<DATA> {
+
+	/**
+	 * Generated Version ID
+	 */
+	private static final long serialVersionUID = 6927486219702689554L;
+	private Extractor<DATA, TO> converter;
+
+	public ExtractionAwareDeltaFunction(Extractor<DATA, TO> converter) {
+		this.converter = converter;
+	}
+
+	/**
+	 * This method takes the two data point and runs the set extractor on it.
+	 * The delta function implemented at {@link getNestedDelta} is then called
+	 * with the extracted data. In case no extractor is set the input data gets
+	 * passes to {@link getNestedDelta} as-is. The return value is just
+	 * forwarded from {@link getNestedDelta}.
+	 * 
+	 * @param oldDataPoint
+	 *            the older data point as raw data (before extraction).
+	 * @param newDataPoint
+	 *            the new data point as raw data (before extraction).
+	 * @return the delta between the two points.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public double getDelta(DATA oldDataPoint, DATA newDataPoint) {
+		if (converter == null) {
+			// In case no conversion/extraction is required, we can cast DATA to
+			// TO
+			// => Therefore, "unchecked" warning is suppressed for this method.
+			return getNestedDelta((TO) oldDataPoint, (TO) newDataPoint);
+		} else {
+			return getNestedDelta(converter.extract(oldDataPoint), converter.extract(newDataPoint));
+		}
+
+	}
+
+	/**
+	 * This method is exactly the same as
+	 * {@link DeltaFunction#getDelta(Object, Object)} except that it gets the
+	 * result of the previously done extractions as input. Therefore, this
+	 * method only does the actual calculation of the delta but no data
+	 * extraction or conversion.
+	 * 
+	 * @param oldDataPoint
+	 *            the older data point.
+	 * @param newDataPoint
+	 *            the new data point.
+	 * @return the delta between the two points.
+	 */
+	public abstract double getNestedDelta(TO oldDataPoint, TO newDataPoint);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java
new file mode 100644
index 0000000..ee878ac
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Converts a Tuple to an Object-Array. The field which should be included in
+ * the array can selected and reordered as needed.
+ */
+public class ArrayFromTuple implements Extractor<Tuple, Object[]> {
+
+	/**
+	 * Auto generated version id
+	 */
+	private static final long serialVersionUID = -6076121226427616818L;
+	int[] order = null;
+
+	/**
+	 * Using this constructor the extractor will convert the whole tuple (all
+	 * fields in the original order) to an array.
+	 */
+	public ArrayFromTuple() {
+		// noting to do
+	}
+
+	/**
+	 * Using this constructor the extractor will combine the fields as specified
+	 * in the indexes parameter in an object array.
+	 * 
+	 * @param indexes
+	 *            the field ids (enumerated from 0)
+	 */
+	public ArrayFromTuple(int... indexes) {
+		this.order = indexes;
+	}
+
+	@Override
+	public Object[] extract(Tuple in) {
+		Object[] output;
+
+		if (order == null) {
+			// copy the whole tuple
+			output = new Object[in.getArity()];
+			for (int i = 0; i < in.getArity(); i++) {
+				output[i] = in.getField(i);
+			}
+		} else {
+			// copy user specified order
+			output = new Object[order.length];
+			for (int i = 0; i < order.length; i++) {
+				output[i] = in.getField(order[i]);
+			}
+		}
+
+		return output;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtract.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtract.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtract.java
new file mode 100644
index 0000000..9048a3c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtract.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+/**
+ * Combines two extractors which will be executed one after each other.
+ *
+ * @param <FROM>
+ *            The input type of the first extractor.
+ * @param <OVER>
+ *            The output type of the first and the input type of the second
+ *            extractor.
+ * @param <TO>
+ *            The output type of the second extractor and the output type of the
+ *            over all extraction.
+ */
+public class ConcatinatedExtract<FROM, OVER, TO> implements Extractor<FROM, TO> {
+
+	/**
+	 * auto-generated id
+	 */
+	private static final long serialVersionUID = -7807197760725651752L;
+
+	private Extractor<FROM, OVER> e1;
+	private Extractor<OVER, TO> e2;
+
+	/**
+	 * Combines two extractors which will be executed one after each other.
+	 * 
+	 * @param e1
+	 *            First extractor: This extractor gets applied to the input data
+	 *            first. Its output as then passed as input to the second
+	 *            extractor.
+	 * @param e2
+	 *            Second extractor: This extractor gets the output of the first
+	 *            extractor as input. Its output is then the result of the over
+	 *            all extraction.
+	 */
+	public ConcatinatedExtract(Extractor<FROM, OVER> e1, Extractor<OVER, TO> e2) {
+		this.e1 = e1;
+		this.e2 = e2;
+	}
+
+	@Override
+	public TO extract(FROM in) {
+		return e2.extract(e1.extract(in));
+	}
+
+	public <OUT> ConcatinatedExtract<FROM, TO, OUT> add(Extractor<TO, OUT> e3) {
+		return new ConcatinatedExtract<FROM, TO, OUT>(this, e3);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java
new file mode 100644
index 0000000..b103ca3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import java.io.Serializable;
+
+/**
+ * Extractors allow to extract/convert one type to another. They are mostly used
+ * to extract some fields out of a more complex structure (Tuple/Array) to run
+ * further calculation on the extraction result.
+ * 
+ * @param <FROM>
+ *            The input data type.
+ * @param <TO>
+ *            The output data type.
+ */
+public interface Extractor<FROM, TO> extends Serializable {
+
+	/**
+	 * Extracts/Converts the given input to an object of the output type
+	 * 
+	 * @param in
+	 *            the input data
+	 * @return the extracted/converted data
+	 */
+	public TO extract(FROM in);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java
new file mode 100644
index 0000000..0568276
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import java.lang.reflect.Array;
+
+/**
+ * Extracts a single field out of an array.
+ * 
+ * @param <OUT>
+ *            The type of the extracted field.
+ */
+public class FieldFromArray<OUT> implements Extractor<Object, OUT> {
+
+	/**
+	 * Auto-gernated version id
+	 */
+	private static final long serialVersionUID = -5161386546695574359L;
+	private int fieldId = 0;
+
+	/**
+	 * Extracts the first field (id 0) from the array
+	 */
+	public FieldFromArray() {
+		// noting to do => will use default 0
+	}
+
+	/**
+	 * Extracts the field with the given id from the array.
+	 * 
+	 * @param fieldId
+	 *            The id of the field which will be extracted from the array.
+	 */
+	public FieldFromArray(int fieldId) {
+		this.fieldId = fieldId;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public OUT extract(Object in) {
+		return (OUT) Array.get(in, fieldId);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java
new file mode 100644
index 0000000..07b38f5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Extracts a single field out of a tuple.
+ * 
+ * @param <OUT>
+ *            The type of the extracted field.
+ */
+public class FieldFromTuple<OUT> implements Extractor<Tuple, OUT> {
+
+	/**
+	 * Auto-gernated version id
+	 */
+	private static final long serialVersionUID = -5161386546695574359L;
+	private int fieldId = 0;
+
+	/**
+	 * Extracts the first field (id 0) from the tuple
+	 */
+	public FieldFromTuple() {
+		// noting to do => will use default 0
+	}
+
+	/**
+	 * Extracts the field with the given id from the tuple.
+	 * 
+	 * @param fieldId
+	 *            The id of the field which will be extracted from the tuple.
+	 */
+	public FieldFromTuple(int fieldId) {
+		this.fieldId = fieldId;
+	}
+
+	@Override
+	public OUT extract(Tuple in) {
+		return in.getField(fieldId);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java
new file mode 100644
index 0000000..4e98689
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import java.lang.reflect.Array;
+
+/**
+ * Extracts multiple fields from an array and puts them into a new array of the
+ * specified type.
+ *
+ * @param <OUT>
+ *            The type of the output array. If out is set to String, the output
+ *            of the extractor will be a String[]. If it is set to String[] the
+ *            output will be String[][].
+ */
+public class FieldsFromArray<OUT> implements Extractor<Object, OUT[]> {
+
+	/**
+	 * Auto-generated version id
+	 */
+	private static final long serialVersionUID = 8075055384516397670L;
+	private int[] order;
+	private Class<OUT> clazz;
+
+	/**
+	 * Extracts multiple fields from an array and puts them in the given order
+	 * into a new array of the specified type.
+	 * 
+	 * @param clazz
+	 *            the Class object representing the component type of the new
+	 *            array
+	 * @param indexes
+	 *            The indexes of the fields to be extracted. Any order is
+	 *            possible, but not more than 255 fields due to limitations in
+	 *            {@link Array#newInstance(Class, int...)}.
+	 */
+	public FieldsFromArray(Class<OUT> clazz, int... indexes) {
+		this.order = indexes;
+		this.clazz = clazz;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public OUT[] extract(Object in) {
+		OUT[] output = (OUT[]) Array.newInstance(clazz, order.length);
+		for (int i = 0; i < order.length; i++) {
+			output[i] = (OUT) Array.get(in, this.order[i]);
+		}
+		return output;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java
new file mode 100644
index 0000000..1bfc461
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.extractor;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Extracts one or more fields of the type Double from a tuple and puts them
+ * into a new double[]
+ */
+public class FieldsFromTuple implements Extractor<Tuple, double[]> {
+
+	/**
+	 * auto generated version id
+	 */
+	private static final long serialVersionUID = -2554079091050273761L;
+	int[] indexes;
+
+	/**
+	 * Extracts one or more fields of the the type Double from a tuple and puts
+	 * them into a new double[] (in the specified order).
+	 * 
+	 * @param indexes
+	 *            The indexes of the fields to be extracted.
+	 */
+	public FieldsFromTuple(int... indexes) {
+		this.indexes = indexes;
+	}
+
+	@Override
+	public double[] extract(Tuple in) {
+		double[] out = new double[indexes.length];
+		for (int i = 0; i < indexes.length; i++) {
+			out[i] = (Double) in.getField(indexes[i]);
+		}
+		return out;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
new file mode 100644
index 0000000..840546f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+/**
+ * Represents a count based trigger or eviction policy. Use the
+ * {@link Count#of(int)} to get an instance.
+ */
+@SuppressWarnings("rawtypes")
+public class Count implements WindowingHelper {
+
+	private int count;
+	private int deleteOnEviction = 1;
+	private int startValue = CountTriggerPolicy.DEFAULT_START_VALUE;
+
+	/**
+	 * Specifies on which element a trigger or an eviction should happen (based
+	 * on the count of the elements).
+	 * 
+	 * This constructor does exactly the same as {@link Count#of(int)}.
+	 * 
+	 * @param count
+	 *            the number of elements to count before trigger/evict
+	 */
+	public Count(int count) {
+		this.count = count;
+	}
+
+	@Override
+	public EvictionPolicy<?> toEvict() {
+		return new CountEvictionPolicy(count, deleteOnEviction);
+	}
+
+	@Override
+	public TriggerPolicy<?> toTrigger() {
+		return new CountTriggerPolicy(count, startValue);
+	}
+
+	/**
+	 * Sets the number of elements deleted at each eviction (i.e when the number
+	 * elements exceeds the window size). By default the elements get deleted
+	 * one by one (deleteOnEvition = 1)
+	 * 
+	 * @param deleteOnEviction
+	 *            The number of elements deleted at each evition
+	 * @return Helper representing the count based policy
+	 * 
+	 */
+	public Count withDelete(int deleteOnEviction) {
+		this.deleteOnEviction = deleteOnEviction;
+		return this;
+	}
+
+	/**
+	 * Sets the initial value of the counter. 0 by default
+	 * 
+	 * @param startValue
+	 *            Starting value of the window counter
+	 * @return Helper representing the count based policy
+	 * 
+	 */
+	public Count startingAt(int startValue) {
+		this.startValue = startValue;
+		return this;
+	}
+
+	/**
+	 * Specifies a count based eviction (window size) or trigger policy (slide
+	 * size). For eviction 'count' defines the number of elements in each
+	 * window. For trigger 'count' defines how often do we call the user
+	 * function in terms of number of elements received.
+	 * 
+	 * @param count
+	 *            the number of elements to count before trigger/evict
+	 * @return Helper representing the count based policy
+	 */
+	public static Count of(int count) {
+		return new Count(count);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
new file mode 100644
index 0000000..5434a4e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+/**
+ * This helper represents a trigger or eviction policy based on a
+ * {@link DeltaFunction}.
+ * 
+ * @param <DATA>
+ *            the data type handled by the delta function represented by this
+ *            helper.
+ */
+public class Delta<DATA> implements WindowingHelper<DATA> {
+
+	private DeltaFunction<DATA> deltaFunction;
+	private DATA initVal;
+	private double threshold;
+
+	/**
+	 * Creates a delta helper representing a delta count or eviction policy
+	 * 
+	 * @param deltaFunction
+	 *            The delta function which should be used to calculate the delta
+	 *            between points.
+	 * @param initVal
+	 *            The initial value which will be used to calculate the first
+	 *            delta.
+	 * @param threshold
+	 *            The threshold used by the delta function.
+	 */
+	public Delta(DeltaFunction<DATA> deltaFunction, DATA initVal, double threshold) {
+		this.deltaFunction = deltaFunction;
+		this.initVal = initVal;
+		this.threshold = threshold;
+	}
+
+	@Override
+	public EvictionPolicy<DATA> toEvict() {
+		return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold);
+	}
+
+	@Override
+	public TriggerPolicy<DATA> toTrigger() {
+		return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold);
+	}
+
+	/**
+	 * Creates a delta helper representing a delta trigger or eviction policy.
+	 * </br></br> This policy calculates a delta between the data point which
+	 * triggered last and the currently arrived data point. It triggers if the
+	 * delta is higher than a specified threshold. </br></br> In case it gets
+	 * used for eviction, this policy starts from the first element of the
+	 * buffer and removes all elements from the buffer which have a higher delta
+	 * then the threshold. As soon as there is an element with a lower delta,
+	 * the eviction stops.
+	 * 
+	 * @param deltaFunction
+	 *            The delta function which should be used to calculate the delta
+	 *            between points.
+	 * @param initVal
+	 *            The initial value which will be used to calculate the first
+	 *            delta.
+	 * @param threshold
+	 *            The threshold used by the delta function.
+	 * @return Helper representing a delta trigger or eviction policy
+	 */
+	public static <DATA> Delta<DATA> of(double threshold, DeltaFunction<DATA> deltaFunction,
+			DATA initVal) {
+		return new Delta<DATA>(deltaFunction, initVal, threshold);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
new file mode 100644
index 0000000..8581ac5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+/**
+ * {@link Timestamp} implementation to be used when system time is needed to
+ * determine windows
+ */
+public class SystemTimestamp<T> implements Timestamp<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public long getTimestamp(T value) {
+		return System.currentTimeMillis();
+	}
+
+	public static <R> TimestampWrapper<R> getWrapper() {
+		return new TimestampWrapper<R>(new SystemTimestamp<R>(), System.currentTimeMillis());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
new file mode 100644
index 0000000..f94eea4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+/**
+ * This helper represents a time based count or eviction policy. By default the
+ * time is measured with {@link System#currentTimeMillis()} in
+ * {@link DefaultTimeStamp}.
+ * 
+ * @param <DATA>
+ *            The data type which is handled by the time stamp used in the
+ *            policy represented by this helper
+ */
+public class Time<DATA> implements WindowingHelper<DATA> {
+
+	protected long length;
+	protected TimeUnit granularity;
+	protected TimestampWrapper<DATA> timestampWrapper;
+	protected long delay;
+
+	/**
+	 * Creates a helper representing a trigger which triggers every given
+	 * length or an eviction which evicts all elements older than length.
+	 * 
+	 * @param length
+	 *            The number of time units
+	 * @param timeUnit
+	 *            The unit of time such as minute oder millisecond. Note that
+	 *            the smallest possible granularity is milliseconds. Any smaller
+	 *            time unit might cause an error at runtime due to conversion
+	 *            problems.
+	 * @param timestamp
+	 *            The user defined timestamp that will be used to extract time
+	 *            information from the incoming elements
+	 * @param startTime
+	 *            The startTime of the stream for computing the first window
+	 */
+	private Time(long length, TimeUnit timeUnit, Timestamp<DATA> timestamp, long startTime) {
+		this(length, timeUnit, new TimestampWrapper<DATA>(timestamp, startTime));
+	}
+
+	/**
+	 * Creates a helper representing a trigger which triggers every given
+	 * length or an eviction which evicts all elements older than length.
+	 * 
+	 * @param length
+	 *            The number of time units
+	 * @param timeUnit
+	 *            The unit of time such as minute oder millisecond. Note that
+	 *            the smallest possible granularity is milliseconds. Any smaller
+	 *            time unit might cause an error at runtime due to conversion
+	 *            problems.
+	 * @param timestampWrapper
+	 *            The user defined {@link TimestampWrapper} that will be used to
+	 *            extract time information from the incoming elements
+	 */
+	private Time(long length, TimeUnit timeUnit, TimestampWrapper<DATA> timestampWrapper) {
+		this.length = length;
+		this.granularity = timeUnit;
+		this.timestampWrapper = timestampWrapper;
+		this.delay = 0;
+	}
+
+	@Override
+	public EvictionPolicy<DATA> toEvict() {
+		return new TimeEvictionPolicy<DATA>(granularityInMillis(), timestampWrapper);
+	}
+
+	@Override
+	public TriggerPolicy<DATA> toTrigger() {
+		return new TimeTriggerPolicy<DATA>(granularityInMillis(), timestampWrapper, delay);
+	}
+
+	/**
+	 * Creates a helper representing a time trigger which triggers every given
+	 * length (slide size) or a time eviction which evicts all elements older
+	 * than length (window size) using System time.
+	 * 
+	 * @param length
+	 *            The number of time units
+	 * @param timeUnit
+	 *            The unit of time such as minute oder millisecond. Note that
+	 *            the smallest possible granularity is milliseconds. Any smaller
+	 *            time unit might cause an error at runtime due to conversion
+	 *            problems.
+	 * @return Helper representing the time based trigger and eviction policy
+	 */
+	@SuppressWarnings("unchecked")
+	public static <DATA> Time<DATA> of(long length, TimeUnit timeUnit) {
+		return new Time<DATA>(length, timeUnit,
+				(TimestampWrapper<DATA>) SystemTimestamp.getWrapper());
+	}
+
+	/**
+	 * Creates a helper representing a time trigger which triggers every given
+	 * length (slide size) or a time eviction which evicts all elements older
+	 * than length (window size) using a user defined timestamp extractor.
+	 * 
+	 * @param length
+	 *            The number of time units
+	 * @param timestamp
+	 *            The user defined timestamp that will be used to extract time
+	 *            information from the incoming elements
+	 * @param startTime
+	 *            The startTime used to compute the first window
+	 * @return Helper representing the time based trigger and eviction policy
+	 */
+	public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp, long startTime) {
+		return new Time<DATA>(length, null, timestamp, startTime);
+	}
+
+	/**
+	 * Creates a helper representing a time trigger which triggers every given
+	 * length (slide size) or a time eviction which evicts all elements older
+	 * than length (window size) using a user defined timestamp extractor. By
+	 * default the start time is set to 0.
+	 * 
+	 * @param length
+	 *            The number of time units
+	 * @param timestamp
+	 *            The user defined timestamp that will be used to extract time
+	 *            information from the incoming elements
+	 * @return Helper representing the time based trigger and eviction policy
+	 */
+	public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp) {
+		return of(length, timestamp, 0);
+	}
+
+	/**
+	 * Sets the delay for the first processed window.
+	 * 
+	 * @param delay
+	 *            The number of time units before the first processed window.
+	 * @return Helper representing the time based trigger and eviction policy
+	 */
+	public Time<DATA> withDelay(long delay) {
+		this.delay = delay;
+		return this;
+	}
+
+	protected long granularityInMillis() {
+		return granularity == null ? length : granularity.toMillis(length);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
new file mode 100644
index 0000000..fea6020
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import java.io.Serializable;
+
+/**
+ * Interface for getting a timestamp from a custom value. Used in window
+ * reduces. In order to work properly, the timestamps must be non-decreasing.
+ *
+ * @param <T>
+ *            Type of the value to create the timestamp from.
+ */
+public interface Timestamp<T> extends Serializable {
+
+	/**
+	 * Values
+	 * 
+	 * @param value
+	 *            The value to create the timestamp from
+	 * @return The timestamp
+	 */
+	public long getTimestamp(T value);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
new file mode 100644
index 0000000..8c3a09d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import java.io.Serializable;
+
+public class TimestampWrapper<T> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	private long startTime;
+	private Timestamp<T> timestamp;
+
+	public TimestampWrapper(Timestamp<T> timeStamp, long startTime) {
+		this.timestamp = timeStamp;
+		this.startTime = startTime;
+	}
+
+	public long getTimestamp(T in) {
+		return timestamp.getTimestamp(in);
+	}
+
+	public long getStartTime() {
+		return startTime;
+	}
+
+	public boolean isDefaultTimestamp() {
+		return timestamp instanceof SystemTimestamp;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
new file mode 100644
index 0000000..9df8432
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+/**
+ * A helper representing a count or eviction policy. Such helper classes are
+ * used to provide a nice and well readable API.
+ * 
+ * @param <DATA>
+ *            the type of input data handled by this helper
+ * @see Count
+ * @see Time
+ * @see Delta
+ */
+public interface WindowingHelper<DATA> {
+
+	public EvictionPolicy<DATA> toEvict();
+
+	public TriggerPolicy<DATA> toTrigger();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
new file mode 100644
index 0000000..29ba9eb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+/**
+ * The {@link ActiveEvictionPolicy} wraps around a non active
+ * {@link EvictionPolicy}. It forwards all calls to
+ * {@link ActiveEvictionPolicy#notifyEvictionWithFakeElement(Object, int)} to
+ * {@link EvictionPolicy#notifyEviction(Object, boolean, int)} while the
+ * triggered parameter will be set to true.
+ * 
+ * This class additionally implements the clone method and can wrap around
+ * {@link CloneableEvictionPolicy} to make it active.
+ * 
+ * @param <DATA>
+ *            The data type handled by this policy
+ */
+public class ActiveCloneableEvictionPolicyWrapper<DATA> extends ActiveEvictionPolicyWrapper<DATA>
+		implements CloneableEvictionPolicy<DATA> {
+
+	/**
+	 * Auto generated version ID
+	 */
+	private static final long serialVersionUID = 1520261575300622769L;
+	CloneableEvictionPolicy<DATA> nestedPolicy;
+
+	/**
+	 * Creates a wrapper which activates the eviction policy which is wrapped
+	 * in. This means that the nested policy will get called on fake elements as
+	 * well as on real elements.
+	 * 
+	 * This specialized version of the {@link ActiveEvictionPolicyWrapper} works
+	 * with {@link CloneableEvictionPolicy} and is thereby cloneable as well.
+	 * 
+	 * @param nestedPolicy
+	 *            The policy which should be activated/wrapped in.
+	 */
+	public ActiveCloneableEvictionPolicyWrapper(CloneableEvictionPolicy<DATA> nestedPolicy) {
+		super(nestedPolicy);
+		this.nestedPolicy = nestedPolicy;
+	}
+
+	@Override
+	public ActiveCloneableEvictionPolicyWrapper<DATA> clone() {
+		return new ActiveCloneableEvictionPolicyWrapper<DATA>(nestedPolicy.clone());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
new file mode 100644
index 0000000..fe172bc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+/**
+ * This interface is used for active eviction policies. beside the functionality
+ * inherited from {@link EvictionPolicy} it provides a method which gets called
+ * to notify on fake elements.
+ * 
+ * In case an eviction policy implements this interface instead of the
+ * {@link EvictionPolicy} interface, not only the real but also fake data points
+ * will cause a notification of the eviction.
+ * 
+ * Fake data points are mostly used in windowing based on time to trigger and
+ * evict even if no element arrives at all during a windows duration.
+ */
+public interface ActiveEvictionPolicy<DATA> extends EvictionPolicy<DATA> {
+
+	/**
+	 * Proves if and how many elements should be deleted from the element
+	 * buffer. The eviction takes place after the trigger and after the call to
+	 * the UDF. This method is only called with fake elements.
+	 * 
+	 * Note: Fake elements are always considered as triggered. Therefore this
+	 * method does not have a triggered parameter.
+	 * 
+	 * @param datapoint
+	 *            the current fake data point
+	 * @param bufferSize
+	 *            the current size of the buffer (only real elements are
+	 *            counted)
+	 * @return the number of elements to delete from the buffer (only real
+	 *         elements are counted)
+	 */
+	public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
new file mode 100644
index 0000000..b3b6935
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+/**
+ * This {@link ActiveEvictionPolicy} wraps around a non active
+ * {@link EvictionPolicy}. It forwards all calls to
+ * {@link ActiveEvictionPolicy#notifyEvictionWithFakeElement(Object, int)} to
+ * {@link EvictionPolicy#notifyEviction(Object, boolean, int)} while the
+ * triggered parameter will be set to true.
+ * 
+ * @param <DATA>
+ *            The data type handled by this policy
+ */
+public class ActiveEvictionPolicyWrapper<DATA> implements ActiveEvictionPolicy<DATA> {
+
+	/**
+	 * Auto generated version ID
+	 */
+	private static final long serialVersionUID = -7656558669799505882L;
+	private EvictionPolicy<DATA> nestedPolicy;
+
+	/**
+	 * Creates a wrapper which activates the eviction policy which is wrapped
+	 * in. This means that the nested policy will get called on fake elements as
+	 * well as on real elements.
+	 * 
+	 * @param nestedPolicy
+	 *            The policy which should be activated/wrapped in.
+	 */
+	public ActiveEvictionPolicyWrapper(EvictionPolicy<DATA> nestedPolicy) {
+		if (nestedPolicy == null) {
+			throw new RuntimeException("The nested policy must not be null.");
+		}
+		this.nestedPolicy = nestedPolicy;
+	}
+
+	@Override
+	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
+		return nestedPolicy.notifyEviction(datapoint, triggered, bufferSize);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize) {
+		return nestedPolicy.notifyEviction((DATA) datapoint, true, bufferSize);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
new file mode 100644
index 0000000..c44be37
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+/**
+ * In case an {@link ActiveTriggerPolicy} is used, it can implement own
+ * {@link Runnable} classes. Such {@link Runnable} classes will be executed as
+ * an own thread and can submit fake elements, to the element buffer at any
+ * time.
+ * 
+ * The factory method for runnables of the {@link ActiveTriggerPolicy} gets an
+ * instance of this interface as parameter. The describes adding of elements can
+ * be done by the runnable using the methods provided in this interface.
+ * 
+ */
+public interface ActiveTriggerCallback {
+
+	/**
+	 * Submits a new fake data point to the element buffer. Such a fake element
+	 * might be used to trigger at any time, but will never be included in the
+	 * result of the reduce function. The submission of a fake element causes
+	 * notifications only at the {@link ActiveTriggerPolicy} and
+	 * {@link ActiveEvictionPolicy} implementations.
+	 * 
+	 * @param datapoint
+	 *            the fake data point to be added
+	 */
+	public void sendFakeElement(Object datapoint);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
new file mode 100644
index 0000000..abe5298
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+
+/**
+ * This interface extends the {@link TriggerPolicy} interface with functionality
+ * for active triggers. Active triggers can act in two ways:
+ * 
+ * 1) Whenever an element arrives at the invokable, the
+ * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
+ * first. It can return zero ore more fake data points which will be added
+ * before the currently arrived real element gets processed. This allows to
+ * handle empty windows in time based windowing with an user defined
+ * {@link Timestamp}. Triggers are not called on fake datapoint. A fake
+ * datapoint is always considered as triggered.
+ * 
+ * 2) An active trigger has a factory method for a runnable. This factory method
+ * gets called at the start up of the invokable. The returned runnable will be
+ * executed in its own thread and can submit fake elements at any time threw an
+ * {@link ActiveTriggerCallback}. This allows to have time based triggers based
+ * on any system internal time measure. Triggers are not called on fake
+ * datapoint. A fake datapoints is always considered as triggered.
+ * 
+ * @param <DATA>
+ *            The data type which can be handled by this policy
+ */
+public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
+
+	/**
+	 * Whenever an element arrives at the invokable, the
+	 * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
+	 * first. It can return zero ore more fake data points which will be added
+	 * before the the currently arrived real element gets processed. This allows
+	 * to handle empty windows in time based windowing with an user defined
+	 * {@link Timestamp}. Triggers are not called on fake datapoints. A fake
+	 * datapoint is always considered as triggered.
+	 * 
+	 * @param datapoint
+	 *            the data point which arrived at the invokable
+	 * @return zero ore more fake data points which will be added before the the
+	 *         currently arrived real element gets processed.
+	 */
+	public Object[] preNotifyTrigger(DATA datapoint);
+
+	/**
+	 * This is the factory method for a runnable. This factory method gets
+	 * called at the start up of the invokable. The returned runnable will be
+	 * executed in its own thread and can submit fake elements at any time threw
+	 * an {@link ActiveTriggerCallback}. This allows to have time based triggers
+	 * based on any system internal time measure. Triggers are not called on
+	 * fake datapoints. A fake datapoint is always considered as triggered.
+	 * 
+	 * @param callback
+	 *            A callback object which allows to add fake elements from
+	 *            within the returned {@link Runnable}.
+	 * @return The runnable implementation or null in case there is no. In case
+	 *         an {@link ActiveTriggerPolicy} is used, it can implement own
+	 *         {@link Runnable} classes. Such {@link Runnable} classes will be
+	 *         executed as an own thread and can submit fake elements, to the
+	 *         element buffer at any time.
+	 */
+	public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
new file mode 100644
index 0000000..132b495
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
+
+/**
+ * When used in {@link GroupedWindowInvokable}, eviction policies must
+ * provide a clone method. Eviction policies get cloned to provide an own
+ * instance for each group and respectively each individual element buffer as
+ * groups maintain their own buffers with the elements belonging to the
+ * respective group.
+ * 
+ * This interface extends {@link EvictionPolicy} with such a clone method. It
+ * also adds the Java {@link Cloneable} interface as flag.
+ * 
+ * @param <DATA>
+ *            The data type handled by this policy
+ */
+public interface CloneableEvictionPolicy<DATA> extends EvictionPolicy<DATA>, Cloneable {
+
+	/**
+	 * This method should return an exact copy of the object it belongs to
+	 * including the current object state.
+	 * 
+	 * @return a copy of this object
+	 */
+	public CloneableEvictionPolicy<DATA> clone();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
new file mode 100644
index 0000000..f5772a1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
+
+/**
+ * When used in {@link GroupedWindowInvokable}, trigger policies can provide
+ * a clone method. Cloneable triggers can can be used in a distributed manner,
+ * which means they get cloned to provide an own instance for each group. This
+ * allows each group to trigger individually and only based on the elements
+ * belonging to the respective group.
+ * 
+ * This interface extends {@link TriggerPolicy} with such a clone method. It
+ * also adds the Java {@link Cloneable} interface as flag.
+ * 
+ * @param <DATA>
+ *            The data type handled by this policy
+ */
+public interface CloneableTriggerPolicy<DATA> extends TriggerPolicy<DATA>, Cloneable {
+
+	/**
+	 * This method should return an exact copy of the object it belongs to
+	 * including the current object state.
+	 * 
+	 * @return a copy of this object
+	 */
+	public CloneableTriggerPolicy<DATA> clone();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
new file mode 100644
index 0000000..a9e7b3f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+/**
+ * This eviction policy allows the eviction of data points from the buffer using
+ * a counter of arriving elements and a threshold (maximal buffer size)
+ * 
+ * By default this policy does not react on fake elements. Wrap it in an
+ * {@link ActiveEvictionPolicyWrapper} to make it count even fake elements.
+ * 
+ * @param <IN>
+ *            the type of the incoming data points
+ */
+public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
+
+	/**
+	 * Auto generated version id
+	 */
+	private static final long serialVersionUID = 2319201348806427996L;
+
+	int maxElements;
+	int counter;
+	int deleteOnEviction = 1;
+
+	/**
+	 * This constructor allows the setup of the simplest possible count based
+	 * eviction. It keeps the size of the buffer according to the given
+	 * maxElements parameter by deleting the oldest element in the buffer.
+	 * Eviction only takes place if the counter of arriving elements would be
+	 * higher than maxElements without eviction.
+	 * 
+	 * @param maxElements
+	 *            The maximum number of elements before eviction. As soon as one
+	 *            more element arrives, the oldest element will be deleted
+	 */
+	public CountEvictionPolicy(int maxElements) {
+		this(maxElements, 1);
+	}
+
+	/**
+	 * This constructor allows to set up both, the maximum number of elements
+	 * and the number of elements to be deleted in case of an eviction.
+	 * 
+	 * Eviction only takes place if the counter of arriving elements would be
+	 * higher than maxElements without eviction. In such a case deleteOnEviction
+	 * elements will be removed from the buffer.
+	 * 
+	 * The counter of arriving elements is adjusted respectively, but never set
+	 * below zero:
+	 * counter=(counter-deleteOnEviction<0)?0:counter-deleteOnEviction
+	 * 
+	 * @param maxElements
+	 *            maxElements The maximum number of elements before eviction.
+	 * @param deleteOnEviction
+	 *            The number of elements to be deleted on eviction. The counter
+	 *            will be adjusted respectively but never below zero.
+	 */
+	public CountEvictionPolicy(int maxElements, int deleteOnEviction) {
+		this(maxElements, deleteOnEviction, 0);
+	}
+
+	/**
+	 * The same as {@link CountEvictionPolicy#CountEvictionPolicy(int, int)}.
+	 * Additionally a custom start value for the counter of arriving elements
+	 * can be set. By setting a negative start value the first eviction can be
+	 * delayed.
+	 * 
+	 * @param maxElements
+	 *            maxElements The maximum number of elements before eviction.
+	 * @param deleteOnEviction
+	 *            The number of elements to be deleted on eviction. The counter
+	 *            will be adjusted respectively but never below zero.
+	 * @param startValue
+	 *            A custom start value for the counter of arriving elements.
+	 * @see CountEvictionPolicy#NextGenCountEvictionPolicy(int, int)
+	 */
+	public CountEvictionPolicy(int maxElements, int deleteOnEviction, int startValue) {
+		this.counter = startValue;
+		this.deleteOnEviction = deleteOnEviction;
+		this.maxElements = maxElements;
+	}
+
+	@Override
+	public int notifyEviction(IN datapoint, boolean triggered, int bufferSize) {
+		// The comparison have to be >= and not == to cover case max=0
+		if (counter >= maxElements) {
+			// Adjust the counter according to the current eviction
+			counter = (counter - deleteOnEviction < 0) ? 0 : counter - deleteOnEviction;
+			// The current element will be added after the eviction
+			// Therefore, increase counter in any case
+			counter++;
+			return deleteOnEviction;
+		} else {
+			counter++;
+			return 0;
+		}
+	}
+
+	@Override
+	public CountEvictionPolicy<IN> clone() {
+		return new CountEvictionPolicy<IN>(maxElements, deleteOnEviction, counter);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
new file mode 100644
index 0000000..7a3c75a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+/**
+ * This policy triggers at every n'th element.
+ * 
+ * @param <IN>
+ *            The type of the data points which are handled by this policy
+ */
+public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
+
+	/**
+	 * Auto generated version ID
+	 */
+	private static final long serialVersionUID = -6357200688886103968L;
+
+	public static final int DEFAULT_START_VALUE = 0;
+
+	private int counter;
+	private int max;
+
+	/**
+	 * This constructor will set up a count based trigger, which triggers after
+	 * max elements have arrived.
+	 * 
+	 * @param max
+	 *            The number of arriving elements before the trigger occurs.
+	 */
+	public CountTriggerPolicy(int max) {
+		this(max, DEFAULT_START_VALUE);
+	}
+
+	/**
+	 * In addition to {@link CountTriggerPolicy#CountTriggerPolicy(int)} this
+	 * constructor allows to set a custom start value for the element counter.
+	 * This can be used to delay the first trigger by setting a negative start
+	 * value. Often the first trigger should be delayed in case of sliding
+	 * windows. For example if the size of a window should be 4 and a trigger
+	 * should happen every 2, a start value of -2 would allow to also have the
+	 * first window of size 4.
+	 * 
+	 * @param max
+	 *            The number of arriving elements before the trigger occurs.
+	 * @param startValue
+	 *            The start value for the counter of arriving elements.
+	 * @see CountTriggerPolicy#CountTriggerPolicy(int)
+	 */
+	public CountTriggerPolicy(int max, int startValue) {
+		this.max = max;
+		this.counter = startValue;
+	}
+
+	@Override
+	public boolean notifyTrigger(IN datapoint) {
+		// The comparison have to be >= and not == to cover case max=0
+		if (counter >= max) {
+			// The current data point will be part of the next window!
+			// Therefore the counter needs to be set to one already.
+			counter = 1;
+			return true;
+		} else {
+			counter++;
+			return false;
+		}
+	}
+
+	@Override
+	public CountTriggerPolicy<IN> clone() {
+		return new CountTriggerPolicy<IN>(max, counter);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
new file mode 100644
index 0000000..77bc692
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+
+/**
+ * This policy calculates a delta between the data point which triggered last
+ * and the currently arrived data point. It triggers if the delta is higher than
+ * a specified threshold.
+ * 
+ * In case it gets used for eviction, this policy starts from the first element
+ * of the buffer and removes all elements from the buffer which have a higher
+ * delta then the threshold. As soon as there is an element with a lower delta,
+ * the eviction stops.
+ * 
+ * By default this policy does not react on fake elements. Wrap it in an
+ * {@link ActiveEvictionPolicyWrapper} to make it calculate the delta even on
+ * fake elements.
+ * 
+ * @param <DATA>
+ *            The type of the data points which are handled by this policy
+ */
+public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
+		CloneableEvictionPolicy<DATA> {
+
+	/**
+	 * Auto generated version ID
+	 */
+	private static final long serialVersionUID = -7797538922123394967L;
+
+	private DeltaFunction<DATA> deltaFuntion;
+	private List<DATA> windowBuffer;
+	private double threshold;
+	private DATA triggerDataPoint;
+
+	/**
+	 * Crates a delta policy which calculates a delta between the data point
+	 * which triggered last and the currently arrived data point. It triggers if
+	 * the delta is higher than a specified threshold.
+	 * 
+	 * In case it gets used for eviction, this policy starts from the first
+	 * element of the buffer and removes all elements from the buffer which have
+	 * a higher delta then the threshold. As soon as there is an element with a
+	 * lower delta, the eviction stops.
+	 * 
+	 * @param deltaFuntion
+	 *            The delta function to be used.
+	 * @param init
+	 *            The initial to be used for the calculation of a delta before
+	 *            the first trigger.
+	 * @param threshold
+	 *            The threshold upon which a triggering should happen.
+	 */
+	public DeltaPolicy(DeltaFunction<DATA> deltaFuntion, DATA init, double threshold) {
+		this.deltaFuntion = deltaFuntion;
+		this.triggerDataPoint = init;
+		this.windowBuffer = new LinkedList<DATA>();
+		this.threshold = threshold;
+	}
+
+	@Override
+	public boolean notifyTrigger(DATA datapoint) {
+		if (deltaFuntion.getDelta(this.triggerDataPoint, datapoint) > this.threshold) {
+			this.triggerDataPoint = datapoint;
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
+		windowBuffer = windowBuffer.subList(windowBuffer.size() - bufferSize, bufferSize);
+		int evictCount = 0;
+		for (DATA bufferPoint : windowBuffer) {
+			if (deltaFuntion.getDelta(bufferPoint, datapoint) < this.threshold) {
+				break;
+			}
+			evictCount++;
+		}
+
+		if (evictCount > 0) {
+			windowBuffer = windowBuffer.subList(evictCount, windowBuffer.size());
+		}
+		windowBuffer.add(datapoint);
+		return evictCount;
+	}
+
+	@Override
+	public DeltaPolicy<DATA> clone() {
+		return new DeltaPolicy<DATA>(deltaFuntion, triggerDataPoint, threshold);
+	}
+}


Mime
View raw message