Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8E5CA200C13 for ; Mon, 6 Feb 2017 16:45:41 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8CA56160B53; Mon, 6 Feb 2017 15:45:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 98636160B49 for ; Mon, 6 Feb 2017 16:45:39 +0100 (CET) Received: (qmail 63720 invoked by uid 500); 6 Feb 2017 15:45:36 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 63690 invoked by uid 99); 6 Feb 2017 15:45:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Feb 2017 15:45:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2190ADFC15; Mon, 6 Feb 2017 15:45:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: greg@apache.org To: commits@flink.apache.org Date: Mon, 06 Feb 2017 15:45:35 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] flink git commit: [FLINK-5693] [gelly] ChecksumHashCode DataSetAnalytic archived-at: Mon, 06 Feb 2017 15:45:41 -0000 Repository: flink Updated Branches: refs/heads/master fffc8f055 -> 490162259 [FLINK-5693] [gelly] ChecksumHashCode DataSetAnalytic Adds a DataSetAnalytic that checksums and counts elements from a DataSet. This allows DataSetUtils.checksumHashCode to be deprecated. 
This closes #3244 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7e5705c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7e5705c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7e5705c Branch: refs/heads/master Commit: a7e5705c1cd73b54f78348b4f6c3fc7250d7fd84 Parents: fffc8f0 Author: Greg Hogan Authored: Wed Oct 26 15:18:50 2016 -0400 Committer: Greg Hogan Committed: Mon Feb 6 09:44:18 2017 -0500 ---------------------------------------------------------------------- .../graph/asm/dataset/ChecksumHashCode.java | 172 +++++++++++++++++++ .../graph/library/metric/ChecksumHashCode.java | 29 ++-- .../graph/asm/dataset/ChecksumHashCodeTest.java | 57 ++++++ .../annotate/directed/EdgeDegreesPairTest.java | 16 +- .../directed/EdgeSourceDegreesTest.java | 16 +- .../directed/EdgeTargetDegreesTest.java | 16 +- .../annotate/directed/VertexDegreesTest.java | 30 ++-- .../annotate/directed/VertexInDegreeTest.java | 30 ++-- .../annotate/directed/VertexOutDegreeTest.java | 30 ++-- .../annotate/undirected/EdgeDegreePairTest.java | 34 ++-- .../undirected/EdgeSourceDegreeTest.java | 34 ++-- .../undirected/EdgeTargetDegreeTest.java | 34 ++-- .../annotate/undirected/VertexDegreeTest.java | 52 +++--- .../filter/undirected/MaximumDegreeTest.java | 4 +- .../LocalClusteringCoefficientTest.java | 11 +- .../directed/TriangleListingTest.java | 8 +- .../LocalClusteringCoefficientTest.java | 12 +- .../undirected/TriangleListingTest.java | 12 +- .../graph/library/link_analysis/HITSTest.java | 12 +- .../library/metric/ChecksumHashCodeTest.java | 4 +- .../library/similarity/JaccardIndexTest.java | 8 +- 21 files changed, 459 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java new file mode 100644 index 0000000..13db7a0 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.dataset; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.SimpleAccumulator; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.graph.AnalyticHelper; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; + +import java.io.IOException; + +/** + * Convenience method to get the count (number of elements) of a + * {@link DataSet} as well as the checksum (sum over element hashes). 
+ * + * @param <T> element type + */ +public class ChecksumHashCode<T> +extends AbstractDataSetAnalytic<T, Checksum> { + + private static final String CHECKSUM = "checksum"; + + private ChecksumHashCodeHelper<T> checksumHashCodeHelper; + + @Override + public ChecksumHashCode<T> run(DataSet<T> input) + throws Exception { + super.run(input); + + checksumHashCodeHelper = new ChecksumHashCodeHelper<>(); + + input + .output(checksumHashCodeHelper) + .name("ChecksumHashCode"); + + return this; + } + + @Override + public Checksum getResult() { + return checksumHashCodeHelper.getAccumulator(env, CHECKSUM); + } + + private static class ChecksumHashCodeHelper<U> + extends AnalyticHelper<U> { + private long count; + private long checksum; + + @Override + public void writeRecord(U record) throws IOException { + count++; + // convert 32-bit integer to non-negative long + checksum += record.hashCode() & 0xffffffffL; + } + + @Override + public void close() throws IOException { + addAccumulator(CHECKSUM, new Checksum(count, checksum)); + } + } + + public static class Checksum + implements SimpleAccumulator<Checksum> { + private long count; + + private long checksum; + + /** + * Instantiate an immutable result. + * + * @param count count + * @param checksum checksum + */ + public Checksum(long count, long checksum) { + this.count = count; + this.checksum = checksum; + } + + /** + * Get the number of elements. + * + * @return number of elements + */ + public long getCount() { + return count; + } + + /** + * Get the checksum over the hash() of elements. 
+ * + * @return checksum + */ + public long getChecksum() { + return checksum; + } + + @Override + public String toString() { + return String.format("ChecksumHashCode 0x%016x, count %d", this.checksum, this.count); + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(count) + .append(checksum) + .hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { return false; } + + Checksum rhs = (Checksum)obj; + + return new EqualsBuilder() + .append(count, rhs.count) + .append(checksum, rhs.checksum) + .isEquals(); + } + + // Methods implementing SimpleAccumulator + + @Override + public void add(Checksum value) { + count += value.count; + checksum += value.checksum; + } + + @Override + public Checksum getLocalValue() { + return this; + } + + @Override + public void resetLocal() { + count = 0; + checksum = 0; + } + + @Override + public void merge(Accumulator other) { + add(other.getLocalValue()); + } + + @Override + public Accumulator clone() { + return new Checksum(count, checksum); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java index 5ba5a66..d2eeb41 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java @@ -18,13 +18,11 @@ package org.apache.flink.graph.library.metric; -import org.apache.flink.api.common.JobExecutionResult; -import 
org.apache.flink.api.java.Utils; import org.apache.flink.graph.AbstractGraphAnalytic; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; -import org.apache.flink.util.AbstractID; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; /** * Convenience method to get the count (number of elements) of a Graph @@ -37,35 +35,30 @@ import org.apache.flink.util.AbstractID; * @param <EV> edge value type */ public class ChecksumHashCode<K, VV, EV> -extends AbstractGraphAnalytic<K, VV, EV, Utils.ChecksumHashCode> { +extends AbstractGraphAnalytic<K, VV, EV, Checksum> { - private String verticesId = new AbstractID().toString(); + private org.apache.flink.graph.asm.dataset.ChecksumHashCode<Vertex<K, VV>> vertexChecksum; - private String edgesId = new AbstractID().toString(); + private org.apache.flink.graph.asm.dataset.ChecksumHashCode<Edge<K, EV>> edgeChecksum; @Override public ChecksumHashCode<K, VV, EV> run(Graph<K, VV, EV> input) throws Exception { super.run(input); - input - .getVertices() - .output(new Utils.ChecksumHashCodeHelper<Vertex<K, VV>>(verticesId)) - .name("ChecksumHashCode vertices"); + vertexChecksum = new org.apache.flink.graph.asm.dataset.ChecksumHashCode<>(); + vertexChecksum.run(input.getVertices()); - input - .getEdges() - .output(new Utils.ChecksumHashCodeHelper<Edge<K, EV>>(edgesId)) - .name("ChecksumHashCode edges"); + edgeChecksum = new org.apache.flink.graph.asm.dataset.ChecksumHashCode<>(); + edgeChecksum.run(input.getEdges()); return this; } @Override - public Utils.ChecksumHashCode getResult() { - JobExecutionResult res = env.getLastJobExecutionResult(); - Utils.ChecksumHashCode checksum = res.getAccumulatorResult(verticesId); - checksum.add(res.getAccumulatorResult(edgesId)); + public Checksum getResult() { + Checksum checksum = vertexChecksum.getResult(); + checksum.add(edgeChecksum.getResult()); return checksum; } } http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java new file mode 100644 index 0000000..d25f9b6 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.asm.dataset; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class ChecksumHashCodeTest { + + private ExecutionEnvironment env; + + @Before + public void setup() + throws Exception { + env = ExecutionEnvironment.createCollectionsEnvironment(); + env.getConfig().enableObjectReuse(); + } + + @Test + public void testChecksumHashCode() + throws Exception { + List list = Arrays.asList(ArrayUtils.toObject( + new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })); + + DataSet dataset = env.fromCollection(list); + + Checksum checksum = new ChecksumHashCode().run(dataset).execute(); + + assertEquals(list.size(), checksum.getCount()); + assertEquals(list.size() * (list.size() - 1) / 2, checksum.getChecksum()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java index 485794c..0540f14 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java @@ -19,11 +19,11 @@ package org.apache.flink.graph.asm.degree.annotate.directed; import org.apache.flink.api.java.DataSet; -import 
org.apache.flink.api.java.Utils.ChecksumHashCode; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.Edge; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; @@ -48,17 +48,21 @@ extends AsmTestBase { "(3,4,((null),(4,2,2),(1,0,1)))\n" + "(5,3,((null),(1,1,0),(4,2,2)))"; - DataSet>> degrees = directedSimpleGraph + DataSet>> degreesPair = directedSimpleGraph .run(new EdgeDegreesPair()); - TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(degreesPair.collect(), expectedResult); } @Test public void testWithRMatGraph() throws Exception { - ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph - .run(new EdgeDegreesPair())); + DataSet>> degreesPair = directedRMatGraph + .run(new EdgeDegreesPair()); + + Checksum checksum = new ChecksumHashCode>>() + .run(degreesPair) + .execute(); assertEquals(12009, checksum.getCount()); assertEquals(0x0000176fe94702a3L, checksum.getChecksum()); http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java index 454e8d1..8d52889 100644 --- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java @@ -19,11 +19,11 @@ package org.apache.flink.graph.asm.degree.annotate.directed; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.Edge; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; @@ -48,17 +48,21 @@ extends AsmTestBase { "(3,4,((null),(4,2,2)))\n" + "(5,3,((null),(1,1,0)))"; - DataSet>> degrees = directedSimpleGraph + DataSet>> sourceDegrees = directedSimpleGraph .run(new EdgeSourceDegrees()); - TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(sourceDegrees.collect(), expectedResult); } @Test public void testWithRMatGraph() throws Exception { - ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph - .run(new EdgeSourceDegrees())); + DataSet>> sourceDegrees = directedRMatGraph + .run(new EdgeSourceDegrees()); + + Checksum checksum = new ChecksumHashCode>>() + .run(sourceDegrees) + .execute(); assertEquals(12009, checksum.getCount()); assertEquals(0x0000162435fde1d9L, checksum.getChecksum()); http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java index 7add46b..eb0b892 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java @@ -19,11 +19,11 @@ package org.apache.flink.graph.asm.degree.annotate.directed; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.Edge; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; @@ -48,17 +48,21 @@ extends AsmTestBase { "(3,4,((null),(1,0,1)))\n" + "(5,3,((null),(4,2,2)))"; - DataSet>> degrees = directedSimpleGraph + DataSet>> targetDegrees = directedSimpleGraph .run(new EdgeTargetDegrees()); - TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(targetDegrees.collect(), expectedResult); } @Test public void testWithRMatGraph() throws Exception { - ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph - .run(new EdgeTargetDegrees())); + DataSet>> targetDegrees = directedRMatGraph + .run(new EdgeTargetDegrees()); + + Checksum checksum = new ChecksumHashCode>>() + .run(targetDegrees) + .execute(); assertEquals(12009, checksum.getCount()); assertEquals(0x0000160af450cc81L, 
checksum.getChecksum()); http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java index e635bfa..06737b5 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java @@ -19,10 +19,10 @@ package org.apache.flink.graph.asm.degree.annotate.directed; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; @@ -38,7 +38,7 @@ extends AsmTestBase { @Test public void testWithSimpleDirectedGraph() throws Exception { - DataSet> vertexDegrees = directedSimpleGraph + DataSet> degrees = directedSimpleGraph .run(new VertexDegrees()); String expectedResult = @@ -49,13 +49,13 @@ extends AsmTestBase { "(4,(1,0,1))\n" + "(5,(1,1,0))"; - TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult); } @Test public void testWithSimpleUndirectedGraph() throws Exception { - DataSet> vertexDegrees = 
undirectedSimpleGraph + DataSet> degrees = undirectedSimpleGraph .run(new VertexDegrees()); String expectedResult = @@ -66,21 +66,21 @@ extends AsmTestBase { "(4,(1,1,1))\n" + "(5,(1,1,1))"; - TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult); } @Test public void testWithEmptyGraph() throws Exception { - DataSet> vertexDegrees; + DataSet> degrees; - vertexDegrees = emptyGraph + degrees = emptyGraph .run(new VertexDegrees() .setIncludeZeroDegreeVertices(false)); - assertEquals(0, vertexDegrees.collect().size()); + assertEquals(0, degrees.collect().size()); - vertexDegrees = emptyGraph + degrees = emptyGraph .run(new VertexDegrees() .setIncludeZeroDegreeVertices(true)); @@ -89,14 +89,18 @@ extends AsmTestBase { "(1,(0,0,0))\n" + "(2,(0,0,0))"; - TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult); } @Test public void testWithRMatGraph() throws Exception { - ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph - .run(new VertexDegrees())); + DataSet> degrees = directedRMatGraph + .run(new VertexDegrees()); + + Checksum checksum = new ChecksumHashCode>() + .run(degrees) + .execute(); assertEquals(902, checksum.getCount()); assertEquals(0x000001a3305dd86aL, checksum.getChecksum()); http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java index c324106..95f83853 100644 --- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java @@ -19,10 +19,10 @@ package org.apache.flink.graph.asm.degree.annotate.directed; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -37,7 +37,7 @@ extends AsmTestBase { @Test public void testWithSimpleGraph() throws Exception { - DataSet> vertexDegrees = directedSimpleGraph + DataSet> inDegree = directedSimpleGraph .run(new VertexInDegree() .setIncludeZeroDegreeVertices(true)); @@ -49,21 +49,21 @@ extends AsmTestBase { "(4,1)\n" + "(5,0)"; - TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(inDegree.collect(), expectedResult); } @Test public void testWithEmptyGraph() throws Exception { - DataSet> vertexDegrees; + DataSet> inDegree; - vertexDegrees = emptyGraph + inDegree = emptyGraph .run(new VertexInDegree() .setIncludeZeroDegreeVertices(false)); - assertEquals(0, vertexDegrees.collect().size()); + assertEquals(0, inDegree.collect().size()); - vertexDegrees = emptyGraph + inDegree = emptyGraph .run(new VertexInDegree() .setIncludeZeroDegreeVertices(true)); @@ -72,17 +72,21 @@ extends AsmTestBase { "(1,0)\n" + "(2,0)"; - TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(inDegree.collect(), expectedResult); } @Test public void testWithRMatGraph() throws 
Exception { - ChecksumHashCode inDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph + DataSet> inDegree = directedRMatGraph .run(new VertexInDegree() - .setIncludeZeroDegreeVertices(true))); + .setIncludeZeroDegreeVertices(true)); + + Checksum checksum = new ChecksumHashCode>() + .run(inDegree) + .execute(); - assertEquals(902, inDegreeChecksum.getCount()); - assertEquals(0x0000000000e1d885L, inDegreeChecksum.getChecksum()); + assertEquals(902, checksum.getCount()); + assertEquals(0x0000000000e1d885L, checksum.getChecksum()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java index 8b5e981..7da3d8d 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java @@ -19,10 +19,10 @@ package org.apache.flink.graph.asm.degree.annotate.directed; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -37,7 +37,7 @@ extends AsmTestBase { @Test public void testWithSimpleGraph() throws 
Exception { - DataSet> vertexDegrees = directedSimpleGraph + DataSet> outDegree = directedSimpleGraph .run(new VertexOutDegree() .setIncludeZeroDegreeVertices(true)); @@ -49,21 +49,21 @@ extends AsmTestBase { "(4,0)\n" + "(5,1)"; - TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(outDegree.collect(), expectedResult); } @Test public void testWithEmptyGraph() throws Exception { - DataSet> vertexDegrees; + DataSet> outDegree; - vertexDegrees = emptyGraph + outDegree = emptyGraph .run(new VertexOutDegree() .setIncludeZeroDegreeVertices(false)); - assertEquals(0, vertexDegrees.collect().size()); + assertEquals(0, outDegree.collect().size()); - vertexDegrees = emptyGraph + outDegree = emptyGraph .run(new VertexOutDegree() .setIncludeZeroDegreeVertices(true)); @@ -72,17 +72,21 @@ extends AsmTestBase { "(1,0)\n" + "(2,0)"; - TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(outDegree.collect(), expectedResult); } @Test public void testWithRMatGraph() throws Exception { - ChecksumHashCode outDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph + DataSet> outDegree = directedRMatGraph .run(new VertexOutDegree() - .setIncludeZeroDegreeVertices(true))); + .setIncludeZeroDegreeVertices(true)); + + Checksum checksum = new ChecksumHashCode>() + .run(outDegree) + .execute(); - assertEquals(902, outDegreeChecksum.getCount()); - assertEquals(0x0000000000e1d885L, outDegreeChecksum.getChecksum()); + assertEquals(902, checksum.getCount()); + assertEquals(0x0000000000e1d885L, checksum.getChecksum()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java index 3a8636a..476b3fe 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java @@ -19,11 +19,11 @@ package org.apache.flink.graph.asm.degree.annotate.undirected; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.Edge; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -54,31 +54,39 @@ extends AsmTestBase { "(4,3,((null),1,4))\n" + "(5,3,((null),1,4))"; - DataSet>> sourceDegree = undirectedSimpleGraph + DataSet>> degreePairOnSourceId = undirectedSimpleGraph .run(new EdgeDegreePair()); - TestBaseUtils.compareResultAsText(sourceDegree.collect(), expectedResult); + TestBaseUtils.compareResultAsText(degreePairOnSourceId.collect(), expectedResult); - DataSet>> targetDegree = undirectedSimpleGraph + DataSet>> degreePairOnTargetId = undirectedSimpleGraph .run(new EdgeDegreePair() .setReduceOnTargetId(true)); - TestBaseUtils.compareResultAsText(targetDegree.collect(), expectedResult); + TestBaseUtils.compareResultAsText(degreePairOnTargetId.collect(), expectedResult); } @Test public void testWithRMatGraph() throws Exception { - ChecksumHashCode sourceDegreeChecksum = 
DataSetUtils.checksumHashCode(undirectedRMatGraph - .run(new EdgeDegreePair())); + DataSet>> degreePairOnSourceId = undirectedRMatGraph + .run(new EdgeDegreePair()); - assertEquals(20884, sourceDegreeChecksum.getCount()); - assertEquals(0x00000001e051efe4L, sourceDegreeChecksum.getChecksum()); + Checksum checksumOnSourceId = new ChecksumHashCode>>() + .run(degreePairOnSourceId) + .execute(); - ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph + assertEquals(20884, checksumOnSourceId.getCount()); + assertEquals(0x00000001e051efe4L, checksumOnSourceId.getChecksum()); + + DataSet>> degreePairOnTargetId = undirectedRMatGraph .run(new EdgeDegreePair() - .setReduceOnTargetId(true))); + .setReduceOnTargetId(true)); + + Checksum checksumOnTargetId = new ChecksumHashCode>>() + .run(degreePairOnTargetId) + .execute(); - assertEquals(sourceDegreeChecksum, targetDegreeChecksum); + assertEquals(checksumOnSourceId, checksumOnTargetId); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java index 9671461..0dc2178 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java @@ -19,11 +19,11 @@ package org.apache.flink.graph.asm.degree.annotate.undirected; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; import 
org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.Edge; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -54,31 +54,39 @@ extends AsmTestBase { "(4,3,((null),1))\n" + "(5,3,((null),1))"; - DataSet>> sourceDegree = undirectedSimpleGraph + DataSet>> sourceDegreeOnSourceId = undirectedSimpleGraph .run(new EdgeSourceDegree()); - TestBaseUtils.compareResultAsText(sourceDegree.collect(), expectedResult); + TestBaseUtils.compareResultAsText(sourceDegreeOnSourceId.collect(), expectedResult); - DataSet>> targetDegree = undirectedSimpleGraph + DataSet>> sourceDegreeOnTargetId = undirectedSimpleGraph .run(new EdgeSourceDegree() .setReduceOnTargetId(true)); - TestBaseUtils.compareResultAsText(targetDegree.collect(), expectedResult); + TestBaseUtils.compareResultAsText(sourceDegreeOnTargetId.collect(), expectedResult); } @Test public void testWithRMatGraph() throws Exception { - ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph - .run(new EdgeSourceDegree())); + DataSet>> sourceDegreeOnSourceId = undirectedRMatGraph + .run(new EdgeSourceDegree()); - assertEquals(20884, sourceDegreeChecksum.getCount()); - assertEquals(0x000000019d8f0070L, sourceDegreeChecksum.getChecksum()); + Checksum checksumOnSourceId = new ChecksumHashCode>>() + .run(sourceDegreeOnSourceId) + .execute(); - ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph + assertEquals(20884, checksumOnSourceId.getCount()); + assertEquals(0x000000019d8f0070L, checksumOnSourceId.getChecksum()); + + DataSet>> sourceDegreeOnTargetId = undirectedRMatGraph .run(new EdgeSourceDegree() - .setReduceOnTargetId(true))); + 
.setReduceOnTargetId(true)); + + Checksum checksumOnTargetId = new ChecksumHashCode>>() + .run(sourceDegreeOnTargetId) + .execute(); - assertEquals(sourceDegreeChecksum, targetDegreeChecksum); + assertEquals(checksumOnTargetId, checksumOnTargetId); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java index 54f2063..b14ddc0 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java @@ -19,11 +19,11 @@ package org.apache.flink.graph.asm.degree.annotate.undirected; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.Edge; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -54,31 +54,39 @@ extends AsmTestBase { "(4,3,((null),4))\n" + "(5,3,((null),4))"; - DataSet>> sourceDegree = undirectedSimpleGraph + DataSet>> targetDegreeOnTargetId = undirectedSimpleGraph .run(new EdgeTargetDegree()); - TestBaseUtils.compareResultAsText(sourceDegree.collect(), expectedResult); + 
TestBaseUtils.compareResultAsText(targetDegreeOnTargetId.collect(), expectedResult); - DataSet>> targetDegree = undirectedSimpleGraph + DataSet>> targetDegreeOnSourceId = undirectedSimpleGraph .run(new EdgeTargetDegree() .setReduceOnSourceId(true)); - TestBaseUtils.compareResultAsText(targetDegree.collect(), expectedResult); + TestBaseUtils.compareResultAsText(targetDegreeOnSourceId.collect(), expectedResult); } @Test public void testWithRMatGraph() throws Exception { - ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph - .run(new EdgeSourceDegree())); + DataSet>> targetDegreeOnTargetId = undirectedRMatGraph + .run(new EdgeSourceDegree()); - assertEquals(20884, sourceDegreeChecksum.getCount()); - assertEquals(0x000000019d8f0070L, sourceDegreeChecksum.getChecksum()); + Checksum checksumOnTargetId = new ChecksumHashCode>>() + .run(targetDegreeOnTargetId) + .execute(); - ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph + assertEquals(20884, checksumOnTargetId.getCount()); + assertEquals(0x000000019d8f0070L, checksumOnTargetId.getChecksum()); + + DataSet>> targetDegreeOnSourceId = undirectedRMatGraph .run(new EdgeTargetDegree() - .setReduceOnSourceId(true))); + .setReduceOnSourceId(true)); + + Checksum checksumOnSourceId = new ChecksumHashCode>>() + .run(targetDegreeOnSourceId) + .execute(); - assertEquals(sourceDegreeChecksum, targetDegreeChecksum); + assertEquals(checksumOnTargetId, checksumOnSourceId); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java index 37e1bb9..102beae 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java @@ -19,10 +19,10 @@ package org.apache.flink.graph.asm.degree.annotate.undirected; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -45,16 +45,16 @@ extends AsmTestBase { "(4,1)\n" + "(5,1)"; - DataSet> sourceDegrees = undirectedSimpleGraph + DataSet> degreeOnSourceId = undirectedSimpleGraph .run(new VertexDegree()); - TestBaseUtils.compareResultAsText(sourceDegrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(degreeOnSourceId.collect(), expectedResult); - DataSet> targetDegrees = undirectedSimpleGraph + DataSet> degreeOnTargetId = undirectedSimpleGraph .run(new VertexDegree() .setReduceOnTargetId(true)); - TestBaseUtils.compareResultAsText(targetDegrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(degreeOnTargetId.collect(), expectedResult); } @Test @@ -62,18 +62,18 @@ extends AsmTestBase { throws Exception { long expectedDegree = completeGraphVertexCount - 1; - DataSet> sourceDegrees = completeGraph + DataSet> degreeOnSourceId = completeGraph .run(new VertexDegree()); - for (Vertex vertex : sourceDegrees.collect()) { + for (Vertex vertex : degreeOnSourceId.collect()) { 
assertEquals(expectedDegree, vertex.getValue().getValue()); } - DataSet> targetDegrees = completeGraph + DataSet> degreeOnTargetId = completeGraph .run(new VertexDegree() .setReduceOnTargetId(true)); - for (Vertex vertex : targetDegrees.collect()) { + for (Vertex vertex : degreeOnTargetId.collect()) { assertEquals(expectedDegree, vertex.getValue().getValue()); } } @@ -81,15 +81,15 @@ extends AsmTestBase { @Test public void testWithEmptyGraph() throws Exception { - DataSet> vertexDegrees; + DataSet> degree; - vertexDegrees = emptyGraph + degree = emptyGraph .run(new VertexDegree() .setIncludeZeroDegreeVertices(false)); - assertEquals(0, vertexDegrees.collect().size()); + assertEquals(0, degree.collect().size()); - vertexDegrees = emptyGraph + degree = emptyGraph .run(new VertexDegree() .setIncludeZeroDegreeVertices(true)); @@ -98,22 +98,30 @@ extends AsmTestBase { "(1,0)\n" + "(2,0)"; - TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult); + TestBaseUtils.compareResultAsText(degree.collect(), expectedResult); } @Test public void testWithRMatGraph() throws Exception { - ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph - .run(new VertexDegree())); + DataSet> degreeOnSourceId = undirectedRMatGraph + .run(new VertexDegree()); + + Checksum checksumOnSourceId = new ChecksumHashCode>() + .run(degreeOnSourceId) + .execute(); - assertEquals(902, sourceDegreeChecksum.getCount()); - assertEquals(0x0000000000e1fb30L, sourceDegreeChecksum.getChecksum()); + assertEquals(902, checksumOnSourceId.getCount()); + assertEquals(0x0000000000e1fb30L, checksumOnSourceId.getChecksum()); - ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(undirectedRMatGraph + DataSet> degreeOnTargetId = undirectedRMatGraph .run(new VertexDegree() - .setReduceOnTargetId(true))); + .setReduceOnTargetId(true)); + + Checksum checksumOnTargetId = new ChecksumHashCode>() + .run(degreeOnTargetId) + .execute(); - 
assertEquals(sourceDegreeChecksum, targetDegreeChecksum); + assertEquals(checksumOnSourceId, checksumOnTargetId); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java index ca96f24..55f7743 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java @@ -18,9 +18,9 @@ package org.apache.flink.graph.asm.degree.filter.undirected; -import org.apache.flink.api.java.Utils; import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.library.metric.ChecksumHashCode; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; @@ -62,7 +62,7 @@ extends AsmTestBase { @Test public void testWithRMatGraph() throws Exception { - Utils.ChecksumHashCode checksum = undirectedRMatGraph + Checksum checksum = undirectedRMatGraph .run(new MaximumDegree(16)) .run(new ChecksumHashCode()) .execute(); http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java index d8d93ad..77d9dba 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java @@ -20,9 +20,8 @@ package org.apache.flink.graph.library.clustering.directed; import org.apache.commons.math3.util.CombinatoricsUtils; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; @@ -76,8 +75,12 @@ extends AsmTestBase { @Test public void testRMatGraph() throws Exception { - ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph - .run(new LocalClusteringCoefficient())); + DataSet> cc = directedRMatGraph + .run(new LocalClusteringCoefficient()); + + Checksum checksum = new org.apache.flink.graph.asm.dataset.ChecksumHashCode>() + .run(cc) + .execute(); assertEquals(902, checksum.getCount()); assertEquals(0x000001bf83866775L, checksum.getChecksum()); http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java index 3c86358..6ae9b90 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java @@ -20,9 +20,9 @@ package org.apache.flink.graph.library.clustering.directed; import org.apache.commons.math3.util.CombinatoricsUtils; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; @@ -76,7 +76,9 @@ extends AsmTestBase { .run(new TriangleListing() .setSortTriangleVertices(true)); - ChecksumHashCode checksum = DataSetUtils.checksumHashCode(tl); + Checksum checksum = new ChecksumHashCode>() + .run(tl) + .execute(); assertEquals(75049, checksum.getCount()); assertEquals(0x00000033111f11baL, checksum.getChecksum()); http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java index f5416fb..ba0834c 100644 --- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java @@ -20,9 +20,9 @@ package org.apache.flink.graph.library.clustering.undirected; import org.apache.commons.math3.util.CombinatoricsUtils; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; @@ -76,8 +76,12 @@ extends AsmTestBase { @Test public void testRMatGraph() throws Exception { - ChecksumHashCode checksum = DataSetUtils.checksumHashCode(undirectedRMatGraph - .run(new LocalClusteringCoefficient())); + DataSet> cc = undirectedRMatGraph + .run(new LocalClusteringCoefficient()); + + Checksum checksum = new ChecksumHashCode>() + .run(cc) + .execute(); assertEquals(902, checksum.getCount()); assertEquals(0x000001cab2d3677bL, checksum.getChecksum()); http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java index 0d1ebd0..bc3914e 100644 --- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java @@ -20,10 +20,10 @@ package org.apache.flink.graph.library.clustering.undirected; import org.apache.commons.math3.util.CombinatoricsUtils; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -58,7 +58,9 @@ extends AsmTestBase { DataSet> tl = completeGraph .run(new TriangleListing()); - ChecksumHashCode checksum = DataSetUtils.checksumHashCode(tl); + Checksum checksum = new ChecksumHashCode>() + .run(tl) + .execute(); assertEquals(expectedCount, checksum.getCount()); } @@ -70,7 +72,9 @@ extends AsmTestBase { .run(new TriangleListing() .setSortTriangleVertices(true)); - ChecksumHashCode checksum = DataSetUtils.checksumHashCode(tl); + Checksum checksum = new ChecksumHashCode>() + .run(tl) + .execute(); assertEquals(75049, checksum.getCount()); assertEquals(0x00000001a5b500afL, checksum.getChecksum()); http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java index 679bf4f..2e5cebe 100644 --- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java @@ -19,10 +19,10 @@ package org.apache.flink.graph.library.link_analysis; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.library.link_analysis.HITS.Result; import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; @@ -79,8 +79,12 @@ extends AsmTestBase { @Test public void testWithRMatGraph() throws Exception { - ChecksumHashCode checksum = DataSetUtils.checksumHashCode(directedRMatGraph - .run(new HITS(0.000001))); + DataSet> hits = directedRMatGraph + .run(new HITS(0.000001)); + + Checksum checksum = new ChecksumHashCode>() + .run(hits) + .execute(); assertEquals(902, checksum.getCount()); assertEquals(0x000001cbba6dbcd0L, checksum.getChecksum()); http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java index 73545fb..24f0c2d 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java @@ -18,9 +18,9 @@ package 
org.apache.flink.graph.library.metric; -import org.apache.flink.api.java.Utils; import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.test.TestGraphUtils; import org.junit.Test; @@ -36,7 +36,7 @@ extends AsmTestBase { TestGraphUtils.getLongLongEdgeData(env), env); - Utils.ChecksumHashCode checksum = graph + Checksum checksum = graph .run(new ChecksumHashCode()) .execute(); http://git-wip-us.apache.org/repos/asf/flink/blob/a7e5705c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java index 58c986d..9490459 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java @@ -20,10 +20,10 @@ package org.apache.flink.graph.library.similarity; import org.apache.commons.math3.random.JDKRandomGenerator; import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.Utils.ChecksumHashCode; -import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.graph.Graph; import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode; +import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum; import org.apache.flink.graph.asm.simple.undirected.Simplify; import org.apache.flink.graph.generator.RMatGraph; import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; @@ -129,7 +129,9 @@ extends AsmTestBase { .run(new JaccardIndex() .setGroupSize(4)); - 
ChecksumHashCode checksum = DataSetUtils.checksumHashCode(ji); + Checksum checksum = new ChecksumHashCode>() + .run(ji) + .execute(); assertEquals(13954, checksum.getCount()); assertEquals(0x00001b1a1f7a9d0bL, checksum.getChecksum());