Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 62FC3119FF for ; Wed, 18 Jun 2014 16:42:16 +0000 (UTC) Received: (qmail 26108 invoked by uid 500); 18 Jun 2014 16:42:16 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 26092 invoked by uid 500); 18 Jun 2014 16:42:16 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 26081 invoked by uid 99); 18 Jun 2014 16:42:16 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Jun 2014 16:42:16 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 18 Jun 2014 16:42:15 +0000 Received: (qmail 25086 invoked by uid 99); 18 Jun 2014 16:41:50 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Jun 2014 16:41:50 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 0C20B83BDBE; Wed, 18 Jun 2014 16:41:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.incubator.apache.org Date: Wed, 18 Jun 2014 16:41:50 -0000 Message-Id: <831f0427fc5948f0b7ef7ef82d406c8f@git.apache.org> In-Reply-To: <207a1054e8d84dfab7ecee7e58fed6e7@git.apache.org> References: <207a1054e8d84dfab7ecee7e58fed6e7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: [FLINK-940] Added additional test for cogroup with delta iteration solution set on second input. X-Virus-Checked: Checked by ClamAV on apache.org [FLINK-940] Added additional test for cogroup with delta iteration solution set on second input. Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7c4c4a9d Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7c4c4a9d Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7c4c4a9d Branch: refs/heads/master Commit: 7c4c4a9d2fbc7b2ffdbd4668a36c21d503c672b2 Parents: 1a1a13e Author: Stephan Ewen Authored: Wed Jun 18 01:28:34 2014 +0200 Committer: Stephan Ewen Committed: Wed Jun 18 18:29:11 2014 +0200 ---------------------------------------------------------------------- .../test/testdata/ConnectedComponentsData.java | 19 +++ .../CoGroupConnectedComponentsSecondITCase.java | 139 +++++++++++++++++++ 2 files changed, 158 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c4c4a9d/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/ConnectedComponentsData.java ---------------------------------------------------------------------- diff --git a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/ConnectedComponentsData.java b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/ConnectedComponentsData.java index 76c7099..5bb8f9e 100644 --- a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/ConnectedComponentsData.java +++ b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/ConnectedComponentsData.java @@ -16,11 +16,14 @@ package eu.stratosphere.test.testdata; import java.io.BufferedReader; import java.io.IOException; +import java.util.List; import java.util.Random; import java.util.regex.Pattern; import org.junit.Assert; +import eu.stratosphere.api.java.tuple.Tuple2; + public class ConnectedComponentsData { @@ -106,6 +109,22 @@ public class ConnectedComponentsData { } } } + + public static void checkOddEvenResult(List> lines) throws IOException { + for (Tuple2 line : lines) { + try { + long vertex = line.f0; + long component = line.f1; + long should = vertex % 2; + if (should == 0) { + should = 2; + } + Assert.assertEquals("Vertex is in wrong component.", should, component); + } catch (NumberFormatException e) { + Assert.fail("Malformed result."); + } + } + } private ConnectedComponentsData() {} } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c4c4a9d/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java ---------------------------------------------------------------------- diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java new file mode 100644 index 0000000..1a0f443 --- /dev/null +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java @@ -0,0 +1,139 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.test.iterative; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import eu.stratosphere.api.java.DataSet; +import eu.stratosphere.api.java.DeltaIteration; +import eu.stratosphere.api.java.ExecutionEnvironment; +import eu.stratosphere.api.java.functions.CoGroupFunction; +import eu.stratosphere.api.java.functions.FlatMapFunction; +import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFieldsFirst; +import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFieldsSecond; +import eu.stratosphere.api.java.functions.MapFunction; +import eu.stratosphere.api.java.io.LocalCollectionOutputFormat; +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.example.java.graph.ConnectedComponents.DuplicateValue; +import eu.stratosphere.example.java.graph.ConnectedComponents.NeighborWithComponentIDJoin; +import eu.stratosphere.test.testdata.ConnectedComponentsData; +import eu.stratosphere.test.util.JavaProgramTestBase; +import eu.stratosphere.util.Collector; +import eu.stratosphere.util.LogUtils; + + +@SuppressWarnings("serial") +public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase { + + private static final long SEED = 0xBADC0FFEEBEEFL; + + private static final int NUM_VERTICES = 1000; + + private static final int NUM_EDGES = 10000; + + + @Override + protected void testProgram() throws Exception { + + // set up execution environment + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + LogUtils.initializeDefaultConsoleLogger(); + + // read vertex and edge data + DataSet vertices = env.fromElements(ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES).split("\n")) + .map(new VertexParser()); + + DataSet> edges = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n")) + .flatMap(new EdgeParser()); + + // assign the initial components (equal to the vertex id) + DataSet> verticesWithInitialId = vertices.map(new DuplicateValue()); + + // open a delta iteration + DeltaIteration, Tuple2> iteration = + verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0); + + // apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller + DataSet> changes = iteration + .getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin()) + .coGroup(iteration.getSolutionSet()).where(0).equalTo(0) + .with(new MinIdAndUpdate()); + + // close the delta iteration (delta and new workset are identical) + DataSet> result = iteration.closeWith(changes, changes); + + + // emit result + List> resutTuples = new ArrayList>(); + result.output(new LocalCollectionOutputFormat>(resutTuples)); + + env.execute(); + } + + // -------------------------------------------------------------------------------------------- + // The test program + // -------------------------------------------------------------------------------------------- + + public static final class VertexParser extends MapFunction { + + @Override + public Long map(String value) throws Exception { + return Long.parseLong(value); + } + } + + public static final class EdgeParser extends FlatMapFunction> { + + @Override + public void flatMap(String value, Collector> out) throws Exception { + String[] parts = value.split(" "); + long v1 = Long.parseLong(parts[0]); + long v2 = Long.parseLong(parts[1]); + + out.collect(new Tuple2(v1, v2)); + out.collect(new Tuple2(v2, v1)); + } + } + + @ConstantFieldsFirst("0") + @ConstantFieldsSecond("0") + public static final class MinIdAndUpdate extends CoGroupFunction, Tuple2, Tuple2> { + + @Override + public void coGroup(Iterator> candidates, Iterator> current, Collector> out) { + if (!current.hasNext()) { + throw new RuntimeException("Error: Id not encountered before."); + } + + Tuple2 old = current.next(); + + long minimumComponentID = Long.MAX_VALUE; + + while (candidates.hasNext()) { + long candidateComponentID = candidates.next().f1; + if (candidateComponentID < minimumComponentID) { + minimumComponentID = candidateComponentID; + } + } + + if (minimumComponentID < old.f1) { + old.f1 = minimumComponentID; + out.collect(old); + } + } + } +}