flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [01/34] incubator-flink git commit: [streaming] Added cosine distance and Euclidean distance as examples for possible delta function implementations. Test cases are included as well.
Date Fri, 05 Dec 2014 17:26:06 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master a677c7723 -> f84408805


[streaming] Added cosine distance and Euclidean distance as examples for possible delta function
implementations. Test cases are included as well.

[streaming] Added policy helper for delta policies to provide a simpler API for such policies.

[streaming] Created ActiveEvictionPolicyWrapper. This eviction policy wraps around a non-active
policy and makes it active by forwarding notifications on fake elements to the regular notification
method.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4d586e72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4d586e72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4d586e72

Branch: refs/heads/master
Commit: 4d586e7264194fc7fc8ca86df6e555b1e3205fac
Parents: b263624
Author: Jonas Traub (powibol) <jon@s-traub.com>
Authored: Mon Oct 27 14:04:36 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Fri Dec 5 16:45:08 2014 +0100

----------------------------------------------------------------------
 .../windowing/deltafunction/CosineDistance.java | 87 ++++++++++++++++++++
 .../deltafunction/EuclideanDistance.java        | 58 +++++++++++++
 .../streaming/api/windowing/helper/Delta.java   | 84 +++++++++++++++++++
 .../policy/ActiveEvictionPolicyWrapper.java     | 63 ++++++++++++++
 .../deltafunction/CosineDistanceTest.java       | 70 ++++++++++++++++
 .../deltafunction/EuclideanDistanceTest.java    | 71 ++++++++++++++++
 6 files changed, 433 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d586e72/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
