flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [2/5] flink git commit: [FLINK-4135] [gelly] Replace ChecksumHashCode as GraphAnalytic
Date Fri, 01 Jul 2016 19:14:13 GMT
[FLINK-4135] [gelly] Replace ChecksumHashCode as GraphAnalytic

Adds a GraphAnalytic to replace the checksumHashCode Java and Scala
utility functions.

This closes #2188


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0efa6441
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0efa6441
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0efa6441

Branch: refs/heads/master
Commit: 0efa6441420ba4a74ecb9a7d70d0a0d80e25e292
Parents: 149e7a0
Author: Greg Hogan <code@greghogan.com>
Authored: Thu Jun 30 09:02:47 2016 -0400
Committer: Greg Hogan <code@greghogan.com>
Committed: Fri Jul 1 14:35:42 2016 -0400

----------------------------------------------------------------------
 .../flink/graph/scala/utils/package.scala       | 60 ----------------
 .../scala/test/util/GraphUtilsITCase.scala      | 46 ------------
 .../graph/library/metric/ChecksumHashCode.java  | 73 ++++++++++++++++++++
 .../apache/flink/graph/utils/GraphUtils.java    | 30 --------
 .../filter/undirected/MaximumDegreeTest.java    | 10 +--
 .../library/metric/ChecksumHashCodeTest.java    | 48 +++++++++++++
 .../flink/graph/test/util/GraphUtilsITCase.java | 58 ----------------
 7 files changed, 127 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
