incubator-giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1224678 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/utils/ src/test/java/org/apache/giraph/examples/
Date Mon, 26 Dec 2011 05:05:50 GMT
Author: aching
Date: Mon Dec 26 05:05:49 2011
New Revision: 1224678

URL: http://svn.apache.org/viewvc?rev=1224678&view=rev
Log:
GIRAPH-115: Port of the HCC algorithm for identifying all connected
components of a graph. (ssc via aching)


Added:
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1224678&r1=1224677&r2=1224678&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Mon Dec 26 05:05:49 2011
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
+  GIRAPH-115: Port of the HCC algorithm for identifying all connected
+  components of a graph. (ssc via aching)
+
   GIRAPH-112: Use elements() properly in LongDoubleFloatDoubleVertex.
   (aching)
 

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java?rev=1224678&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java Mon Dec 26 05:05:49 2011
@@ -0,0 +1,96 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.IntIntNullIntVertex;
+import org.apache.hadoop.io.IntWritable;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Implementation of the HCC algorithm that identifies connected components and assigns each
+ * vertex its "component identifier" (the smallest vertex id in the component)
+ *
+ * The idea behind the algorithm is very simple: propagate the smallest vertex id along the
+ * edges to all vertices of a connected component. The number of supersteps necessary is
+ * equal to the length of the maximum diameter of all components + 1
+ *
+ * The original Hadoop-based variant of this algorithm was proposed by Kang, Charalampos
+ * Tsourakakis and Faloutsos in "PEGASUS: Mining Peta-Scale Graphs", 2010
+ *
+ * http://www.cs.cmu.edu/~ukang/papers/PegasusKAIS.pdf
+ */
+public class ConnectedComponentsVertex extends IntIntNullIntVertex {
+
+    /**
+     * Adopts the smallest component id this vertex can see and forwards it
+     * to its neighbors. The vertex votes to halt at the end of every call;
+     * waking it up again when a smaller id is sent to it is left to the
+     * framework (not shown here).
+     *
+     * @param messages component ids sent to this vertex
+     * @throws IOException declared by the overridden method signature
+     */
+    @Override
+    public void compute(Iterator<IntWritable> messages) throws IOException {
+        final int ownComponent = getVertexValue().get();
+        int smallest = ownComponent;
+
+        if (getSuperstep() == 0) {
+            // superstep 0: read the candidates directly from the neighbor ids
+            Iterator<IntWritable> scan = iterator();
+            while (scan.hasNext()) {
+                smallest = Math.min(smallest, scan.next().get());
+            }
+            // nothing to announce while the own id is still the minimum
+            if (smallest != ownComponent) {
+                setVertexValue(new IntWritable(smallest));
+                Iterator<IntWritable> targets = iterator();
+                while (targets.hasNext()) {
+                    int target = targets.next().get();
+                    // the neighbor that supplied the minimum is skipped
+                    if (target > smallest) {
+                        sendMsg(new IntWritable(target), getVertexValue());
+                    }
+                }
+            }
+        } else {
+            // later supersteps: the candidates arrive as messages
+            while (messages.hasNext()) {
+                smallest = Math.min(smallest, messages.next().get());
+            }
+            // propagate only when the component id actually shrank
+            if (smallest < ownComponent) {
+                setVertexValue(new IntWritable(smallest));
+                sendMsgToAllEdges(getVertexValue());
+            }
+        }
+        voteToHalt();
+    }
+
+}

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java?rev=1224678&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java Mon Dec 26 05:05:49 2011
@@ -0,0 +1,97 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.giraph.lib.TextVertexInputFormat;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for unweighted
+ * graphs with int ids.
+ *
+ * Each line consists of: vertex neighbor1 neighbor2 ...
+ */
+public class IntIntNullIntTextInputFormat extends
+        TextVertexInputFormat<IntWritable, IntWritable, NullWritable,
+        IntWritable> {
+
+    /**
+     * Builds a vertex reader on top of the line reader for the given split.
+     */
+    @Override
+    public VertexReader<IntWritable, IntWritable, NullWritable, IntWritable>
+    createVertexReader(InputSplit split, TaskAttemptContext context)
+            throws IOException {
+        RecordReader<LongWritable, Text> lines =
+                textInputFormat.createRecordReader(split, context);
+        return new IntIntNullIntVertexReader(lines);
+    }
+
+    /**
+     * Turns each text line into a vertex: the first token is the vertex id
+     * (also used as the initial vertex value), all further tokens are
+     * neighbor ids.
+     */
+    public static class IntIntNullIntVertexReader extends
+            TextVertexInputFormat.TextVertexReader<IntWritable, IntWritable,
+                    NullWritable, IntWritable> {
+
+        /** Exactly one tab or one blank separates two tokens. */
+        private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+        public IntIntNullIntVertexReader(RecordReader<LongWritable, Text>
+                lineReader) {
+            super(lineReader);
+        }
+
+        /**
+         * Parses the current line of the record reader into a new vertex.
+         * Tokens that are not valid ints make Integer.parseInt throw.
+         */
+        @Override
+        public BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
+                getCurrentVertex() throws IOException, InterruptedException {
+            BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
+                    result = BspUtils.<IntWritable, IntWritable, NullWritable,
+                    IntWritable>createVertex(getContext().getConfiguration());
+
+            String line = getRecordReader().getCurrentValue().toString();
+            String[] ids = SEPARATOR.split(line);
+
+            // ids[0] is the vertex itself, every further token is a neighbor
+            Map<IntWritable, NullWritable> neighbors =
+                    Maps.newHashMapWithExpectedSize(ids.length - 1);
+            for (int pos = 1; pos < ids.length; pos++) {
+                int neighborId = Integer.parseInt(ids[pos]);
+                neighbors.put(new IntWritable(neighborId), NullWritable.get());
+            }
+
+            // the vertex starts out with its own id as value and no messages
+            IntWritable ownId = new IntWritable(Integer.parseInt(ids[0]));
+            result.initialize(ownId, ownId, neighbors,
+                    Lists.<IntWritable>newArrayList());
+            return result;
+        }
+
+        /** Advances the underlying record reader by one line. */
+        @Override
+        public boolean nextVertex() throws IOException, InterruptedException {
+            return getRecordReader().nextKeyValue();
+        }
+    }
+
+}

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java?rev=1224678&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java Mon Dec 26 05:05:49 2011
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.hadoop.io.IntWritable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link VertexCombiner} that finds the minimum {@link IntWritable}
+ */
+public class MinimumIntCombiner
+        extends VertexCombiner<IntWritable, IntWritable> {
+
+    /**
+     * Reduces all messages for one target to their smallest value.
+     *
+     * @param target vertex the messages are addressed to (not inspected)
+     * @param messages the messages to combine
+     * @return a new writable holding the minimum, or Integer.MAX_VALUE when
+     *         the list is empty
+     */
+    @Override
+    public IntWritable combine(IntWritable target,
+            List<IntWritable> messages) throws IOException {
+        int smallest = Integer.MAX_VALUE;
+        for (IntWritable candidate : messages) {
+            smallest = Math.min(smallest, candidate.get());
+        }
+        return new IntWritable(smallest);
+    }
+}

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java?rev=1224678&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java Mon Dec 26 05:05:49 2011
@@ -0,0 +1,71 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.VertexWriter;
+import org.apache.giraph.lib.TextVertexOutputFormat;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Text-based {@link org.apache.giraph.graph.VertexOutputFormat} for usage with
+ * {@link ConnectedComponentsVertex}
+ *
+ * Each line consists of a vertex and its associated component (represented by the smallest
+ * vertex id in the component)
+ */
+public class VertexWithComponentTextOutputFormat extends
+        TextVertexOutputFormat<IntWritable, IntWritable, NullWritable> {
+
+    /**
+     * Wraps the record writer of the text output format in a vertex writer.
+     */
+    @Override
+    public VertexWriter<IntWritable, IntWritable, NullWritable>
+            createVertexWriter(TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new VertexWithComponentWriter(
+                textOutputFormat.getRecordWriter(context));
+    }
+
+    /** Emits one "id TAB component" line per vertex. */
+    public static class VertexWithComponentWriter extends
+            TextVertexOutputFormat.TextVertexWriter<IntWritable, IntWritable,
+            NullWritable> {
+
+        public VertexWithComponentWriter(RecordWriter<Text, Text> writer) {
+            super(writer);
+        }
+
+        /**
+         * Writes the vertex id and the vertex value, separated by a tab.
+         */
+        @Override
+        public void writeVertex(BasicVertex<IntWritable, IntWritable,
+                NullWritable,?> vertex) throws IOException,
+                InterruptedException {
+            int vertexId = vertex.getVertexId().get();
+            int component = vertex.getVertexValue().get();
+            // the whole line travels as the key; the value is left null
+            Text line = new Text(vertexId + "\t" + component);
+            getRecordWriter().write(line, null);
+        }
+
+    }
+}
\ No newline at end of file

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java?rev=1224678&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java Mon Dec 26 05:05:49 2011
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import com.google.common.collect.Iterables;
+import org.apache.giraph.utils.UnmodifiableIntArrayIterator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Simple implementation of {@link BasicVertex} using an int as id, value and message.
+ * Edges are immutable and unweighted. This class aims to be as memory efficient as possible.
+ */
+public abstract class IntIntNullIntVertex extends
+        BasicVertex<IntWritable, IntWritable, NullWritable,IntWritable> {
+
+    /** Vertex id as a primitive. */
+    private int id;
+    /** Vertex value as a primitive. */
+    private int value;
+
+    /** Target vertex ids of the out-edges. */
+    private int[] neighbors;
+    /** Payloads of the messages currently held by this vertex. */
+    private int[] messages;
+
+    /**
+     * Copies id, value, edge targets and messages into primitive fields.
+     * Only the keys of the edge map are kept; edge values are not stored.
+     */
+    @Override
+    public void initialize(IntWritable vertexId, IntWritable vertexValue,
+            Map<IntWritable, NullWritable> edges,
+            List<IntWritable> messages) {
+        id = vertexId.get();
+        value = vertexValue.get();
+
+        neighbors = new int[edges.size()];
+        int slot = 0;
+        for (IntWritable target : edges.keySet()) {
+            neighbors[slot] = target.get();
+            slot++;
+        }
+
+        // the parameter shadows the field, hence the explicit "this"
+        this.messages = new int[messages.size()];
+        slot = 0;
+        for (IntWritable message : messages) {
+            this.messages[slot] = message.get();
+            slot++;
+        }
+    }
+
+    /** @return a new writable holding the vertex id */
+    @Override
+    public IntWritable getVertexId() {
+        return new IntWritable(id);
+    }
+
+    /** @return a new writable holding the vertex value */
+    @Override
+    public IntWritable getVertexValue() {
+        return new IntWritable(value);
+    }
+
+    /** Stores the int carried by the given writable, not the writable. */
+    @Override
+    public void setVertexValue(IntWritable vertexValue) {
+        value = vertexValue.get();
+    }
+
+    /** @return an iterator over the neighbor ids */
+    @Override
+    public Iterator<IntWritable> iterator() {
+        return new UnmodifiableIntArrayIterator(neighbors);
+    }
+
+    /** Edges carry no value: the null writable is returned for any id. */
+    @Override
+    public NullWritable getEdgeValue(IntWritable targetVertexId) {
+        return NullWritable.get();
+    }
+
+    /** Linear scan over the neighbor array. */
+    @Override
+    public boolean hasEdge(IntWritable targetVertexId) {
+        for (int i = 0; i < neighbors.length; i++) {
+            if (targetVertexId.get() == neighbors[i]) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public int getNumOutEdges() {
+        return neighbors.length;
+    }
+
+    /** Passes the same message object to sendMsg once per neighbor. */
+    @Override
+    public void sendMsgToAllEdges(final IntWritable message) {
+        final int[] targets = neighbors;
+        for (int i = 0; i < targets.length; i++) {
+            sendMsg(new IntWritable(targets[i]), message);
+        }
+    }
+
+    /**
+     * @return a view whose iterators read the message array that is current
+     *         at the time iterator() is called
+     */
+    @Override
+    public Iterable<IntWritable> getMessages() {
+        return new Iterable<IntWritable>() {
+            @Override
+            public Iterator<IntWritable> iterator() {
+                return new UnmodifiableIntArrayIterator(messages);
+            }
+        };
+    }
+
+    /** Replaces the stored messages; the iterable is traversed twice. */
+    @Override
+    public void setMessages(Iterable<IntWritable> newMessages) {
+        messages = new int[Iterables.size(newMessages)];
+        Iterator<IntWritable> source = newMessages.iterator();
+        for (int slot = 0; source.hasNext(); slot++) {
+            messages[slot] = source.next().get();
+        }
+    }
+
+    /** Drops the stored messages by swapping in an empty array. */
+    @Override
+    void releaseResources() {
+        messages = new int[0];
+    }
+
+    /**
+     * Layout: id, value, neighbor count, neighbors, message count, messages
+     * (every item written with writeInt).
+     */
+    @Override
+    public void write(final DataOutput out) throws IOException {
+        out.writeInt(id);
+        out.writeInt(value);
+        out.writeInt(neighbors.length);
+        for (int neighbor : neighbors) {
+            out.writeInt(neighbor);
+        }
+        out.writeInt(messages.length);
+        for (int message : messages) {
+            out.writeInt(message);
+        }
+    }
+
+    /** Reads the fields back in the order in which write() emits them. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        id = in.readInt();
+        value = in.readInt();
+        neighbors = new int[in.readInt()];
+        for (int i = 0; i < neighbors.length; i++) {
+            neighbors[i] = in.readInt();
+        }
+        messages = new int[in.readInt()];
+        for (int i = 0; i < messages.length; i++) {
+            messages[i] = in.readInt();
+        }
+    }
+
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1224678&r1=1224677&r2=1224678&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java Mon Dec 26 05:05:49 2011
@@ -40,13 +40,13 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 
 /**
- * a base class for running internal tests on a vertex
+ * A base class for running internal tests on a vertex
  *
  * Extending classes only have to invoke the run() method to test their vertex. All data
  * is written to a local tmp directory that is removed afterwards. A local zookeeper
  * instance is started in an extra thread and shutdown at the end.
  *
- * heavily inspired from Apache Mahout's MahoutTestCase
+ * Heavily inspired from Apache Mahout's MahoutTestCase
  */
 public class InternalVertexRunner {
 
@@ -69,24 +69,49 @@ public class InternalVertexRunner {
      */
     public static Iterable<String> run(Class<?> vertexClass,
             Class<?> vertexInputFormatClass, Class<?> vertexOutputFormatClass,
-            Map<String,String> params, String... data) throws Exception {
+            Map<String, String> params, String... data) throws Exception {
+        return run(vertexClass, null, vertexInputFormatClass,
+                vertexOutputFormatClass, params, data);
+    }
+    
+    /**
+     *  Attempts to run the vertex internally in the current JVM, reading from and writing to a
+     *  temporary folder on local disk. Will start an own zookeeper instance.
+     *
+     * @param vertexClass the vertex class to instantiate
+     * @param vertexCombinerClass the vertex combiner to use (or null)
+     * @param vertexInputFormatClass the inputformat to use
+     * @param vertexOutputFormatClass the outputformat to use
+     * @param params a map of parameters to add to the hadoop configuration
+     * @param data linewise input data
+     * @return linewise output data
+     * @throws Exception
+     */
+    public static Iterable<String> run(Class<?> vertexClass,
+            Class<?> vertexCombinerClass, Class<?> vertexInputFormatClass, 
+            Class<?> vertexOutputFormatClass, Map<String, String> params,
+            String... data) throws Exception {
 
         File tmpDir = null;
         try {
-            /* prepare input file, output folder and zookeeper folder */
+            // prepare input file, output folder and zookeeper folder
             tmpDir = createTestDir(vertexClass);
             File inputFile = createTempFile(tmpDir, "graph.txt");
             File outputDir = createTempDir(tmpDir, "output");
             File zkDir = createTempDir(tmpDir, "zooKeeper");
 
-            /* write input data to disk */
+            // write input data to disk
             writeLines(inputFile, data);
 
-            /* create and configure the job to run the vertex */
+            // create and configure the job to run the vertex
             GiraphJob job = new GiraphJob(vertexClass.getName());
             job.setVertexClass(vertexClass);
             job.setVertexInputFormatClass(vertexInputFormatClass);
             job.setVertexOutputFormatClass(vertexOutputFormatClass);
+            
+            if (vertexCombinerClass != null) {
+                job.setVertexCombinerClass(vertexCombinerClass);
+            }
 
             job.setWorkerConfiguration(1, 1, 100.0f);
             Configuration conf = job.getConfiguration();
@@ -102,7 +127,7 @@ public class InternalVertexRunner {
             FileInputFormat.addInputPath(job, new Path(inputFile.toString()));
             FileOutputFormat.setOutputPath(job, new Path(outputDir.toString()));
 
-            /* configure a local zookeeper instance */
+            // configure a local zookeeper instance
             Properties zkProperties = new Properties();
             zkProperties.setProperty("tickTime", "2000");
             zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
@@ -118,8 +143,8 @@ public class InternalVertexRunner {
             QuorumPeerConfig qpConfig = new QuorumPeerConfig();
             qpConfig.parseProperties(zkProperties);
 
-            /* create and run the zookeeper instance */
-            final ZooKeeperServerMain zookeeper = new ZooKeeperServerMain();
+            // create and run the zookeeper instance
+            final InternalZooKeeper zookeeper = new InternalZooKeeper();
             final ServerConfig zkConfig = new ServerConfig();
             zkConfig.readFrom(qpConfig);
 
@@ -138,6 +163,7 @@ public class InternalVertexRunner {
                 job.run(true);
             } finally {
                 executorService.shutdown();
+                zookeeper.end();
             }
 
             return Files.readLines(new File(outputDir, "part-m-00000"),
@@ -149,7 +175,9 @@ public class InternalVertexRunner {
         }
     }
 
-    /* create a temporary folder that will be removed after the test */
+    /**
+     *  Create a temporary folder that will be removed after the test
+     */
     private static final File createTestDir(Class<?> vertexClass)
             throws IOException {
         String systemTmpDir = System.getProperty("java.io.tmpdir");
@@ -209,4 +237,13 @@ public class InternalVertexRunner {
         }
     }
 
+    /**
+     * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
+     */
+    private static class InternalZooKeeper extends ZooKeeperServerMain {
+        /** Stops this server instance by delegating to shutdown(). */
+        void end() {
+            this.shutdown();
+        }
+    }
+
 }

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java?rev=1224678&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/UnmodifiableIntArrayIterator.java Mon Dec 26 05:05:49 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.hadoop.io.IntWritable;
+
+/**
+ * {@link UnmodifiableIterator} over a primitive int array
+ */
+public class UnmodifiableIntArrayIterator
+        extends UnmodifiableIterator<IntWritable> {
+
+    /** Array being iterated; the reference is stored, not a copy. */
+    private final int[] arr;
+    /** Index of the element the next call to next() returns. */
+    private int offset;
+
+    /**
+     * @param arr the values to iterate over, starting at index 0
+     */
+    public UnmodifiableIntArrayIterator(int[] arr) {
+        this.arr = arr;
+        offset = 0;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return offset < arr.length;
+    }
+
+    /**
+     * Wraps the next array element in a new IntWritable.
+     *
+     * @return the next value
+     * @throws java.util.NoSuchElementException if the array is exhausted
+     */
+    @Override
+    public IntWritable next() {
+        // honor the Iterator contract instead of leaking an
+        // ArrayIndexOutOfBoundsException (which also advanced the offset)
+        if (!hasNext()) {
+            throw new java.util.NoSuchElementException();
+        }
+        return new IntWritable(arr[offset++]);
+    }
+}
\ No newline at end of file

Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java?rev=1224678&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/ConnectedComponentsVertexTest.java Mon Dec 26 05:05:49 2011
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import junit.framework.TestCase;
+
+import org.apache.giraph.utils.InternalVertexRunner;
+
+import java.util.Set;
+
+/**
+ * Tests for {@link ConnectedComponentsVertex}
+ */
+public class ConnectedComponentsVertexTest extends TestCase {
+
+    /**
+     * A local integration test on toy data: runs the vertex through
+     * {@link InternalVertexRunner} and checks which vertices were assigned
+     * to which component id
+     */
+    public void testToyData() throws Exception {
+
+        // a small graph with three components; each line presumably lists a
+        // vertex id followed by the ids of its neighbors (the exact format is
+        // defined by IntIntNullIntTextInputFormat)
+        String[] graph = new String[] {
+                "1 2 3",
+                "2 1 4 5",
+                "3 1 4",
+                "4 2 3 5 13",
+                "5 2 4 12 13",
+                "12 5 13",
+                "13 4 5 12",
+
+                "6 7 8",
+                "7 6 10 11",
+                "8 6 10",
+                "10 7 8 11",
+                "11 7 10",
+
+                "9" };
+
+        // run internally
+        Iterable<String> output = InternalVertexRunner.run(
+                ConnectedComponentsVertex.class,
+                MinimumIntCombiner.class,
+                IntIntNullIntTextInputFormat.class,
+                VertexWithComponentTextOutputFormat.class,
+                Maps.<String,String>newHashMap(), graph);
+
+        SetMultimap<Integer,Integer> byComponent = parseResults(output);
+
+        // the expected component ids are the smallest vertex id of each
+        // of the three components in the toy graph
+        assertContainsExactly(byComponent.keySet(), 1, 6, 9);
+
+        assertContainsExactly(byComponent.get(1), 1, 2, 3, 4, 5, 12, 13);
+        assertContainsExactly(byComponent.get(6), 6, 7, 8, 10, 11);
+        assertContainsExactly(byComponent.get(9), 9);
+    }
+
+    /**
+     * Asserts that the set has as many entries as there are expected values
+     * and that it contains each of them
+     */
+    private void assertContainsExactly(Set<Integer> actual, int... expected) {
+        assertEquals(expected.length, actual.size());
+        for (int value : expected) {
+            assertTrue(actual.contains(value));
+        }
+    }
+
+    /**
+     * Reads lines whose first two tab-separated fields are a vertex id and a
+     * component id, and groups the vertex ids by component id
+     */
+    private SetMultimap<Integer,Integer> parseResults(
+            Iterable<String> results) {
+        Splitter onTab = Splitter.on('\t');
+        SetMultimap<Integer,Integer> byComponent = HashMultimap.create();
+        for (String line : results) {
+            Iterable<String> fields = onTab.split(line);
+            Integer vertexId = Integer.valueOf(Iterables.get(fields, 0));
+            Integer componentId = Integer.valueOf(Iterables.get(fields, 1));
+            byComponent.put(componentId, vertexId);
+        }
+        return byComponent;
+    }
+}

Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java?rev=1224678&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java Mon Dec 26 05:05:49 2011
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import junit.framework.TestCase;
+import org.apache.giraph.graph.VertexCombiner;
+import org.apache.hadoop.io.IntWritable;
+
+import java.util.Arrays;
+
+public class MinimumIntCombinerTest extends TestCase {
+
+    public void testCombiner() throws Exception {
+
+        VertexCombiner<IntWritable, IntWritable> combiner =
+                new MinimumIntCombiner();
+
+        IntWritable result = combiner.combine(new IntWritable(1), Arrays.asList(
+                new IntWritable(39947466), new IntWritable(199),
+                new IntWritable(19998888), new IntWritable(42)));
+
+        assertEquals(42, result.get());
+    }
+}

Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java?rev=1224678&r1=1224677&r2=1224678&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java (original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java Mon Dec 26 05:05:49 2011
@@ -33,10 +33,14 @@ import org.mockito.Mockito;
 
 import java.util.Map;
 
-/** contains a simple unit test for {@link SimpleShortestPathsVertex} */
+/**
+ * Contains a simple unit test for {@link SimpleShortestPathsVertex}
+ */
 public class SimpleShortestPathVertexTest extends TestCase {
 
-    /** test the behavior when a shorter path to a vertex has been found */
+    /**
+     * Test the behavior when a shorter path to a vertex has been found
+     */
     public void testOnShorterPathFound() throws Exception {
 
         SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
@@ -62,7 +66,9 @@ public class SimpleShortestPathVertexTes
         env.verifyMessageSent(new LongWritable(20L), new DoubleWritable(2));
     }
 
-    /** test the behavior when a new, but not shorter path to a vertex has been found */
+    /**
+     * Test the behavior when a new, but not shorter path to a vertex has been found
+     */
     public void testOnNoShorterPathFound() throws Exception {
 
         SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
@@ -86,21 +92,23 @@ public class SimpleShortestPathVertexTes
         env.verifyNoMessageSent();
     }
 
-    /** a local integration test on toy data */
+    /**
+     * A local integration test on toy data
+     */
     public void testToyData() throws Exception {
 
-        /* a small four vertex graph */
+        // a small four vertex graph
         String[] graph = new String[] {
                 "[1,0,[[2,1],[3,3]]]",
                 "[2,0,[[3,1],[4,10]]]",
                 "[3,0,[[4,2]]]",
                 "[4,0,[]]" };
 
-        /* start from vertex 1 */
-        Map<String,String> params = Maps.newHashMap();
+        // start from vertex 1
+        Map<String, String> params = Maps.newHashMap();
         params.put(SimpleShortestPathsVertex.SOURCE_ID, "1");
 
-        /* run internally */
+        // run internally
         Iterable<String> results = InternalVertexRunner.run(
                 SimpleShortestPathsVertex.class,
                 SimpleShortestPathsVertex.
@@ -111,7 +119,7 @@ public class SimpleShortestPathVertexTes
 
         Map<Long, Double> distances = parseDistances(results);
 
-        /* verify results */
+        // verify results
         assertNotNull(distances);
         assertEquals(4, distances.size());
         assertEquals(0.0, distances.get(1L));
@@ -120,8 +128,8 @@ public class SimpleShortestPathVertexTes
         assertEquals(4.0, distances.get(4L));
     }
 
-    private Map<Long,Double> parseDistances(Iterable<String> results) {
-        Map<Long,Double> distances =
+    private Map<Long, Double> parseDistances(Iterable<String> results) {
+        Map<Long, Double> distances =
                 Maps.newHashMapWithExpectedSize(Iterables.size(results));
         for (String line : results) {
             try {



Mime
View raw message