new file mode 100644
index 0000000..15aaf51
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.deltafunction;
+
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * This delta function calculates the cosine distance between two given vectors.
+ * The cosine distance is defined as: cosineDistance=1-cosineSimilarity
+ * 
+ * Cosine similarity: http://en.wikipedia.org/wiki/Cosine_similarity
+ * 
+ * @param <DATA>
+ *            The input data type. This delta function works with a double[],
+ *            but can extract/convert to it from any other given object in case
+ *            the respective extractor has been set. See
+ *            {@link ExtractionAwareDeltaFunction} for more information.
+ */
+public class CosineDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]>
{
+
+	/**
+	 * auto-generated id
+	 */
+	private static final long serialVersionUID = -1217813582965151599L;
+
+	public CosineDistance() {
+		super(null);
+	}
+
+	public CosineDistance(Extractor<DATA, double[]> converter) {
+		super(converter);
+	}
+
+	@Override
+	public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
+		if (isNullvector(oldDataPoint, newDataPoint)) {
+			return 0;
+		}
+
+		double sum1 = 0;
+		double sum2 = 0;
+		for (int i = 0; i < oldDataPoint.length; i++) {
+			sum1 += oldDataPoint[i] * oldDataPoint[i];
+			sum2 += newDataPoint[i] * newDataPoint[i];
+		}
+		sum1 = Math.sqrt(sum1);
+		sum2 = Math.sqrt(sum2);
+
+		return 1d - (dotProduct(oldDataPoint, newDataPoint) / (sum1 * sum2));
+	}
+
+	private double dotProduct(double[] a, double[] b) {
+		double result = 0;
+		for (int i = 0; i < a.length; i++) {
+			result += a[i] * b[i];
+		}
+		return result;
+	}
+
+	private boolean isNullvector(double[]... vectors) {
+		outer: for (double[] v : vectors) {
+			for (double field : v) {
+				if (field != 0) {
+					continue outer;
+				}
+			}
+			// This position is only reached in case all fields are 0.
+			return true;
+		}
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d586e72/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
new file mode 100644
index 0000000..9d055d5
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.deltafunction;
+
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * This delta function calculates the euclidean distance between two given
+ * points.
+ * 
+ * Euclidean distance: http://en.wikipedia.org/wiki/Euclidean_distance
+ * 
+ * @param <DATA>
+ *            The input data type. This delta function works with a double[],
+ *            but can extract/convert to it from any other given object in case
+ *            the respective extractor has been set. See
+ *            {@link ExtractionAwareDeltaFunction} for more information.
+ */
+public class EuclideanDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> {
+
+	/**
+	 * auto-generated version id
+	 */
+	private static final long serialVersionUID = 3119432599634512359L;
+
+	/**
+	 * Creates a euclidean distance delta function without an extractor. The
+	 * data points handed to it are expected to be double[] already.
+	 */
+	public EuclideanDistance() {
+		super(null);
+	}
+
+	/**
+	 * Creates a euclidean distance delta function which uses the given
+	 * extractor to convert the input data points to double[] points.
+	 * 
+	 * @param converter
+	 *            the extractor converting DATA into a double[] point
+	 */
+	public EuclideanDistance(Extractor<DATA, double[]> converter) {
+		super(converter);
+	}
+
+	/**
+	 * Calculates the euclidean distance between two points of equal dimension.
+	 * 
+	 * @param oldDataPoint
+	 *            the first point
+	 * @param newDataPoint
+	 *            the second point
+	 * @return the square root of the sum of squared component differences
+	 * @throws IllegalArgumentException
+	 *             if the two points have a different number of components
+	 */
+	@Override
+	public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
+		// Without this check a longer old point caused an
+		// ArrayIndexOutOfBoundsException and a longer new point was silently
+		// truncated.
+		if (oldDataPoint.length != newDataPoint.length) {
+			throw new IllegalArgumentException("The points must have the same dimension, but got "
+					+ oldDataPoint.length + " and " + newDataPoint.length + ".");
+		}
+		double sumOfSquares = 0;
+		for (int i = 0; i < oldDataPoint.length; i++) {
+			double difference = oldDataPoint[i] - newDataPoint[i];
+			sumOfSquares += difference * difference;
+		}
+		return Math.sqrt(sumOfSquares);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d586e72/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
new file mode 100644
index 0000000..89c6c4a
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+/**
+ * This helper represents a trigger or eviction policy based on a
+ * {@link DeltaFunction}. Each call to {@link #toTrigger()} or
+ * {@link #toEvict()} creates a new {@link DeltaPolicy} instance.
+ * 
+ * @param <DATA>
+ *            the data type handled by the delta function represented by this
+ *            helper.
+ */
+public class Delta<DATA> implements WindowingHelper<DATA> {
+
+	private final DeltaFunction<DATA> deltaFunction;
+	private final DATA initVal;
+	private final double threshold;
+
+	/**
+	 * Creates a delta helper representing a delta trigger or eviction policy.
+	 * 
+	 * @param deltaFunction
+	 *            The delta function which should be used to calculate the delta
+	 *            between points.
+	 * @param initVal
+	 *            The initial value which will be used to calculate the first
+	 *            delta.
+	 * @param threshold
+	 *            The threshold used by the delta function.
+	 */
+	public Delta(DeltaFunction<DATA> deltaFunction, DATA initVal, double threshold) {
+		this.deltaFunction = deltaFunction;
+		this.initVal = initVal;
+		this.threshold = threshold;
+	}
+
+	/**
+	 * @return a new {@link DeltaPolicy} used as eviction policy
+	 */
+	@Override
+	public EvictionPolicy<DATA> toEvict() {
+		return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold);
+	}
+
+	/**
+	 * @return a new {@link DeltaPolicy} used as trigger policy
+	 */
+	@Override
+	public TriggerPolicy<DATA> toTrigger() {
+		return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold);
+	}
+
+	/**
+	 * Creates a delta helper representing a delta trigger or eviction policy.
+	 * 
+	 * @param deltaFunction
+	 *            The delta function which should be used to calculate the delta
+	 *            between points.
+	 * @param initVal
+	 *            The initial value which will be used to calculate the first
+	 *            delta.
+	 * @param threshold
+	 *            The threshold used by the delta function.
+	 * @return a delta helper representing a delta trigger or eviction policy
+	 */
+	public static <DATA> Delta<DATA> of(DeltaFunction<DATA> deltaFunction, DATA initVal,
+			double threshold) {
+		return new Delta<DATA>(deltaFunction, initVal, threshold);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d586e72/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
new file mode 100644
index 0000000..a110fbc
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+/**
+ * This {@link ActiveEvictionPolicy} wraps around a non-active
+ * {@link EvictionPolicy}. It forwards all calls to
+ * {@link ActiveEvictionPolicy#notifyEvictionWithFakeElement(Object, int)} to
+ * {@link EvictionPolicy#notifyEviction(Object, boolean, int)} while the
+ * triggered parameter will be set to true.
+ * 
+ * @param <DATA>
+ *            The data type handled by this policy
+ */
+public class ActiveEvictionPolicyWrapper<DATA> implements ActiveEvictionPolicy<DATA> {
+
+	/**
+	 * Auto generated version ID
+	 */
+	private static final long serialVersionUID = -7656558669799505882L;
+	private final EvictionPolicy<DATA> nestedPolicy;
+
+	/**
+	 * Creates a wrapper which activates the eviction policy it wraps. This
+	 * means that the nested policy will get called on fake elements as well as
+	 * on real elements.
+	 * 
+	 * @param nestedPolicy
+	 *            The policy which should be activated/wrapped.
+	 * @throws IllegalArgumentException
+	 *             if nestedPolicy is null
+	 */
+	public ActiveEvictionPolicyWrapper(EvictionPolicy<DATA> nestedPolicy) {
+		if (nestedPolicy == null) {
+			throw new IllegalArgumentException("The nested policy must not be null.");
+		}
+		this.nestedPolicy = nestedPolicy;
+	}
+
+	/**
+	 * Forwards a notification on a real element unchanged to the nested policy.
+	 */
+	@Override
+	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
+		return nestedPolicy.notifyEviction(datapoint, triggered, bufferSize);
+	}
+
+	/**
+	 * Forwards a notification on a fake element to the nested policy's regular
+	 * notification method, always passing triggered=true.
+	 */
+	@Override
+	public int notifyEvictionWithFakeElement(DATA datapoint, int bufferSize) {
+		return nestedPolicy.notifyEviction(datapoint, true, bufferSize);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d586e72/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
new file mode 100644
index 0000000..e12b254
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.deltafunction;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+/**
+ * Checks {@link CosineDistance} against reference values.
+ */
+public class CosineDistanceTest {
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testCosineDistance() {
+
+		//Reference calculated using wolfram alpha
+		double[][][] testdata={
+				{{0,0,0},{0,0,0}},
+				{{0,0,0},{1,2,3}},
+				{{1,2,3},{0,0,0}},
+				{{1,2,3},{4,5,6}},
+				{{1,2,3},{-4,-5,-6}},
+				{{1,2,-3},{-4,5,-6}},
+				{{1,2,3,4},{5,6,7,8}},
+				{{1,2},{3,4}},
+				{{1},{2}},
+			};
+		double[] referenceSolutions={
+				0,
+				0,
+				0,
+				0.025368,
+				1.974631,
+				0.269026,
+				0.031136,
+				0.016130,
+				0
+		};
+
+		for (int i = 0; i < testdata.length; i++) {
+			// The message must show both operands; previously the first input was printed twice.
+			assertEquals("Wrong result for inputs " + arrayToString(testdata[i][0]) + " and "
+					+ arrayToString(testdata[i][1]), referenceSolutions[i],
+					new CosineDistance().getDelta(testdata[i][0], testdata[i][1]), 0.000001);
+		}
+	}
+
+	/**
+	 * Formats a vector as {a,b,c} for assertion messages.
+	 */
+	private String arrayToString(double[] in){
+		if (in.length == 0) {
+			return "{}";
+		}
+		StringBuilder result = new StringBuilder("{");
+		for (int i = 0; i < in.length; i++) {
+			if (i > 0) {
+				result.append(',');
+			}
+			result.append(in[i]);
+		}
+		return result.append('}').toString();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4d586e72/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
new file mode 100644
index 0000000..8c62497
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.deltafunction;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+/**
+ * Checks {@link EuclideanDistance} against reference values.
+ */
+public class EuclideanDistanceTest {
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testEuclideanDistance() {
+
+		//Reference calculated using wolfram alpha
+		double[][][] testdata={
+				{{0,0,0},{0,0,0}},
+				{{0,0,0},{1,2,3}},
+				{{1,2,3},{0,0,0}},
+				{{1,2,3},{4,5,6}},
+				{{1,2,3},{-4,-5,-6}},
+				{{1,2,-3},{-4,5,-6}},
+				{{1,2,3,4},{5,6,7,8}},
+				{{1,2},{3,4}},
+				{{1},{2}},
+			};
+		double[] referenceSolutions={
+				0,
+				3.741657,
+				3.741657,
+				5.196152,
+				12.449900,
+				6.557439,
+				8.0,
+				2.828427,
+				1
+		};
+
+		for (int i = 0; i < testdata.length; i++) {
+			// The message must show both operands; previously the first input was printed twice.
+			assertEquals("Wrong result for inputs " + arrayToString(testdata[i][0]) + " and "
+					+ arrayToString(testdata[i][1]), referenceSolutions[i],
+					new EuclideanDistance().getDelta(testdata[i][0], testdata[i][1]), 0.000001);
+		}
+
+	}
+
+	/**
+	 * Formats a point as {a,b,c} for assertion messages.
+	 */
+	private String arrayToString(double[] in){
+		if (in.length == 0) {
+			return "{}";
+		}
+		StringBuilder result = new StringBuilder("{");
+		for (int i = 0; i < in.length; i++) {
+			if (i > 0) {
+				result.append(',');
+			}
+			result.append(in[i]);
+		}
+		return result.append('}').toString();
+	}
+
+}


Mime
View raw message