flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [3/4] flink git commit: [FLINK-1741] [gelly] Adds Jaccard Similarity Metric Example
Date Fri, 10 Apr 2015 08:29:27 GMT
[FLINK-1741] [gelly] Adds Jaccard Similarity Metric Example

This closes #544


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e281e4d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e281e4d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e281e4d6

Branch: refs/heads/master
Commit: e281e4d6fc22e498f56d88b0f661972345bf0e55
Parents: 48713a8
Author: andralungu <lungu.andra@gmail.com>
Authored: Mon Mar 30 14:50:09 2015 +0200
Committer: vasia <vasia@apache.org>
Committed: Fri Apr 10 11:04:19 2015 +0300

----------------------------------------------------------------------
 .../JaccardSimilarityMeasureExample.java        | 212 +++++++++++++++++++
 .../utils/JaccardSimilarityMeasureData.java     |  58 +++++
 .../JaccardSimilarityMeasureExampleITCase.java  |  74 +++++++
 3 files changed, 344 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e281e4d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
new file mode 100644
index 0000000..c81aeb3
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.types.NullValue;
+
+import java.util.HashSet;
+
+/**
+ * Given a directed, unweighted graph, return a weighted graph where the edge values are
equal
+ * to the Jaccard similarity coefficient - the number of common neighbors divided by the
the size
+ * of the union of neighbor sets - for the src and target vertices.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <br>
+ * 	Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
+ * 	Edges themselves are separated by newlines.
+ * 	For example: <code>1	2\n1	3\n</code> defines two edges 1-2 and 1-3.
+ * </p>
+ *
+ * Usage <code> JaccardSimilarityMeasureExample &lt;edge path&gt; &lt;result
path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData}
+ */
+@SuppressWarnings("serial")
+public class JaccardSimilarityMeasureExample implements ProgramDescription {
+
+	public static void main(String [] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);
+
+		DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors =
+				graph.reduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL);
+
+		Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors,
edges, env);
+
+		// the edge value will be the Jaccard similarity coefficient(number of common neighbors/
all neighbors)
+		DataSet<Tuple3<Long, Long, Double>> edgesWithJaccardWeight = graphWithVertexValues.getTriplets()
+				.map(new WeighEdgesMapper());
+
+		DataSet<Edge<Long, Double>> result = graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight,
+				new MapFunction<Tuple2<Double, Double>, Double>() {
+
+					@Override
+					public Double map(Tuple2<Double, Double> value) throws Exception {
+						return value.f1;
+					}
+				}).getEdges();
+
+		// emit result
+		if (fileOutput) {
+			result.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			result.print();
+		}
+
+		env.execute("Executing Jaccard Similarity Measure");
+	}
+
+	@Override
+	public String getDescription() {
+		return "Vertex Jaccard Similarity Measure";
+	}
+
+	/**
+	 * Each vertex will have a HashSet containing its neighbor ids as value.
+	 */
+	private static final class GatherNeighbors implements EdgesFunction<Long, Double, Vertex<Long,
HashSet<Long>>> {
+
+		@Override
+		public Vertex<Long, HashSet<Long>> iterateEdges(Iterable<Tuple2<Long,
Edge<Long, Double>>> edges) throws Exception {
+
+			HashSet<Long> neighborsHashSet = new HashSet<Long>();
+			long vertexId = -1;
+
+			for(Tuple2<Long, Edge<Long, Double>> edge : edges) {
+				neighborsHashSet.add(getNeighborID(edge));
+				vertexId = edge.f0;
+			}
+			return new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet);
+		}
+	}
+
+	/**
+	 * The edge weight will be the Jaccard coefficient, which is computed as follows:
+	 *
+	 * Consider the edge x-y
+	 * We denote by sizeX and sizeY, the neighbors hash set size of x and y respectively.
+	 * sizeX+sizeY = union + intersection of neighborhoods
+	 * size(hashSetX.addAll(hashSetY)).distinct = union of neighborhoods
+	 * The intersection can then be deduced.
+	 *
+	 * The Jaccard similarity coefficient is then, the intersection/union.
+	 */
+	private static class WeighEdgesMapper implements MapFunction<Triplet<Long, HashSet<Long>,
Double>,
+			Tuple3<Long, Long, Double>> {
+
+		@Override
+		public Tuple3<Long, Long, Double> map(Triplet<Long, HashSet<Long>, Double>
triplet)
+				throws Exception {
+
+			Vertex<Long, HashSet<Long>> source = triplet.getSrcVertex();
+			Vertex<Long, HashSet<Long>> target = triplet.getTrgVertex();
+
+			long unionPlusIntersection = source.getValue().size() + target.getValue().size();
+			// within a HashSet, all elements are distinct
+			source.getValue().addAll(target.getValue());
+			// the source value contains the union
+			long union = source.getValue().size();
+			long intersection = unionPlusIntersection - union;
+
+			return new Tuple3<Long, Long, Double>(source.getId(), target.getId(), (double) intersection/union);
+		}
+	}
+
+	/**
+	 * Helper method that extracts the neighborId given an edge.
+	 * @param edge
+	 * @return
+	 */
+	private static Long getNeighborID(Tuple2<Long, Edge<Long, Double>> edge) {
+		if(edge.f1.getSource() == edge.f0) {
+			return edge.f1.getTarget();
+		} else {
+			return edge.f1.getSource();
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static boolean parseParameters(String [] args) {
+		if(args.length > 0) {
+			if(args.length != 2) {
+				System.err.println("Usage JaccardSimilarityMeasureExample <edge path> <output
path>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+		} else {
+			System.out.println("Executing JaccardSimilarityMeasure example with default parameters
and built-in default data.");
+			System.out.println("Provide parameters to read input data from files.");
+			System.out.println("Usage JaccardSimilarityMeasureExample <edge path> <output
path>");
+		}
+
+		return true;
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment
env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.ignoreComments("#")
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+						@Override
+						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception
{
+							return new Edge<Long, Double>(tuple2.f0, tuple2.f1, new Double(0));
+						}
+					});
+		} else {
+			return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e281e4d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
new file mode 100644
index 0000000..7564b95
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Holds the built-in input and the matching expected output of the Jaccard Similarity
+ * Measure example. The example falls back to this data when it is started without arguments.
+ */
+public class JaccardSimilarityMeasureData {
+
+	/** The default edges as text: one edge per line, source and target id separated by a tab. */
+	public static final String EDGES = "1	2\n" + "1	3\n" + "1	4\n" + "1	5\n" + "2	3\n" + "2	4\n" +
+			"2	5\n" + "3	4\n" + "3	5\n" + "4	5";
+
+	/** The expected result for the default edges: one <code>source,target,weight</code> line per edge. */
+	public static final String JACCARD_EDGES = "1,2,0.6\n" + "1,3,0.6\n" + "1,4,0.6\n" + "1,5,0.6\n" +
+			"2,3,0.6\n" + "2,4,0.6\n" + "2,5,0.6\n" + "3,4,0.6\n" + "3,5,0.6\n" + "4,5,0.6";
+
+	private JaccardSimilarityMeasureData() {}
+
+	/**
+	 * Creates the default edge data set: one edge for every pair of vertices
+	 * <code>src &lt; trg</code> taken from the ids 1 to 5, each with the value 0.
+	 *
+	 * @param env the environment used to create the data set
+	 * @return the ten default edges, in the same order as they are listed in {@link #EDGES}
+	 */
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		final long lastVertexId = 5L;
+		List<Edge<Long, Double>> defaultEdges = new ArrayList<Edge<Long, Double>>();
+
+		// enumerates (1,2), (1,3), ..., (3,5), (4,5)
+		for (long src = 1L; src < lastVertexId; src++) {
+			for (long trg = src + 1; trg <= lastVertexId; trg++) {
+				defaultEdges.add(new Edge<Long, Double>(src, trg, new Double(0)));
+			}
+		}
+
+		return env.fromCollection(defaultEdges);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e281e4d6/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java
new file mode 100644
index 0000000..7269ed7
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.JaccardSimilarityMeasureExample;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class JaccardSimilarityMeasureExampleITCase extends MultipleProgramsTestBase {
+
+	private String edgesPath;
+
+	private String debugResultPath;
+
+	private String resultPath;
+
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public JaccardSimilarityMeasureExampleITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, Charsets.UTF_8);
+
+		edgesPath = edgesFile.toURI().toString();
+	}
+
+	@Test
+	public void testJaccardSimilarityMeasureExample() throws Exception {
+		JaccardSimilarityMeasureExample.main(new String[]{edgesPath, resultPath});
+		expected = JaccardSimilarityMeasureData.JACCARD_EDGES;
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+}


Mime
View raw message