flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [35/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:42:13 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
deleted file mode 100644
index 9d055d5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-
-/**
- * This delta function calculates the euclidean distance between two given
- * points.
- * 
- * Euclidean distance: http://en.wikipedia.org/wiki/Euclidean_distance
- * 
- * @param <DATA>
- *            The input data type. This delta function works with a double[],
- *            but can extract/convert to it from any other given object in case
- *            the respective extractor has been set. See
- *            {@link ExtractionAwareDeltaFunction} for more information.
- */
-public class EuclideanDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> {
-
-	public EuclideanDistance() {
-		super(null);
-	}
-
-	public EuclideanDistance(Extractor<DATA, double[]> converter) {
-		super(converter);
-	}
-
-	/**
-	 * auto-generated version id
-	 */
-	private static final long serialVersionUID = 3119432599634512359L;
-
-	@Override
-	public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
-		double result = 0;
-		for (int i = 0; i < oldDataPoint.length; i++) {
-			result += (oldDataPoint[i] - newDataPoint[i]) * (oldDataPoint[i] - newDataPoint[i]);
-		}
-		return Math.sqrt(result);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
deleted file mode 100644
index 81d01a4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-
-/**
- * Extend this abstract class to implement a delta function which is aware of
- * extracting the data on which the delta is calculated from a more complex data
- * structure. For example in case you want to be able to run a delta only on one
- * field of a Tuple type or only on some fields from an array.
- * 
- * @param <DATA>
- *            The input data type. The input of this type will be passed to the
- *            extractor which will transform into a TO-object. The delta
- *            function then runs on this TO-object.
- * @param <TO>
- *            The type on which the delta function runs. (The type of the delta
- *            function)
- */
-public abstract class ExtractionAwareDeltaFunction<DATA, TO> implements DeltaFunction<DATA> {
-
-	/**
-	 * Generated Version ID
-	 */
-	private static final long serialVersionUID = 6927486219702689554L;
-	private Extractor<DATA, TO> converter;
-
-	public ExtractionAwareDeltaFunction(Extractor<DATA, TO> converter) {
-		this.converter = converter;
-	}
-
-	/**
-	 * This method takes the two data point and runs the set extractor on it.
-	 * The delta function implemented at {@link getNestedDelta} is then called
-	 * with the extracted data. In case no extractor is set the input data gets
-	 * passes to {@link getNestedDelta} as-is. The return value is just
-	 * forwarded from {@link getNestedDelta}.
-	 * 
-	 * @param oldDataPoint
-	 *            the older data point as raw data (before extraction).
-	 * @param newDataPoint
-	 *            the new data point as raw data (before extraction).
-	 * @return the delta between the two points.
-	 */
-	@SuppressWarnings("unchecked")
-	@Override
-	public double getDelta(DATA oldDataPoint, DATA newDataPoint) {
-		if (converter == null) {
-			// In case no conversion/extraction is required, we can cast DATA to
-			// TO
-			// => Therefore, "unchecked" warning is suppressed for this method.
-			return getNestedDelta((TO) oldDataPoint, (TO) newDataPoint);
-		} else {
-			return getNestedDelta(converter.extract(oldDataPoint), converter.extract(newDataPoint));
-		}
-
-	}
-
-	/**
-	 * This method is exactly the same as
-	 * {@link DeltaFunction#getDelta(Object, Object)} except that it gets the
-	 * result of the previously done extractions as input. Therefore, this
-	 * method only does the actual calculation of the delta but no data
-	 * extraction or conversion.
-	 * 
-	 * @param oldDataPoint
-	 *            the older data point.
-	 * @param newDataPoint
-	 *            the new data point.
-	 * @return the delta between the two points.
-	 */
-	public abstract double getNestedDelta(TO oldDataPoint, TO newDataPoint);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java
deleted file mode 100644
index ee878ac..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Converts a Tuple to an Object-Array. The field which should be included in
- * the array can selected and reordered as needed.
- */
-public class ArrayFromTuple implements Extractor<Tuple, Object[]> {
-
-	/**
-	 * Auto generated version id
-	 */
-	private static final long serialVersionUID = -6076121226427616818L;
-	int[] order = null;
-
-	/**
-	 * Using this constructor the extractor will convert the whole tuple (all
-	 * fields in the original order) to an array.
-	 */
-	public ArrayFromTuple() {
-		// noting to do
-	}
-
-	/**
-	 * Using this constructor the extractor will combine the fields as specified
-	 * in the indexes parameter in an object array.
-	 * 
-	 * @param indexes
-	 *            the field ids (enumerated from 0)
-	 */
-	public ArrayFromTuple(int... indexes) {
-		this.order = indexes;
-	}
-
-	@Override
-	public Object[] extract(Tuple in) {
-		Object[] output;
-
-		if (order == null) {
-			// copy the whole tuple
-			output = new Object[in.getArity()];
-			for (int i = 0; i < in.getArity(); i++) {
-				output[i] = in.getField(i);
-			}
-		} else {
-			// copy user specified order
-			output = new Object[order.length];
-			for (int i = 0; i < order.length; i++) {
-				output[i] = in.getField(order[i]);
-			}
-		}
-
-		return output;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtract.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtract.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtract.java
deleted file mode 100644
index 9048a3c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatinatedExtract.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-/**
- * Combines two extractors which will be executed one after each other.
- *
- * @param <FROM>
- *            The input type of the first extractor.
- * @param <OVER>
- *            The output type of the first and the input type of the second
- *            extractor.
- * @param <TO>
- *            The output type of the second extractor and the output type of the
- *            over all extraction.
- */
-public class ConcatinatedExtract<FROM, OVER, TO> implements Extractor<FROM, TO> {
-
-	/**
-	 * auto-generated id
-	 */
-	private static final long serialVersionUID = -7807197760725651752L;
-
-	private Extractor<FROM, OVER> e1;
-	private Extractor<OVER, TO> e2;
-
-	/**
-	 * Combines two extractors which will be executed one after each other.
-	 * 
-	 * @param e1
-	 *            First extractor: This extractor gets applied to the input data
-	 *            first. Its output as then passed as input to the second
-	 *            extractor.
-	 * @param e2
-	 *            Second extractor: This extractor gets the output of the first
-	 *            extractor as input. Its output is then the result of the over
-	 *            all extraction.
-	 */
-	public ConcatinatedExtract(Extractor<FROM, OVER> e1, Extractor<OVER, TO> e2) {
-		this.e1 = e1;
-		this.e2 = e2;
-	}
-
-	@Override
-	public TO extract(FROM in) {
-		return e2.extract(e1.extract(in));
-	}
-
-	public <OUT> ConcatinatedExtract<FROM, TO, OUT> add(Extractor<TO, OUT> e3) {
-		return new ConcatinatedExtract<FROM, TO, OUT>(this, e3);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java
deleted file mode 100644
index b103ca3..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import java.io.Serializable;
-
-/**
- * Extractors allow to extract/convert one type to another. They are mostly used
- * to extract some fields out of a more complex structure (Tuple/Array) to run
- * further calculation on the extraction result.
- * 
- * @param <FROM>
- *            The input data type.
- * @param <TO>
- *            The output data type.
- */
-public interface Extractor<FROM, TO> extends Serializable {
-
-	/**
-	 * Extracts/Converts the given input to an object of the output type
-	 * 
-	 * @param in
-	 *            the input data
-	 * @return the extracted/converted data
-	 */
-	public TO extract(FROM in);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java
deleted file mode 100644
index 0568276..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import java.lang.reflect.Array;
-
-/**
- * Extracts a single field out of an array.
- * 
- * @param <OUT>
- *            The type of the extracted field.
- */
-public class FieldFromArray<OUT> implements Extractor<Object, OUT> {
-
-	/**
-	 * Auto-gernated version id
-	 */
-	private static final long serialVersionUID = -5161386546695574359L;
-	private int fieldId = 0;
-
-	/**
-	 * Extracts the first field (id 0) from the array
-	 */
-	public FieldFromArray() {
-		// noting to do => will use default 0
-	}
-
-	/**
-	 * Extracts the field with the given id from the array.
-	 * 
-	 * @param fieldId
-	 *            The id of the field which will be extracted from the array.
-	 */
-	public FieldFromArray(int fieldId) {
-		this.fieldId = fieldId;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public OUT extract(Object in) {
-		return (OUT) Array.get(in, fieldId);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java
deleted file mode 100644
index 07b38f5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Extracts a single field out of a tuple.
- * 
- * @param <OUT>
- *            The type of the extracted field.
- */
-public class FieldFromTuple<OUT> implements Extractor<Tuple, OUT> {
-
-	/**
-	 * Auto-gernated version id
-	 */
-	private static final long serialVersionUID = -5161386546695574359L;
-	private int fieldId = 0;
-
-	/**
-	 * Extracts the first field (id 0) from the tuple
-	 */
-	public FieldFromTuple() {
-		// noting to do => will use default 0
-	}
-
-	/**
-	 * Extracts the field with the given id from the tuple.
-	 * 
-	 * @param fieldId
-	 *            The id of the field which will be extracted from the tuple.
-	 */
-	public FieldFromTuple(int fieldId) {
-		this.fieldId = fieldId;
-	}
-
-	@Override
-	public OUT extract(Tuple in) {
-		return in.getField(fieldId);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java
deleted file mode 100644
index 4e98689..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import java.lang.reflect.Array;
-
-/**
- * Extracts multiple fields from an array and puts them into a new array of the
- * specified type.
- *
- * @param <OUT>
- *            The type of the output array. If out is set to String, the output
- *            of the extractor will be a String[]. If it is set to String[] the
- *            output will be String[][].
- */
-public class FieldsFromArray<OUT> implements Extractor<Object, OUT[]> {
-
-	/**
-	 * Auto-generated version id
-	 */
-	private static final long serialVersionUID = 8075055384516397670L;
-	private int[] order;
-	private Class<OUT> clazz;
-
-	/**
-	 * Extracts multiple fields from an array and puts them in the given order
-	 * into a new array of the specified type.
-	 * 
-	 * @param clazz
-	 *            the Class object representing the component type of the new
-	 *            array
-	 * @param indexes
-	 *            The indexes of the fields to be extracted. Any order is
-	 *            possible, but not more than 255 fields due to limitations in
-	 *            {@link Array#newInstance(Class, int...)}.
-	 */
-	public FieldsFromArray(Class<OUT> clazz, int... indexes) {
-		this.order = indexes;
-		this.clazz = clazz;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public OUT[] extract(Object in) {
-		OUT[] output = (OUT[]) Array.newInstance(clazz, order.length);
-		for (int i = 0; i < order.length; i++) {
-			output[i] = (OUT) Array.get(in, this.order[i]);
-		}
-		return output;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java
deleted file mode 100644
index 1bfc461..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Extracts one or more fields of the type Double from a tuple and puts them
- * into a new double[]
- */
-public class FieldsFromTuple implements Extractor<Tuple, double[]> {
-
-	/**
-	 * auto generated version id
-	 */
-	private static final long serialVersionUID = -2554079091050273761L;
-	int[] indexes;
-
-	/**
-	 * Extracts one or more fields of the the type Double from a tuple and puts
-	 * them into a new double[] (in the specified order).
-	 * 
-	 * @param indexes
-	 *            The indexes of the fields to be extracted.
-	 */
-	public FieldsFromTuple(int... indexes) {
-		this.indexes = indexes;
-	}
-
-	@Override
-	public double[] extract(Tuple in) {
-		double[] out = new double[indexes.length];
-		for (int i = 0; i < indexes.length; i++) {
-			out[i] = (Double) in.getField(indexes[i]);
-		}
-		return out;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
deleted file mode 100644
index 840546f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * Represents a count based trigger or eviction policy. Use the
- * {@link Count#of(int)} to get an instance.
- */
-@SuppressWarnings("rawtypes")
-public class Count implements WindowingHelper {
-
-	private int count;
-	private int deleteOnEviction = 1;
-	private int startValue = CountTriggerPolicy.DEFAULT_START_VALUE;
-
-	/**
-	 * Specifies on which element a trigger or an eviction should happen (based
-	 * on the count of the elements).
-	 * 
-	 * This constructor does exactly the same as {@link Count#of(int)}.
-	 * 
-	 * @param count
-	 *            the number of elements to count before trigger/evict
-	 */
-	public Count(int count) {
-		this.count = count;
-	}
-
-	@Override
-	public EvictionPolicy<?> toEvict() {
-		return new CountEvictionPolicy(count, deleteOnEviction);
-	}
-
-	@Override
-	public TriggerPolicy<?> toTrigger() {
-		return new CountTriggerPolicy(count, startValue);
-	}
-
-	/**
-	 * Sets the number of elements deleted at each eviction (i.e when the number
-	 * elements exceeds the window size). By default the elements get deleted
-	 * one by one (deleteOnEvition = 1)
-	 * 
-	 * @param deleteOnEviction
-	 *            The number of elements deleted at each evition
-	 * @return Helper representing the count based policy
-	 * 
-	 */
-	public Count withDelete(int deleteOnEviction) {
-		this.deleteOnEviction = deleteOnEviction;
-		return this;
-	}
-
-	/**
-	 * Sets the initial value of the counter. 0 by default
-	 * 
-	 * @param startValue
-	 *            Starting value of the window counter
-	 * @return Helper representing the count based policy
-	 * 
-	 */
-	public Count startingAt(int startValue) {
-		this.startValue = startValue;
-		return this;
-	}
-
-	/**
-	 * Specifies a count based eviction (window size) or trigger policy (slide
-	 * size). For eviction 'count' defines the number of elements in each
-	 * window. For trigger 'count' defines how often do we call the user
-	 * function in terms of number of elements received.
-	 * 
-	 * @param count
-	 *            the number of elements to count before trigger/evict
-	 * @return Helper representing the count based policy
-	 */
-	public static Count of(int count) {
-		return new Count(count);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
deleted file mode 100644
index 5434a4e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * This helper represents a trigger or eviction policy based on a
- * {@link DeltaFunction}.
- * 
- * @param <DATA>
- *            the data type handled by the delta function represented by this
- *            helper.
- */
-public class Delta<DATA> implements WindowingHelper<DATA> {
-
-	private DeltaFunction<DATA> deltaFunction;
-	private DATA initVal;
-	private double threshold;
-
-	/**
-	 * Creates a delta helper representing a delta count or eviction policy
-	 * 
-	 * @param deltaFunction
-	 *            The delta function which should be used to calculate the delta
-	 *            between points.
-	 * @param initVal
-	 *            The initial value which will be used to calculate the first
-	 *            delta.
-	 * @param threshold
-	 *            The threshold used by the delta function.
-	 */
-	public Delta(DeltaFunction<DATA> deltaFunction, DATA initVal, double threshold) {
-		this.deltaFunction = deltaFunction;
-		this.initVal = initVal;
-		this.threshold = threshold;
-	}
-
-	@Override
-	public EvictionPolicy<DATA> toEvict() {
-		return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold);
-	}
-
-	@Override
-	public TriggerPolicy<DATA> toTrigger() {
-		return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold);
-	}
-
-	/**
-	 * Creates a delta helper representing a delta trigger or eviction policy.
-	 * </br></br> This policy calculates a delta between the data point which
-	 * triggered last and the currently arrived data point. It triggers if the
-	 * delta is higher than a specified threshold. </br></br> In case it gets
-	 * used for eviction, this policy starts from the first element of the
-	 * buffer and removes all elements from the buffer which have a higher delta
-	 * then the threshold. As soon as there is an element with a lower delta,
-	 * the eviction stops.
-	 * 
-	 * @param deltaFunction
-	 *            The delta function which should be used to calculate the delta
-	 *            between points.
-	 * @param initVal
-	 *            The initial value which will be used to calculate the first
-	 *            delta.
-	 * @param threshold
-	 *            The threshold used by the delta function.
-	 * @return Helper representing a delta trigger or eviction policy
-	 */
-	public static <DATA> Delta<DATA> of(double threshold, DeltaFunction<DATA> deltaFunction,
-			DATA initVal) {
-		return new Delta<DATA>(deltaFunction, initVal, threshold);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
deleted file mode 100644
index 8581ac5..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-/**
- * {@link Timestamp} implementation to be used when system time is needed to
- * determine windows
- */
-public class SystemTimestamp<T> implements Timestamp<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public long getTimestamp(T value) {
-		return System.currentTimeMillis();
-	}
-
-	public static <R> TimestampWrapper<R> getWrapper() {
-		return new TimestampWrapper<R>(new SystemTimestamp<R>(), System.currentTimeMillis());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
deleted file mode 100644
index f94eea4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * This helper represents a time based count or eviction policy. By default the
- * time is measured with {@link System#currentTimeMillis()} in
- * {@link DefaultTimeStamp}.
- * 
- * @param <DATA>
- *            The data type which is handled by the time stamp used in the
- *            policy represented by this helper
- */
-public class Time<DATA> implements WindowingHelper<DATA> {
-
-	protected long length;
-	protected TimeUnit granularity;
-	protected TimestampWrapper<DATA> timestampWrapper;
-	protected long delay;
-
-	/**
-	 * Creates a helper representing a trigger which triggers every given
-	 * length or an eviction which evicts all elements older than length.
-	 * 
-	 * @param length
-	 *            The number of time units
-	 * @param timeUnit
-	 *            The unit of time such as minute oder millisecond. Note that
-	 *            the smallest possible granularity is milliseconds. Any smaller
-	 *            time unit might cause an error at runtime due to conversion
-	 *            problems.
-	 * @param timestamp
-	 *            The user defined timestamp that will be used to extract time
-	 *            information from the incoming elements
-	 * @param startTime
-	 *            The startTime of the stream for computing the first window
-	 */
-	private Time(long length, TimeUnit timeUnit, Timestamp<DATA> timestamp, long startTime) {
-		this(length, timeUnit, new TimestampWrapper<DATA>(timestamp, startTime));
-	}
-
-	/**
-	 * Creates a helper representing a trigger which triggers every given
-	 * length or an eviction which evicts all elements older than length.
-	 * 
-	 * @param length
-	 *            The number of time units
-	 * @param timeUnit
-	 *            The unit of time such as minute oder millisecond. Note that
-	 *            the smallest possible granularity is milliseconds. Any smaller
-	 *            time unit might cause an error at runtime due to conversion
-	 *            problems.
-	 * @param timestampWrapper
-	 *            The user defined {@link TimestampWrapper} that will be used to
-	 *            extract time information from the incoming elements
-	 */
-	private Time(long length, TimeUnit timeUnit, TimestampWrapper<DATA> timestampWrapper) {
-		this.length = length;
-		this.granularity = timeUnit;
-		this.timestampWrapper = timestampWrapper;
-		this.delay = 0;
-	}
-
-	@Override
-	public EvictionPolicy<DATA> toEvict() {
-		return new TimeEvictionPolicy<DATA>(granularityInMillis(), timestampWrapper);
-	}
-
-	@Override
-	public TriggerPolicy<DATA> toTrigger() {
-		return new TimeTriggerPolicy<DATA>(granularityInMillis(), timestampWrapper, delay);
-	}
-
-	/**
-	 * Creates a helper representing a time trigger which triggers every given
-	 * length (slide size) or a time eviction which evicts all elements older
-	 * than length (window size) using System time.
-	 * 
-	 * @param length
-	 *            The number of time units
-	 * @param timeUnit
-	 *            The unit of time such as minute oder millisecond. Note that
-	 *            the smallest possible granularity is milliseconds. Any smaller
-	 *            time unit might cause an error at runtime due to conversion
-	 *            problems.
-	 * @return Helper representing the time based trigger and eviction policy
-	 */
-	@SuppressWarnings("unchecked")
-	public static <DATA> Time<DATA> of(long length, TimeUnit timeUnit) {
-		return new Time<DATA>(length, timeUnit,
-				(TimestampWrapper<DATA>) SystemTimestamp.getWrapper());
-	}
-
-	/**
-	 * Creates a helper representing a time trigger which triggers every given
-	 * length (slide size) or a time eviction which evicts all elements older
-	 * than length (window size) using a user defined timestamp extractor.
-	 * 
-	 * @param length
-	 *            The number of time units
-	 * @param timestamp
-	 *            The user defined timestamp that will be used to extract time
-	 *            information from the incoming elements
-	 * @param startTime
-	 *            The startTime used to compute the first window
-	 * @return Helper representing the time based trigger and eviction policy
-	 */
-	public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp, long startTime) {
-		return new Time<DATA>(length, null, timestamp, startTime);
-	}
-
-	/**
-	 * Creates a helper representing a time trigger which triggers every given
-	 * length (slide size) or a time eviction which evicts all elements older
-	 * than length (window size) using a user defined timestamp extractor. By
-	 * default the start time is set to 0.
-	 * 
-	 * @param length
-	 *            The number of time units
-	 * @param timestamp
-	 *            The user defined timestamp that will be used to extract time
-	 *            information from the incoming elements
-	 * @return Helper representing the time based trigger and eviction policy
-	 */
-	public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp) {
-		return of(length, timestamp, 0);
-	}
-
-	/**
-	 * Sets the delay for the first processed window.
-	 * 
-	 * @param delay
-	 *            The number of time units before the first processed window.
-	 * @return Helper representing the time based trigger and eviction policy
-	 */
-	public Time<DATA> withDelay(long delay) {
-		this.delay = delay;
-		return this;
-	}
-
-	protected long granularityInMillis() {
-		return granularity == null ? length : granularity.toMillis(length);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
deleted file mode 100644
index fea6020..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import java.io.Serializable;
-
-/**
- * Interface for getting a timestamp from a custom value. Used in window
- * reduces. In order to work properly, the timestamps must be non-decreasing.
- *
- * @param <T>
- *            Type of the value to create the timestamp from.
- */
-public interface Timestamp<T> extends Serializable {
-
-	/**
-	 * Values
-	 * 
-	 * @param value
-	 *            The value to create the timestamp from
-	 * @return The timestamp
-	 */
-	public long getTimestamp(T value);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
deleted file mode 100644
index 8c3a09d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import java.io.Serializable;
-
-public class TimestampWrapper<T> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	private long startTime;
-	private Timestamp<T> timestamp;
-
-	public TimestampWrapper(Timestamp<T> timeStamp, long startTime) {
-		this.timestamp = timeStamp;
-		this.startTime = startTime;
-	}
-
-	public long getTimestamp(T in) {
-		return timestamp.getTimestamp(in);
-	}
-
-	public long getStartTime() {
-		return startTime;
-	}
-
-	public boolean isDefaultTimestamp() {
-		return timestamp instanceof SystemTimestamp;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
deleted file mode 100644
index 9df8432..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * A helper representing a count or eviction policy. Such helper classes are
- * used to provide a nice and well readable API.
- * 
- * @param <DATA>
- *            the type of input data handled by this helper
- * @see Count
- * @see Time
- * @see Delta
- */
-public interface WindowingHelper<DATA> {
-
-	public EvictionPolicy<DATA> toEvict();
-
-	public TriggerPolicy<DATA> toTrigger();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
deleted file mode 100644
index 29ba9eb..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * The {@link ActiveEvictionPolicy} wraps around a non active
- * {@link EvictionPolicy}. It forwards all calls to
- * {@link ActiveEvictionPolicy#notifyEvictionWithFakeElement(Object, int)} to
- * {@link EvictionPolicy#notifyEviction(Object, boolean, int)} while the
- * triggered parameter will be set to true.
- * 
- * This class additionally implements the clone method and can wrap around
- * {@link CloneableEvictionPolicy} to make it active.
- * 
- * @param <DATA>
- *            The data type handled by this policy
- */
-public class ActiveCloneableEvictionPolicyWrapper<DATA> extends ActiveEvictionPolicyWrapper<DATA>
-		implements CloneableEvictionPolicy<DATA> {
-
-	/**
-	 * Auto generated version ID
-	 */
-	private static final long serialVersionUID = 1520261575300622769L;
-	CloneableEvictionPolicy<DATA> nestedPolicy;
-
-	/**
-	 * Creates a wrapper which activates the eviction policy which is wrapped
-	 * in. This means that the nested policy will get called on fake elements as
-	 * well as on real elements.
-	 * 
-	 * This specialized version of the {@link ActiveEvictionPolicyWrapper} works
-	 * with {@link CloneableEvictionPolicy} and is thereby cloneable as well.
-	 * 
-	 * @param nestedPolicy
-	 *            The policy which should be activated/wrapped in.
-	 */
-	public ActiveCloneableEvictionPolicyWrapper(CloneableEvictionPolicy<DATA> nestedPolicy) {
-		super(nestedPolicy);
-		this.nestedPolicy = nestedPolicy;
-	}
-
-	@Override
-	public ActiveCloneableEvictionPolicyWrapper<DATA> clone() {
-		return new ActiveCloneableEvictionPolicyWrapper<DATA>(nestedPolicy.clone());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
deleted file mode 100644
index fe172bc..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This interface is used for active eviction policies. beside the functionality
- * inherited from {@link EvictionPolicy} it provides a method which gets called
- * to notify on fake elements.
- * 
- * In case an eviction policy implements this interface instead of the
- * {@link EvictionPolicy} interface, not only the real but also fake data points
- * will cause a notification of the eviction.
- * 
- * Fake data points are mostly used in windowing based on time to trigger and
- * evict even if no element arrives at all during a windows duration.
- */
-public interface ActiveEvictionPolicy<DATA> extends EvictionPolicy<DATA> {
-
-	/**
-	 * Proves if and how many elements should be deleted from the element
-	 * buffer. The eviction takes place after the trigger and after the call to
-	 * the UDF. This method is only called with fake elements.
-	 * 
-	 * Note: Fake elements are always considered as triggered. Therefore this
-	 * method does not have a triggered parameter.
-	 * 
-	 * @param datapoint
-	 *            the current fake data point
-	 * @param bufferSize
-	 *            the current size of the buffer (only real elements are
-	 *            counted)
-	 * @return the number of elements to delete from the buffer (only real
-	 *         elements are counted)
-	 */
-	public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
deleted file mode 100644
index b3b6935..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This {@link ActiveEvictionPolicy} wraps around a non active
- * {@link EvictionPolicy}. It forwards all calls to
- * {@link ActiveEvictionPolicy#notifyEvictionWithFakeElement(Object, int)} to
- * {@link EvictionPolicy#notifyEviction(Object, boolean, int)} while the
- * triggered parameter will be set to true.
- * 
- * @param <DATA>
- *            The data type handled by this policy
- */
-public class ActiveEvictionPolicyWrapper<DATA> implements ActiveEvictionPolicy<DATA> {
-
-	/**
-	 * Auto generated version ID
-	 */
-	private static final long serialVersionUID = -7656558669799505882L;
-	private EvictionPolicy<DATA> nestedPolicy;
-
-	/**
-	 * Creates a wrapper which activates the eviction policy which is wrapped
-	 * in. This means that the nested policy will get called on fake elements as
-	 * well as on real elements.
-	 * 
-	 * @param nestedPolicy
-	 *            The policy which should be activated/wrapped in.
-	 */
-	public ActiveEvictionPolicyWrapper(EvictionPolicy<DATA> nestedPolicy) {
-		if (nestedPolicy == null) {
-			throw new RuntimeException("The nested policy must not be null.");
-		}
-		this.nestedPolicy = nestedPolicy;
-	}
-
-	@Override
-	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
-		return nestedPolicy.notifyEviction(datapoint, triggered, bufferSize);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize) {
-		return nestedPolicy.notifyEviction((DATA) datapoint, true, bufferSize);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
deleted file mode 100644
index c44be37..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * In case an {@link ActiveTriggerPolicy} is used, it can implement own
- * {@link Runnable} classes. Such {@link Runnable} classes will be executed as
- * an own thread and can submit fake elements, to the element buffer at any
- * time.
- * 
- * The factory method for runnables of the {@link ActiveTriggerPolicy} gets an
- * instance of this interface as parameter. The describes adding of elements can
- * be done by the runnable using the methods provided in this interface.
- * 
- */
-public interface ActiveTriggerCallback {
-
-	/**
-	 * Submits a new fake data point to the element buffer. Such a fake element
-	 * might be used to trigger at any time, but will never be included in the
-	 * result of the reduce function. The submission of a fake element causes
-	 * notifications only at the {@link ActiveTriggerPolicy} and
-	 * {@link ActiveEvictionPolicy} implementations.
-	 * 
-	 * @param datapoint
-	 *            the fake data point to be added
-	 */
-	public void sendFakeElement(Object datapoint);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
deleted file mode 100644
index abe5298..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-
-/**
- * This interface extends the {@link TriggerPolicy} interface with functionality
- * for active triggers. Active triggers can act in two ways:
- * 
- * 1) Whenever an element arrives at the invokable, the
- * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
- * first. It can return zero ore more fake data points which will be added
- * before the currently arrived real element gets processed. This allows to
- * handle empty windows in time based windowing with an user defined
- * {@link Timestamp}. Triggers are not called on fake datapoint. A fake
- * datapoint is always considered as triggered.
- * 
- * 2) An active trigger has a factory method for a runnable. This factory method
- * gets called at the start up of the invokable. The returned runnable will be
- * executed in its own thread and can submit fake elements at any time threw an
- * {@link ActiveTriggerCallback}. This allows to have time based triggers based
- * on any system internal time measure. Triggers are not called on fake
- * datapoint. A fake datapoints is always considered as triggered.
- * 
- * @param <DATA>
- *            The data type which can be handled by this policy
- */
-public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
-
-	/**
-	 * Whenever an element arrives at the invokable, the
-	 * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
-	 * first. It can return zero ore more fake data points which will be added
-	 * before the the currently arrived real element gets processed. This allows
-	 * to handle empty windows in time based windowing with an user defined
-	 * {@link Timestamp}. Triggers are not called on fake datapoints. A fake
-	 * datapoint is always considered as triggered.
-	 * 
-	 * @param datapoint
-	 *            the data point which arrived at the invokable
-	 * @return zero ore more fake data points which will be added before the the
-	 *         currently arrived real element gets processed.
-	 */
-	public Object[] preNotifyTrigger(DATA datapoint);
-
-	/**
-	 * This is the factory method for a runnable. This factory method gets
-	 * called at the start up of the invokable. The returned runnable will be
-	 * executed in its own thread and can submit fake elements at any time threw
-	 * an {@link ActiveTriggerCallback}. This allows to have time based triggers
-	 * based on any system internal time measure. Triggers are not called on
-	 * fake datapoints. A fake datapoint is always considered as triggered.
-	 * 
-	 * @param callback
-	 *            A callback object which allows to add fake elements from
-	 *            within the returned {@link Runnable}.
-	 * @return The runnable implementation or null in case there is no. In case
-	 *         an {@link ActiveTriggerPolicy} is used, it can implement own
-	 *         {@link Runnable} classes. Such {@link Runnable} classes will be
-	 *         executed as an own thread and can submit fake elements, to the
-	 *         element buffer at any time.
-	 */
-	public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
deleted file mode 100644
index 132b495..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
-
-/**
- * When used in {@link GroupedWindowInvokable}, eviction policies must
- * provide a clone method. Eviction policies get cloned to provide an own
- * instance for each group and respectively each individual element buffer as
- * groups maintain their own buffers with the elements belonging to the
- * respective group.
- * 
- * This interface extends {@link EvictionPolicy} with such a clone method. It
- * also adds the Java {@link Cloneable} interface as flag.
- * 
- * @param <DATA>
- *            The data type handled by this policy
- */
-public interface CloneableEvictionPolicy<DATA> extends EvictionPolicy<DATA>, Cloneable {
-
-	/**
-	 * This method should return an exact copy of the object it belongs to
-	 * including the current object state.
-	 * 
-	 * @return a copy of this object
-	 */
-	public CloneableEvictionPolicy<DATA> clone();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
deleted file mode 100644
index f5772a1..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
-
-/**
- * When used in {@link GroupedWindowInvokable}, trigger policies can provide
- * a clone method. Cloneable triggers can can be used in a distributed manner,
- * which means they get cloned to provide an own instance for each group. This
- * allows each group to trigger individually and only based on the elements
- * belonging to the respective group.
- * 
- * This interface extends {@link TriggerPolicy} with such a clone method. It
- * also adds the Java {@link Cloneable} interface as flag.
- * 
- * @param <DATA>
- *            The data type handled by this policy
- */
-public interface CloneableTriggerPolicy<DATA> extends TriggerPolicy<DATA>, Cloneable {
-
-	/**
-	 * This method should return an exact copy of the object it belongs to
-	 * including the current object state.
-	 * 
-	 * @return a copy of this object
-	 */
-	public CloneableTriggerPolicy<DATA> clone();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
deleted file mode 100644
index a9e7b3f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This eviction policy allows the eviction of data points from the buffer using
- * a counter of arriving elements and a threshold (maximal buffer size)
- * 
- * By default this policy does not react on fake elements. Wrap it in an
- * {@link ActiveEvictionPolicyWrapper} to make it count even fake elements.
- * 
- * @param <IN>
- *            the type of the incoming data points
- */
-public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
-
-	/**
-	 * Auto generated version id
-	 */
-	private static final long serialVersionUID = 2319201348806427996L;
-
-	int maxElements;
-	int counter;
-	int deleteOnEviction = 1;
-
-	/**
-	 * This constructor allows the setup of the simplest possible count based
-	 * eviction. It keeps the size of the buffer according to the given
-	 * maxElements parameter by deleting the oldest element in the buffer.
-	 * Eviction only takes place if the counter of arriving elements would be
-	 * higher than maxElements without eviction.
-	 * 
-	 * @param maxElements
-	 *            The maximum number of elements before eviction. As soon as one
-	 *            more element arrives, the oldest element will be deleted
-	 */
-	public CountEvictionPolicy(int maxElements) {
-		this(maxElements, 1);
-	}
-
-	/**
-	 * This constructor allows to set up both, the maximum number of elements
-	 * and the number of elements to be deleted in case of an eviction.
-	 * 
-	 * Eviction only takes place if the counter of arriving elements would be
-	 * higher than maxElements without eviction. In such a case deleteOnEviction
-	 * elements will be removed from the buffer.
-	 * 
-	 * The counter of arriving elements is adjusted respectively, but never set
-	 * below zero:
-	 * counter=(counter-deleteOnEviction<0)?0:counter-deleteOnEviction
-	 * 
-	 * @param maxElements
-	 *            maxElements The maximum number of elements before eviction.
-	 * @param deleteOnEviction
-	 *            The number of elements to be deleted on eviction. The counter
-	 *            will be adjusted respectively but never below zero.
-	 */
-	public CountEvictionPolicy(int maxElements, int deleteOnEviction) {
-		this(maxElements, deleteOnEviction, 0);
-	}
-
-	/**
-	 * The same as {@link CountEvictionPolicy#CountEvictionPolicy(int, int)}.
-	 * Additionally a custom start value for the counter of arriving elements
-	 * can be set. By setting a negative start value the first eviction can be
-	 * delayed.
-	 * 
-	 * @param maxElements
-	 *            maxElements The maximum number of elements before eviction.
-	 * @param deleteOnEviction
-	 *            The number of elements to be deleted on eviction. The counter
-	 *            will be adjusted respectively but never below zero.
-	 * @param startValue
-	 *            A custom start value for the counter of arriving elements.
-	 * @see CountEvictionPolicy#NextGenCountEvictionPolicy(int, int)
-	 */
-	public CountEvictionPolicy(int maxElements, int deleteOnEviction, int startValue) {
-		this.counter = startValue;
-		this.deleteOnEviction = deleteOnEviction;
-		this.maxElements = maxElements;
-	}
-
-	@Override
-	public int notifyEviction(IN datapoint, boolean triggered, int bufferSize) {
-		// The comparison have to be >= and not == to cover case max=0
-		if (counter >= maxElements) {
-			// Adjust the counter according to the current eviction
-			counter = (counter - deleteOnEviction < 0) ? 0 : counter - deleteOnEviction;
-			// The current element will be added after the eviction
-			// Therefore, increase counter in any case
-			counter++;
-			return deleteOnEviction;
-		} else {
-			counter++;
-			return 0;
-		}
-	}
-
-	@Override
-	public CountEvictionPolicy<IN> clone() {
-		return new CountEvictionPolicy<IN>(maxElements, deleteOnEviction, counter);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
deleted file mode 100644
index 7a3c75a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This policy triggers at every n'th element.
- * 
- * @param <IN>
- *            The type of the data points which are handled by this policy
- */
-public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
-
-	/**
-	 * Auto generated version ID
-	 */
-	private static final long serialVersionUID = -6357200688886103968L;
-
-	public static final int DEFAULT_START_VALUE = 0;
-
-	private int counter;
-	private int max;
-
-	/**
-	 * This constructor will set up a count based trigger, which triggers after
-	 * max elements have arrived.
-	 * 
-	 * @param max
-	 *            The number of arriving elements before the trigger occurs.
-	 */
-	public CountTriggerPolicy(int max) {
-		this(max, DEFAULT_START_VALUE);
-	}
-
-	/**
-	 * In addition to {@link CountTriggerPolicy#CountTriggerPolicy(int)} this
-	 * constructor allows to set a custom start value for the element counter.
-	 * This can be used to delay the first trigger by setting a negative start
-	 * value. Often the first trigger should be delayed in case of sliding
-	 * windows. For example if the size of a window should be 4 and a trigger
-	 * should happen every 2, a start value of -2 would allow to also have the
-	 * first window of size 4.
-	 * 
-	 * @param max
-	 *            The number of arriving elements before the trigger occurs.
-	 * @param startValue
-	 *            The start value for the counter of arriving elements.
-	 * @see CountTriggerPolicy#CountTriggerPolicy(int)
-	 */
-	public CountTriggerPolicy(int max, int startValue) {
-		this.max = max;
-		this.counter = startValue;
-	}
-
-	@Override
-	public boolean notifyTrigger(IN datapoint) {
-		// The comparison have to be >= and not == to cover case max=0
-		if (counter >= max) {
-			// The current data point will be part of the next window!
-			// Therefore the counter needs to be set to one already.
-			counter = 1;
-			return true;
-		} else {
-			counter++;
-			return false;
-		}
-	}
-
-	@Override
-	public CountTriggerPolicy<IN> clone() {
-		return new CountTriggerPolicy<IN>(max, counter);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
deleted file mode 100644
index 77bc692..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
-
-/**
- * This policy calculates a delta between the data point which triggered last
- * and the currently arrived data point. It triggers if the delta is higher than
- * a specified threshold.
- * 
- * In case it gets used for eviction, this policy starts from the first element
- * of the buffer and removes all elements from the buffer which have a higher
- * delta then the threshold. As soon as there is an element with a lower delta,
- * the eviction stops.
- * 
- * By default this policy does not react on fake elements. Wrap it in an
- * {@link ActiveEvictionPolicyWrapper} to make it calculate the delta even on
- * fake elements.
- * 
- * @param <DATA>
- *            The type of the data points which are handled by this policy
- */
-public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
-		CloneableEvictionPolicy<DATA> {
-
-	/**
-	 * Auto generated version ID
-	 */
-	private static final long serialVersionUID = -7797538922123394967L;
-
-	private DeltaFunction<DATA> deltaFuntion;
-	private List<DATA> windowBuffer;
-	private double threshold;
-	private DATA triggerDataPoint;
-
-	/**
-	 * Crates a delta policy which calculates a delta between the data point
-	 * which triggered last and the currently arrived data point. It triggers if
-	 * the delta is higher than a specified threshold.
-	 * 
-	 * In case it gets used for eviction, this policy starts from the first
-	 * element of the buffer and removes all elements from the buffer which have
-	 * a higher delta then the threshold. As soon as there is an element with a
-	 * lower delta, the eviction stops.
-	 * 
-	 * @param deltaFuntion
-	 *            The delta function to be used.
-	 * @param init
-	 *            The initial to be used for the calculation of a delta before
-	 *            the first trigger.
-	 * @param threshold
-	 *            The threshold upon which a triggering should happen.
-	 */
-	public DeltaPolicy(DeltaFunction<DATA> deltaFuntion, DATA init, double threshold) {
-		this.deltaFuntion = deltaFuntion;
-		this.triggerDataPoint = init;
-		this.windowBuffer = new LinkedList<DATA>();
-		this.threshold = threshold;
-	}
-
-	@Override
-	public boolean notifyTrigger(DATA datapoint) {
-		if (deltaFuntion.getDelta(this.triggerDataPoint, datapoint) > this.threshold) {
-			this.triggerDataPoint = datapoint;
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
-		windowBuffer = windowBuffer.subList(windowBuffer.size() - bufferSize, bufferSize);
-		int evictCount = 0;
-		for (DATA bufferPoint : windowBuffer) {
-			if (deltaFuntion.getDelta(bufferPoint, datapoint) < this.threshold) {
-				break;
-			}
-			evictCount++;
-		}
-
-		if (evictCount > 0) {
-			windowBuffer = windowBuffer.subList(evictCount, windowBuffer.size());
-		}
-		windowBuffer.add(datapoint);
-		return evictCount;
-	}
-
-	@Override
-	public DeltaPolicy<DATA> clone() {
-		return new DeltaPolicy<DATA>(deltaFuntion, triggerDataPoint, threshold);
-	}
-}


Mime
View raw message