flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [09/12] flink git commit: Move delta window functions to package functions.windowing.delta
Date Mon, 28 Sep 2015 16:15:05 GMT
Move delta window functions to package functions.windowing.delta


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3be2dc1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3be2dc1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3be2dc1a

Branch: refs/heads/master
Commit: 3be2dc1aaaeae4cbbfaecaf4998a64f1199260eb
Parents: 6610cae
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Sep 25 11:34:10 2015 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Sep 28 17:04:17 2015 +0200

----------------------------------------------------------------------
 .../windowing/delta/CosineDistance.java         | 92 ++++++++++++++++++++
 .../windowing/delta/DeltaFunction.java          | 44 ++++++++++
 .../windowing/delta/EuclideanDistance.java      | 58 ++++++++++++
 .../delta/ExtractionAwareDeltaFunction.java     | 90 +++++++++++++++++++
 .../windowing/deltafunction/CosineDistance.java | 92 --------------------
 .../windowing/deltafunction/DeltaFunction.java  | 44 ----------
 .../deltafunction/EuclideanDistance.java        | 58 ------------
 .../ExtractionAwareDeltaFunction.java           | 90 -------------------
 .../streaming/api/windowing/helper/Delta.java   |  2 +-
 .../api/windowing/policy/DeltaPolicy.java       |  2 +-
 .../api/complex/ComplexIntegrationTest.java     |  2 +-
 .../deltafunction/CosineDistanceTest.java       |  1 +
 .../deltafunction/EuclideanDistanceTest.java    |  1 +
 .../api/windowing/policy/DeltaPolicyTest.java   |  4 +-
 .../examples/windowing/TopSpeedWindowing.java   |  2 +-
 .../streaming/api/scala/windowing/Delta.scala   |  2 +-
 16 files changed, 293 insertions(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
new file mode 100644
index 0000000..7859b2c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta;
+
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * This delta function calculates the cosine distance between two given vectors.
+ * The cosine distance is defined as: cosineDistance=1-cosineSimilarity
+ * 
+ * Cosine similarity: http://en.wikipedia.org/wiki/Cosine_similarity
+ * 
+ * @param <DATA>
+ *            The input data type. This delta function works with a double[],
+ *            but can extract/convert to it from any other given object in case
+ *            the respective extractor has been set. See
+ *            {@link ExtractionAwareDeltaFunction} for more information.
+ */
+public class CosineDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]>
{
+
+	/**
+	 * auto-generated id
+	 */
+	private static final long serialVersionUID = -1217813582965151599L;
+
+	public CosineDistance() {
+		super(null);
+	}
+
+	public CosineDistance(Extractor<DATA, double[]> converter) {
+		super(converter);
+	}
+
+	@Override
+	public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
+		if (isNullvector(oldDataPoint, newDataPoint)) {
+			return 0;
+		}
+
+		if (oldDataPoint.length != newDataPoint.length) {
+			throw new IllegalArgumentException(
+					"The size of two input arrays are not same, can not compute cosine distance");
+		}
+
+		double sum1 = 0;
+		double sum2 = 0;
+		for (int i = 0; i < oldDataPoint.length; i++) {
+			sum1 += oldDataPoint[i] * oldDataPoint[i];
+			sum2 += newDataPoint[i] * newDataPoint[i];
+		}
+		sum1 = Math.sqrt(sum1);
+		sum2 = Math.sqrt(sum2);
+
+		return 1d - (dotProduct(oldDataPoint, newDataPoint) / (sum1 * sum2));
+	}
+
+	private double dotProduct(double[] a, double[] b) {
+		double result = 0;
+		for (int i = 0; i < a.length; i++) {
+			result += a[i] * b[i];
+		}
+		return result;
+	}
+
+	private boolean isNullvector(double[]... vectors) {
+		outer: for (double[] v : vectors) {
+			for (double field : v) {
+				if (field != 0) {
+					continue outer;
+				}
+			}
+			// This position is only reached in case all fields are 0.
+			return true;
+		}
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java
new file mode 100644
index 0000000..0ce2bf9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta;
+
+import java.io.Serializable;
+
+/**
+ * This interface allows the implementation of a function which calculates the
+ * delta between two data points. Delta functions might be used in delta
+ * policies and allow flexible adaptive windowing based on the arriving data
+ * points.
+ *
+ * @param <DATA>
+ *            The type of input data which can be compared using this function.
+ */
+public interface DeltaFunction<DATA> extends Serializable {
+
+	/**
+	 * Calculates the delta between two given data points.
+	 * 
+	 * @param oldDataPoint
+	 *            the old data point.
+	 * @param newDataPoint
+	 *            the new data point.
+	 * @return the delta between the two given points.
+	 */
+	public double getDelta(DATA oldDataPoint, DATA newDataPoint);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
new file mode 100644
index 0000000..f9e8ec7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta;
+
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * This delta function calculates the euclidean distance between two given
+ * points.
+ * 
+ * Euclidean distance: http://en.wikipedia.org/wiki/Euclidean_distance
+ * 
+ * @param <DATA>
+ *            The input data type. This delta function works with a double[],
+ *            but can extract/convert to it from any other given object in case
+ *            the respective extractor has been set. See
+ *            {@link ExtractionAwareDeltaFunction} for more information.
+ */
+public class EuclideanDistance<DATA> extends ExtractionAwareDeltaFunction<DATA,
double[]> {
+
+	public EuclideanDistance() {
+		super(null);
+	}
+
+	public EuclideanDistance(Extractor<DATA, double[]> converter) {
+		super(converter);
+	}
+
+	/**
+	 * auto-generated version id
+	 */
+	private static final long serialVersionUID = 3119432599634512359L;
+
+	@Override
+	public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
+		double result = 0;
+		for (int i = 0; i < oldDataPoint.length; i++) {
+			result += (oldDataPoint[i] - newDataPoint[i]) * (oldDataPoint[i] - newDataPoint[i]);
+		}
+		return Math.sqrt(result);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
new file mode 100644
index 0000000..bd5b0b9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta;
+
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * Extend this abstract class to implement a delta function which is aware of
+ * extracting the data on which the delta is calculated from a more complex data
+ * structure. For example in case you want to be able to run a delta only on one
+ * field of a Tuple type or only on some fields from an array.
+ * 
+ * @param <DATA>
+ *            The input data type. The input of this type will be passed to the
+ *            extractor which will transform into a TO-object. The delta
+ *            function then runs on this TO-object.
+ * @param <TO>
+ *            The type on which the delta function runs. (The type of the delta
+ *            function)
+ */
+public abstract class ExtractionAwareDeltaFunction<DATA, TO> implements DeltaFunction<DATA>
{
+
+	/**
+	 * Generated Version ID
+	 */
+	private static final long serialVersionUID = 6927486219702689554L;
+	private Extractor<DATA, TO> converter;
+
+	public ExtractionAwareDeltaFunction(Extractor<DATA, TO> converter) {
+		this.converter = converter;
+	}
+
+	/**
+	 * This method takes the two data point and runs the set extractor on it.
+	 * The delta function implemented at {@link #getNestedDelta} is then called
+	 * with the extracted data. In case no extractor is set the input data gets
+	 * passes to {@link #getNestedDelta} as-is. The return value is just
+	 * forwarded from {@link #getNestedDelta}.
+	 * 
+	 * @param oldDataPoint
+	 *            the older data point as raw data (before extraction).
+	 * @param newDataPoint
+	 *            the new data point as raw data (before extraction).
+	 * @return the delta between the two points.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public double getDelta(DATA oldDataPoint, DATA newDataPoint) {
+		if (converter == null) {
+			// In case no conversion/extraction is required, we can cast DATA to
+			// TO
+			// => Therefore, "unchecked" warning is suppressed for this method.
+			return getNestedDelta((TO) oldDataPoint, (TO) newDataPoint);
+		} else {
+			return getNestedDelta(converter.extract(oldDataPoint), converter.extract(newDataPoint));
+		}
+
+	}
+
+	/**
+	 * This method is exactly the same as
+	 * {@link DeltaFunction#getDelta(Object, Object)} except that it gets the
+	 * result of the previously done extractions as input. Therefore, this
+	 * method only does the actual calculation of the delta but no data
+	 * extraction or conversion.
+	 * 
+	 * @param oldDataPoint
+	 *            the older data point.
+	 * @param newDataPoint
+	 *            the new data point.
+	 * @return the delta between the two points.
+	 */
+	public abstract double getNestedDelta(TO oldDataPoint, TO newDataPoint);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
deleted file mode 100644
index 77486d0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-
-/**
- * This delta function calculates the cosine distance between two given vectors.
- * The cosine distance is defined as: cosineDistance=1-cosineSimilarity
- * 
- * Cosine similarity: http://en.wikipedia.org/wiki/Cosine_similarity
- * 
- * @param <DATA>
- *            The input data type. This delta function works with a double[],
- *            but can extract/convert to it from any other given object in case
- *            the respective extractor has been set. See
- *            {@link ExtractionAwareDeltaFunction} for more information.
- */
-public class CosineDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]>
{
-
-	/**
-	 * auto-generated id
-	 */
-	private static final long serialVersionUID = -1217813582965151599L;
-
-	public CosineDistance() {
-		super(null);
-	}
-
-	public CosineDistance(Extractor<DATA, double[]> converter) {
-		super(converter);
-	}
-
-	@Override
-	public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
-		if (isNullvector(oldDataPoint, newDataPoint)) {
-			return 0;
-		}
-
-		if (oldDataPoint.length != newDataPoint.length) {
-			throw new IllegalArgumentException(
-					"The size of two input arrays are not same, can not compute cosine distance");
-		}
-
-		double sum1 = 0;
-		double sum2 = 0;
-		for (int i = 0; i < oldDataPoint.length; i++) {
-			sum1 += oldDataPoint[i] * oldDataPoint[i];
-			sum2 += newDataPoint[i] * newDataPoint[i];
-		}
-		sum1 = Math.sqrt(sum1);
-		sum2 = Math.sqrt(sum2);
-
-		return 1d - (dotProduct(oldDataPoint, newDataPoint) / (sum1 * sum2));
-	}
-
-	private double dotProduct(double[] a, double[] b) {
-		double result = 0;
-		for (int i = 0; i < a.length; i++) {
-			result += a[i] * b[i];
-		}
-		return result;
-	}
-
-	private boolean isNullvector(double[]... vectors) {
-		outer: for (double[] v : vectors) {
-			for (double field : v) {
-				if (field != 0) {
-					continue outer;
-				}
-			}
-			// This position is only reached in case all fields are 0.
-			return true;
-		}
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java
deleted file mode 100644
index b2223d6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import java.io.Serializable;
-
-/**
- * This interface allows the implementation of a function which calculates the
- * delta between two data points. Delta functions might be used in delta
- * policies and allow flexible adaptive windowing based on the arriving data
- * points.
- *
- * @param <DATA>
- *            The type of input data which can be compared using this function.
- */
-public interface DeltaFunction<DATA> extends Serializable {
-
-	/**
-	 * Calculates the delta between two given data points.
-	 * 
-	 * @param oldDataPoint
-	 *            the old data point.
-	 * @param newDataPoint
-	 *            the new data point.
-	 * @return the delta between the two given points.
-	 */
-	public double getDelta(DATA oldDataPoint, DATA newDataPoint);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
deleted file mode 100644
index 9d055d5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-
-/**
- * This delta function calculates the euclidean distance between two given
- * points.
- * 
- * Euclidean distance: http://en.wikipedia.org/wiki/Euclidean_distance
- * 
- * @param <DATA>
- *            The input data type. This delta function works with a double[],
- *            but can extract/convert to it from any other given object in case
- *            the respective extractor has been set. See
- *            {@link ExtractionAwareDeltaFunction} for more information.
- */
-public class EuclideanDistance<DATA> extends ExtractionAwareDeltaFunction<DATA,
double[]> {
-
-	public EuclideanDistance() {
-		super(null);
-	}
-
-	public EuclideanDistance(Extractor<DATA, double[]> converter) {
-		super(converter);
-	}
-
-	/**
-	 * auto-generated version id
-	 */
-	private static final long serialVersionUID = 3119432599634512359L;
-
-	@Override
-	public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
-		double result = 0;
-		for (int i = 0; i < oldDataPoint.length; i++) {
-			result += (oldDataPoint[i] - newDataPoint[i]) * (oldDataPoint[i] - newDataPoint[i]);
-		}
-		return Math.sqrt(result);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
deleted file mode 100644
index 3e9f2ca..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-
-/**
- * Extend this abstract class to implement a delta function which is aware of
- * extracting the data on which the delta is calculated from a more complex data
- * structure. For example in case you want to be able to run a delta only on one
- * field of a Tuple type or only on some fields from an array.
- * 
- * @param <DATA>
- *            The input data type. The input of this type will be passed to the
- *            extractor which will transform into a TO-object. The delta
- *            function then runs on this TO-object.
- * @param <TO>
- *            The type on which the delta function runs. (The type of the delta
- *            function)
- */
-public abstract class ExtractionAwareDeltaFunction<DATA, TO> implements DeltaFunction<DATA>
{
-
-	/**
-	 * Generated Version ID
-	 */
-	private static final long serialVersionUID = 6927486219702689554L;
-	private Extractor<DATA, TO> converter;
-
-	public ExtractionAwareDeltaFunction(Extractor<DATA, TO> converter) {
-		this.converter = converter;
-	}
-
-	/**
-	 * This method takes the two data point and runs the set extractor on it.
-	 * The delta function implemented at {@link #getNestedDelta} is then called
-	 * with the extracted data. In case no extractor is set the input data gets
-	 * passes to {@link #getNestedDelta} as-is. The return value is just
-	 * forwarded from {@link #getNestedDelta}.
-	 * 
-	 * @param oldDataPoint
-	 *            the older data point as raw data (before extraction).
-	 * @param newDataPoint
-	 *            the new data point as raw data (before extraction).
-	 * @return the delta between the two points.
-	 */
-	@SuppressWarnings("unchecked")
-	@Override
-	public double getDelta(DATA oldDataPoint, DATA newDataPoint) {
-		if (converter == null) {
-			// In case no conversion/extraction is required, we can cast DATA to
-			// TO
-			// => Therefore, "unchecked" warning is suppressed for this method.
-			return getNestedDelta((TO) oldDataPoint, (TO) newDataPoint);
-		} else {
-			return getNestedDelta(converter.extract(oldDataPoint), converter.extract(newDataPoint));
-		}
-
-	}
-
-	/**
-	 * This method is exactly the same as
-	 * {@link DeltaFunction#getDelta(Object, Object)} except that it gets the
-	 * result of the previously done extractions as input. Therefore, this
-	 * method only does the actual calculation of the delta but no data
-	 * extraction or conversion.
-	 * 
-	 * @param oldDataPoint
-	 *            the older data point.
-	 * @param newDataPoint
-	 *            the new data point.
-	 * @return the delta between the two points.
-	 */
-	public abstract double getNestedDelta(TO oldDataPoint, TO newDataPoint);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
index 255049d..31063ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.windowing.helper;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
index 0583176..0b6a493 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 
 /**
  * This policy calculates a delta between the data point which triggered last

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index f2c253c..abc1a18 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.functions.WindowMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
index e12b254..bdc7e94 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.windowing.deltafunction;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.streaming.api.functions.windowing.delta.CosineDistance;
 import org.junit.Test;
 
 public class CosineDistanceTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
index 8c62497..85a0882 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.windowing.deltafunction;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.streaming.api.functions.windowing.delta.EuclideanDistance;
 import org.junit.Test;
 
 public class EuclideanDistanceTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
index 448377d..9ec4644 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.policy;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.junit.Test;
 
 import java.util.List;
@@ -85,4 +85,4 @@ public class DeltaPolicyTest {
 				0, 0), 3, SERIALIZER));
 
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index e48b437..1b48387 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
index f490726..461ad3c 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.api.scala.windowing
 
 import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
 import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction
 
 object Delta {
 


Mime
View raw message