deleted file mode 100644
index fe4ab5b..0000000
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/package.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.Utils
-import org.apache.flink.graph.{Edge, Vertex}
-import org.apache.flink.util.AbstractID
-
-import scala.reflect.ClassTag
-
-package object utils {
-  /**
-    * This class provides utility methods for computing checksums over a Graph.
-    *
-    * @param self Graph
-    */
-  implicit class GraphUtils[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-  TypeInformation : ClassTag](val self: Graph[K, VV, EV]) {
-
-    /**
-     * Convenience method to get the count (number of elements) of a Graph
-     * as well as the checksum (sum over element hashes). The vertex and
-     * edge DataSets are processed in a single job and the resultant counts
-     * and checksums are merged locally.
-     *
-     * @return the checksum over the vertices and edges
-     */
-    @throws(classOf[Exception])
-    def checksumHashCode(): Utils.ChecksumHashCode = {
-      val verticesId = new AbstractID().toString
-      self.getVertices.output(new Utils.ChecksumHashCodeHelper[Vertex[K,VV]](verticesId))
-
-      val edgesId = new AbstractID().toString
-      self.getEdges.output(new Utils.ChecksumHashCodeHelper[Edge[K,EV]](edgesId))
-
-      val res = self.getWrappedGraph.getContext.execute()
-
-      val checksum = res.getAccumulatorResult[Utils.ChecksumHashCode](verticesId)
-      checksum.add(res.getAccumulatorResult[Utils.ChecksumHashCode](edgesId))
-      checksum
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala
deleted file mode 100644
index c6d3c58..0000000
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/util/GraphUtilsITCase.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test.util
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.utils._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.junit.Assert.assertEquals
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class GraphUtilsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
-MultipleProgramsTestBase(mode) {
-
-  @Test
-  @throws(classOf[Exception])
-  def testChecksumHashCodeVerticesAndEdges() {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val checksum = graph.checksumHashCode()
-    assertEquals(checksum.getCount, 12L)
-    assertEquals(checksum.getChecksum, 19665L)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
new file mode 100644
index 0000000..261e89b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
@@ -0,0 +1,73 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.library.metric;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Convenience method to get the count (number of elements) of a Graph
+ * as well as the checksum (sum over element hashes). The vertex and
+ * edge DataSets are processed in a single job and the resultant counts
+ * and checksums are merged locally.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class ChecksumHashCode<K, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Utils.ChecksumHashCode> {
+
+	private String verticesId = new AbstractID().toString();
+
+	private String edgesId = new AbstractID().toString();
+
+	@Override
+	public ChecksumHashCode<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		input
+			.getVertices()
+			.output(new Utils.ChecksumHashCodeHelper<Vertex<K, VV>>(verticesId))
+				.name("ChecksumHashCode vertices");
+
+		input
+			.getEdges()
+			.output(new Utils.ChecksumHashCodeHelper<Edge<K, EV>>(edgesId))
+				.name("ChecksumHashCode edges");
+
+		return this;
+	}
+
+	@Override
+	public Utils.ChecksumHashCode getResult() {
+		JobExecutionResult res = env.getLastJobExecutionResult();
+		Utils.ChecksumHashCode checksum = res.getAccumulatorResult(verticesId);
+		checksum.add(res.<Utils.ChecksumHashCode>getAccumulatorResult(edgesId));
+		return checksum;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 264479b..0c93fa9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -18,46 +18,16 @@
 
 package org.apache.flink.graph.utils;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.AbstractID;
 
 import static org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO;
 
 public class GraphUtils {
 
 	/**
-	 * Convenience method to get the count (number of elements) of a Graph
-	 * as well as the checksum (sum over element hashes). The vertex and
-	 * edge DataSets are processed in a single job and the resultant counts
-	 * and checksums are merged locally.
-	 *
-	 * @param graph Graph over which to compute the count and checksum
-	 * @return the checksum over the vertices and edges
-	 */
-	public static <K, VV, EV> Utils.ChecksumHashCode checksumHashCode(Graph<K, VV, EV> graph) throws Exception {
-		final String verticesId = new AbstractID().toString();
-		graph.getVertices().output(new Utils.ChecksumHashCodeHelper<Vertex<K, VV>>(verticesId)).name("ChecksumHashCode vertices");
-
-		final String edgesId = new AbstractID().toString();
-		graph.getEdges().output(new Utils.ChecksumHashCodeHelper<Edge<K, EV>>(edgesId)).name("ChecksumHashCode edges");
-
-		JobExecutionResult res = graph.getContext().execute();
-
-		Utils.ChecksumHashCode checksum = res.<Utils.ChecksumHashCode>getAccumulatorResult(verticesId);
-		checksum.add(res.<Utils.ChecksumHashCode>getAccumulatorResult(edgesId));
-
-		return checksum;
-	}
-
-	/**
 	 * Count the number of elements in a DataSet.
 	 *
 	 * @param input DataSet of elements to be counted

http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
index b3a3356..ca96f24 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.graph.asm.degree.filter.undirected;
 
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.Utils;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.AsmTestBase;
-import org.apache.flink.graph.utils.GraphUtils;
+import org.apache.flink.graph.library.metric.ChecksumHashCode;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -62,8 +62,10 @@ extends AsmTestBase {
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode checksum = GraphUtils.checksumHashCode(undirectedRMatGraph
-			.run(new MaximumDegree<LongValue, NullValue, NullValue>(16)));
+		Utils.ChecksumHashCode checksum = undirectedRMatGraph
+			.run(new MaximumDegree<LongValue, NullValue, NullValue>(16))
+			.run(new ChecksumHashCode<LongValue, NullValue, NullValue>())
+			.execute();
 
 		assertEquals(805, checksum.getCount());
 		assertEquals(0x0000000008028b43L, checksum.getChecksum());

http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
new file mode 100644
index 0000000..92a5fd0
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
@@ -0,0 +1,48 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.library.metric;
+
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ChecksumHashCodeTest
+extends AsmTestBase {
+
+	@Test
+	public void testSmallGraph() throws Exception {
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(
+			TestGraphUtils.getLongLongVertexData(env),
+			TestGraphUtils.getLongLongEdgeData(env),
+			env);
+
+		Utils.ChecksumHashCode checksum = graph
+			.run(new ChecksumHashCode<Long, Long, Long>())
+			.execute();
+
+		assertEquals(checksum.getCount(), 12L);
+		assertEquals(checksum.getChecksum(), 19665L);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0efa6441/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java
deleted file mode 100644
index 51602bc..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/util/GraphUtilsITCase.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.util;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.test.TestGraphUtils;
-import org.apache.flink.graph.utils.GraphUtils;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class GraphUtilsITCase extends MultipleProgramsTestBase {
-
-	public GraphUtilsITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test
-	public void testChecksumHashCodeVerticesAndEdges() throws Exception {
-		/*
-		* Test checksum hashcode
-		*/
-        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-        Graph<Long, Long, Long> graph = Graph.fromDataSet(
-			TestGraphUtils.getLongLongVertexData(env),
-			TestGraphUtils.getLongLongEdgeData(env),
-			env);
-
-		ChecksumHashCode checksum = GraphUtils.checksumHashCode(graph);
-
-		assertEquals(checksum.getCount(), 12L);
-		assertEquals(checksum.getChecksum(), 19665L);
-    }
-
-}


Mime
View raw message