Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 528F2DBA7 for ; Tue, 24 Jul 2012 23:38:38 +0000 (UTC) Received: (qmail 6250 invoked by uid 500); 24 Jul 2012 23:38:38 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 6219 invoked by uid 500); 24 Jul 2012 23:38:38 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 6212 invoked by uid 99); 24 Jul 2012 23:38:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jul 2012 23:38:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jul 2012 23:38:34 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 8B52323889C5; Tue, 24 Jul 2012 23:37:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1365352 [2/4] - in /giraph/trunk: ./ giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/ giraph-formats-contrib/src/test/java/org/apache/giraph/format/accumulo/edgemarker/ giraph-formats-contrib/src/test/java/org/apache... Date: Tue, 24 Jul 2012 23:37:45 -0000 To: commits@giraph.apache.org From: aching@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120724233750.8B52323889C5@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/IdentityVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/IdentityVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/IdentityVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/IdentityVertex.java Tue Jul 24 23:37:42 2012 @@ -18,11 +18,9 @@ package org.apache.giraph.examples; +import org.apache.giraph.graph.EdgeListVertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.giraph.graph.EdgeListVertex; - -import java.util.Iterator; /** * User applications can subclass IdentityVertex, which @@ -41,7 +39,7 @@ public abstract class IdentityVertex { @Override - public void compute(Iterator msgIterator) { + public void compute(Iterable messages) { voteToHalt(); } } Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java Tue Jul 24 23:37:42 2012 @@ -18,10 +18,8 @@ package org.apache.giraph.examples; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.giraph.graph.BasicVertex; import org.apache.giraph.graph.BspUtils; +import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexReader; import org.apache.giraph.lib.TextVertexInputFormat; import org.apache.hadoop.io.IntWritable; @@ -32,6 +30,9 @@ import org.apache.hadoop.mapreduce.Input import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import java.io.IOException; import java.util.Map; import java.util.regex.Pattern; @@ -74,9 +75,9 @@ public class IntIntNullIntTextInputForma } @Override - public BasicVertex + public Vertex getCurrentVertex() throws IOException, InterruptedException { - BasicVertex + Vertex vertex = BspUtils.createVertex(getContext().getConfiguration()); Added: giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java?rev=1365352&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java Tue Jul 24 23:37:42 2012 @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.examples; + +import org.apache.giraph.graph.VertexCombiner; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * {@link VertexCombiner} that finds the minimum {@link DoubleWritable} + */ +public class MinimumDoubleCombiner extends + VertexCombiner { + @Override + public Iterable combine( + LongWritable target, + Iterable messages) throws IOException { + double minimum = Double.MAX_VALUE; + for (DoubleWritable message : messages) { + if (message.get() < minimum) { + minimum = message.get(); + } + } + List value = new ArrayList(); + value.add(new DoubleWritable(minimum)); + + return value; + } +} Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Tue Jul 24 23:37:42 2012 @@ -24,6 +24,7 @@ import org.apache.commons.cli.HelpFormat import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.giraph.aggregators.LongSumAggregator; +import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.EdgeListVertex; import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.graph.WorkerContext; @@ -37,8 +38,6 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; -import java.util.Iterator; - /** * An example that simply uses its id, value, and edges to compute new data * every iteration to verify that checkpoint restarting works. Fault injection @@ -64,7 +63,7 @@ public class SimpleCheckpointVertex exte private Configuration conf; @Override - public void compute(Iterator msgIterator) { + public void compute(Iterable messages) { SimpleCheckpointVertexWorkerContext workerContext = (SimpleCheckpointVertexWorkerContext) getWorkerContext(); @@ -76,7 +75,7 @@ public class SimpleCheckpointVertex exte if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) && (getContext().getTaskAttemptID().getId() == 0) && - (getVertexId().get() == FAULTING_VERTEX_ID)) { + (getId().get() == FAULTING_VERTEX_ID)) { LOG.info("compute: Forced a fault on the first " + "attempt of superstep " + FAULTING_SUPERSTEP + " and vertex id " + @@ -88,37 +87,34 @@ public class SimpleCheckpointVertex exte return; } LOG.info("compute: " + sumAggregator); - sumAggregator.aggregate(getVertexId().get()); + sumAggregator.aggregate(getId().get()); LOG.info("compute: sum = " + sumAggregator.getAggregatedValue().get() + - " for vertex " + getVertexId()); + " for vertex " + getId()); float msgValue = 0.0f; - while (msgIterator.hasNext()) { - float curMsgValue = msgIterator.next().get(); + for (FloatWritable message : messages) { + float curMsgValue = message.get(); msgValue += curMsgValue; LOG.info("compute: got msgValue = " + curMsgValue + - " for vertex " + getVertexId() + + " for vertex " + getId() + " on superstep " + getSuperstep()); } - int vertexValue = getVertexValue().get(); - setVertexValue(new IntWritable(vertexValue + (int) msgValue)); - LOG.info("compute: vertex " + getVertexId() + - " has value " + getVertexValue() + + int vertexValue = getValue().get(); + setValue(new IntWritable(vertexValue + (int) msgValue)); + LOG.info("compute: vertex " + getId() + + " has value " + getValue() + " on superstep " + getSuperstep()); - for (Iterator edges = getOutEdgesIterator(); - edges.hasNext();) { - LongWritable targetVertexId = edges.next(); - FloatWritable edgeValue = getEdgeValue(targetVertexId); - LOG.info("compute: vertex " + getVertexId() + - " sending edgeValue " + edgeValue + + for (Edge edge : getEdges()) { + FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() + + (float) vertexValue); + LOG.info("compute: vertex " + getId() + + " sending edgeValue " + edge.getValue() + " vertexValue " + vertexValue + - " total " + (edgeValue.get() + - (float) vertexValue) + - " to vertex " + targetVertexId + + " total " + newEdgeValue + + " to vertex " + edge.getTargetVertexId() + " on superstep " + getSuperstep()); - edgeValue.set(edgeValue.get() + (float) vertexValue); - addEdge(targetVertexId, edgeValue); - sendMsg(targetVertexId, new FloatWritable(edgeValue.get())); + addEdge(edge.getTargetVertexId(), newEdgeValue); + sendMessage(edge.getTargetVertexId(), newEdgeValue); } } Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java Tue Jul 24 23:37:42 2012 @@ -18,15 +18,12 @@ package org.apache.giraph.examples; -import java.util.Iterator; - +import org.apache.giraph.graph.EdgeListVertex; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.log4j.Logger; -import org.apache.giraph.graph.EdgeListVertex; - /** * Test whether messages can go through a combiner. */ @@ -36,20 +33,20 @@ public class SimpleCombinerVertex extend private static Logger LOG = Logger.getLogger(SimpleCombinerVertex.class); @Override - public void compute(Iterator msgIterator) { - if (getVertexId().equals(new LongWritable(2))) { - sendMsg(new LongWritable(1), new IntWritable(101)); - sendMsg(new LongWritable(1), new IntWritable(102)); - sendMsg(new LongWritable(1), new IntWritable(103)); + public void compute(Iterable messages) { + if (getId().equals(new LongWritable(2))) { + sendMessage(new LongWritable(1), new IntWritable(101)); + sendMessage(new LongWritable(1), new IntWritable(102)); + sendMessage(new LongWritable(1), new IntWritable(103)); } - if (!getVertexId().equals(new LongWritable(1))) { + if (!getId().equals(new LongWritable(1))) { voteToHalt(); } else { // Check the messages int sum = 0; int num = 0; - while (msgIterator != null && msgIterator.hasNext()) { - sum += msgIterator.next().get(); + for (IntWritable message : messages) { + sum += message.get(); num++; } LOG.info("TestCombinerVertex: Received a sum of " + sum + Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java Tue Jul 24 23:37:42 2012 @@ -24,8 +24,6 @@ import org.apache.hadoop.io.FloatWritabl import org.apache.hadoop.io.LongWritable; import org.apache.log4j.Logger; -import java.util.Iterator; - /** * Vertex to allow unit testing of failure detection */ @@ -37,18 +35,18 @@ public class SimpleFailVertex extends Ed private static long SUPERSTEP = 0; @Override - public void compute(Iterator msgIterator) { + public void compute(Iterable messages) { if (getSuperstep() >= 1) { double sum = 0; - while (msgIterator.hasNext()) { - sum += msgIterator.next().get(); + for (DoubleWritable message : messages) { + sum += message.get(); } DoubleWritable vertexValue = - new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum); - setVertexValue(vertexValue); + new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum); + setValue(vertexValue); if (getSuperstep() < 30) { if (getSuperstep() == 20) { - if (getVertexId().get() == 10L) { + if (getId().get() == 10L) { try { Thread.sleep(2000); } catch (InterruptedException e) { @@ -59,9 +57,9 @@ public class SimpleFailVertex extends Ed return; } } - long edges = getNumOutEdges(); - sendMsgToAllEdges( - new DoubleWritable(getVertexValue().get() / edges)); + long edges = getNumEdges(); + sendMessageToAllEdges( + new DoubleWritable(getValue().get() / edges)); } else { voteToHalt(); } Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java Tue Jul 24 23:37:42 2012 @@ -27,6 +27,6 @@ import org.apache.hadoop.io.LongWritable * lib.LongDoubleDoubleAdjacencyListVertexInputFormat */ -public class SimpleLongDoubleDoubleDoubleIdentityVertex extends +public abstract class SimpleLongDoubleDoubleDoubleIdentityVertex extends IdentityVertex { } Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java Tue Jul 24 23:37:42 2012 @@ -28,7 +28,6 @@ import org.apache.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.Iterator; /** * Demonstrates a computation with a centralized part implemented via a @@ -42,13 +41,13 @@ public class SimpleMasterComputeVertex e Logger.getLogger(SimpleMasterComputeVertex.class); @Override - public void compute(Iterator msgIterator) { + public void compute(Iterable messages) { DoubleOverwriteAggregator agg = (DoubleOverwriteAggregator) getAggregator(SMC_AGG); - double oldSum = getSuperstep() == 0 ? 0 : getVertexValue().get(); + double oldSum = getSuperstep() == 0 ? 0 : getValue().get(); double newValue = agg.getAggregatedValue().get(); double newSum = oldSum + newValue; - setVertexValue(new DoubleWritable(newSum)); + setValue(new DoubleWritable(newSum)); SimpleMasterComputeWorkerContext workerContext = (SimpleMasterComputeWorkerContext) getWorkerContext(); workerContext.setFinalSum(newSum); Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java Tue Jul 24 23:37:42 2012 @@ -18,15 +18,12 @@ package org.apache.giraph.examples; -import java.util.Iterator; - +import org.apache.giraph.graph.EdgeListVertex; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.log4j.Logger; -import org.apache.giraph.graph.EdgeListVertex; - /** * Test whether messages can be sent and received by vertices. */ @@ -35,19 +32,19 @@ public class SimpleMsgVertex extends /** Class logger */ private static Logger LOG = Logger.getLogger(SimpleMsgVertex.class); @Override - public void compute(Iterator msgIterator) { - if (getVertexId().equals(new LongWritable(2))) { - sendMsg(new LongWritable(1), new IntWritable(101)); - sendMsg(new LongWritable(1), new IntWritable(102)); - sendMsg(new LongWritable(1), new IntWritable(103)); + public void compute(Iterable messages) { + if (getId().equals(new LongWritable(2))) { + sendMessage(new LongWritable(1), new IntWritable(101)); + sendMessage(new LongWritable(1), new IntWritable(102)); + sendMessage(new LongWritable(1), new IntWritable(103)); } - if (!getVertexId().equals(new LongWritable(1))) { + if (!getId().equals(new LongWritable(1))) { voteToHalt(); } else { /* Check the messages */ int sum = 0; - while (msgIterator != null && msgIterator.hasNext()) { - sum += msgIterator.next().get(); + for (IntWritable message : messages) { + sum += message.get(); } LOG.info("TestMsgVertex: Received a sum of " + sum + " (will stop on 306)"); Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java Tue Jul 24 23:37:42 2012 @@ -18,18 +18,16 @@ package org.apache.giraph.examples; -import java.io.IOException; -import java.util.Iterator; - +import org.apache.giraph.graph.Edge; +import org.apache.giraph.graph.EdgeListVertex; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.WorkerContext; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; import org.apache.log4j.Logger; -import org.apache.giraph.graph.BasicVertex; -import org.apache.giraph.graph.Edge; -import org.apache.giraph.graph.EdgeListVertex; -import org.apache.giraph.graph.WorkerContext; +import java.io.IOException; /** * Vertex to allow unit testing of graph mutations. @@ -55,7 +53,7 @@ public class SimpleMutateGraphVertex ext } @Override - public void compute(Iterator msgIterator) + public void compute(Iterable messages) throws IOException { SimpleMutateGraphVertexWorkerContext workerContext = (SimpleMutateGraphVertexWorkerContext) getWorkerContext(); @@ -65,77 +63,77 @@ public class SimpleMutateGraphVertex ext // Send messages to vertices that are sure not to exist // (creating them) LongWritable destVertexId = - new LongWritable(rangeVertexIdStart(1) + getVertexId().get()); - sendMsg(destVertexId, new DoubleWritable(0.0)); + new LongWritable(rangeVertexIdStart(1) + getId().get()); + sendMessage(destVertexId, new DoubleWritable(0.0)); } else if (getSuperstep() == 2) { LOG.debug("Reached superstep " + getSuperstep()); } else if (getSuperstep() == 3) { long vertexCount = workerContext.getVertexCount(); - if (vertexCount * 2 != getNumVertices()) { + if (vertexCount * 2 != getTotalNumVertices()) { throw new IllegalStateException( - "Impossible to have " + getNumVertices() + + "Impossible to have " + getTotalNumVertices() + " vertices when should have " + vertexCount * 2 + " on superstep " + getSuperstep()); } long edgeCount = workerContext.getEdgeCount(); - if (edgeCount != getNumEdges()) { + if (edgeCount != getTotalNumEdges()) { throw new IllegalStateException( - "Impossible to have " + getNumEdges() + + "Impossible to have " + getTotalNumEdges() + " edges when should have " + edgeCount + " on superstep " + getSuperstep()); } // Create vertices that are sure not to exist (doubling vertices) LongWritable vertexIndex = - new LongWritable(rangeVertexIdStart(3) + getVertexId().get()); - BasicVertex vertex = - instantiateVertex(vertexIndex, null, null, null); + new LongWritable(rangeVertexIdStart(3) + getId().get()); + Vertex vertex = + instantiateVertex(vertexIndex, new DoubleWritable(0.0), null, null); addVertexRequest(vertex); // Add edges to those remote vertices as well addEdgeRequest(vertexIndex, new Edge( - getVertexId(), new FloatWritable(0.0f))); + getId(), new FloatWritable(0.0f))); } else if (getSuperstep() == 4) { LOG.debug("Reached superstep " + getSuperstep()); } else if (getSuperstep() == 5) { long vertexCount = workerContext.getVertexCount(); - if (vertexCount * 2 != getNumVertices()) { + if (vertexCount * 2 != getTotalNumVertices()) { throw new IllegalStateException( - "Impossible to have " + getNumVertices() + + "Impossible to have " + getTotalNumVertices() + " when should have " + vertexCount * 2 + " on superstep " + getSuperstep()); } long edgeCount = workerContext.getEdgeCount(); - if (edgeCount + vertexCount != getNumEdges()) { + if (edgeCount + vertexCount != getTotalNumEdges()) { throw new IllegalStateException( - "Impossible to have " + getNumEdges() + + "Impossible to have " + getTotalNumEdges() + " edges when should have " + edgeCount + vertexCount + " on superstep " + getSuperstep()); } // Remove the edges created in superstep 3 LongWritable vertexIndex = - new LongWritable(rangeVertexIdStart(3) + getVertexId().get()); + new LongWritable(rangeVertexIdStart(3) + getId().get()); workerContext.increaseEdgesRemoved(); - removeEdgeRequest(vertexIndex, getVertexId()); + removeEdgeRequest(vertexIndex, getId()); } else if (getSuperstep() == 6) { // Remove all the vertices created in superstep 3 - if (getVertexId().compareTo( + if (getId().compareTo( new LongWritable(rangeVertexIdStart(3))) >= 0) { - removeVertexRequest(getVertexId()); + removeVertexRequest(getId()); } } else if (getSuperstep() == 7) { long origEdgeCount = workerContext.getOrigEdgeCount(); - if (origEdgeCount != getNumEdges()) { + if (origEdgeCount != getTotalNumEdges()) { throw new IllegalStateException( - "Impossible to have " + getNumEdges() + + "Impossible to have " + getTotalNumEdges() + " edges when should have " + origEdgeCount + " on superstep " + getSuperstep()); } } else if (getSuperstep() == 8) { long vertexCount = workerContext.getVertexCount(); - if (vertexCount / 2 != getNumVertices()) { + if (vertexCount / 2 != getTotalNumVertices()) { throw new IllegalStateException( - "Impossible to have " + getNumVertices() + + "Impossible to have " + getTotalNumVertices() + " vertices when should have " + vertexCount / 2 + " on superstep " + getSuperstep()); } @@ -170,8 +168,8 @@ public class SimpleMutateGraphVertex ext @Override public void postSuperstep() { - vertexCount = getNumVertices(); - edgeCount = getNumEdges(); + vertexCount = getTotalNumVertices(); + edgeCount = getTotalNumEdges(); if (getSuperstep() == 1) { origEdgeCount = edgeCount; } Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Tue Jul 24 23:37:42 2012 @@ -18,14 +18,12 @@ package org.apache.giraph.examples; -import com.google.common.collect.Maps; - import org.apache.giraph.aggregators.DoubleMaxAggregator; import org.apache.giraph.aggregators.DoubleMinAggregator; import org.apache.giraph.aggregators.LongSumAggregator; -import org.apache.giraph.graph.BasicVertex; import org.apache.giraph.graph.BspUtils; import org.apache.giraph.graph.LongDoubleFloatDoubleVertex; +import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexReader; import org.apache.giraph.graph.VertexWriter; import org.apache.giraph.graph.WorkerContext; @@ -40,8 +38,9 @@ import org.apache.hadoop.mapreduce.Recor import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; +import com.google.common.collect.Maps; + import java.io.IOException; -import java.util.Iterator; import java.util.Map; /** @@ -58,30 +57,31 @@ public class SimplePageRankVertex extend Logger.getLogger(SimplePageRankVertex.class); @Override - public void compute(Iterator msgIterator) { + public void compute(Iterable messages) { LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum"); DoubleMinAggregator minAggreg = (DoubleMinAggregator) getAggregator("min"); DoubleMaxAggregator maxAggreg = (DoubleMaxAggregator) getAggregator("max"); + if (getSuperstep() >= 1) { double sum = 0; - while (msgIterator.hasNext()) { - sum += msgIterator.next().get(); + for (DoubleWritable message : messages) { + sum += message.get(); } DoubleWritable vertexValue = - new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum); - setVertexValue(vertexValue); + new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum); + setValue(vertexValue); maxAggreg.aggregate(vertexValue); minAggreg.aggregate(vertexValue); sumAggreg.aggregate(1L); - LOG.info(getVertexId() + ": PageRank=" + vertexValue + + LOG.info(getId() + ": PageRank=" + vertexValue + " max=" + maxAggreg.getAggregatedValue() + " min=" + minAggreg.getAggregatedValue()); } if (getSuperstep() < MAX_SUPERSTEPS) { - long edges = getNumOutEdges(); - sendMsgToAllEdges( - new DoubleWritable(getVertexValue().get() / edges)); + long edges = getNumEdges(); + sendMessageToAllEdges( + new DoubleWritable(getValue().get() / edges)); } else { voteToHalt(); } @@ -150,11 +150,11 @@ public class SimplePageRankVertex extend if (getSuperstep() >= 3) { LOG.info("aggregatedNumVertices=" + sumAggreg.getAggregatedValue() + - " NumVertices=" + getNumVertices()); - if (sumAggreg.getAggregatedValue().get() != getNumVertices()) { + " NumVertices=" + getTotalNumVertices()); + if (sumAggreg.getAggregatedValue().get() != getTotalNumVertices()) { throw new RuntimeException("wrong value of SumAggreg: " + sumAggreg.getAggregatedValue() + ", should be: " + - getNumVertices()); + getTotalNumVertices()); } DoubleWritable maxPagerank = maxAggreg.getAggregatedValue(); LOG.info("aggregatedMaxPageRank=" + maxPagerank.get()); @@ -194,27 +194,27 @@ public class SimplePageRankVertex extend } @Override - public BasicVertex + public Vertex getCurrentVertex() throws IOException { - BasicVertex + Vertex vertex = BspUtils.createVertex(configuration); LongWritable vertexId = new LongWritable( (inputSplit.getSplitIndex() * totalRecords) + recordsRead); DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d); - long destVertexId = + long targetVertexId = (vertexId.get() + 1) % (inputSplit.getNumSplits() * totalRecords); float edgeValue = vertexId.get() * 100f; Map edges = Maps.newHashMap(); - edges.put(new LongWritable(destVertexId), new FloatWritable(edgeValue)); + edges.put(new LongWritable(targetVertexId), new FloatWritable(edgeValue)); vertex.initialize(vertexId, vertexValue, edges, null); ++recordsRead; if (LOG.isInfoEnabled()) { - LOG.info("next: Return vertexId=" + vertex.getVertexId().get() + - ", vertexValue=" + vertex.getVertexValue() + - ", destinationId=" + destVertexId + ", edgeValue=" + edgeValue); + LOG.info("next: Return vertexId=" + vertex.getId().get() + + ", vertexValue=" + vertex.getValue() + + ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue); } return vertex; } @@ -252,11 +252,11 @@ public class SimplePageRankVertex extend @Override public void writeVertex( - BasicVertex vertex) + Vertex vertex) throws IOException, InterruptedException { getRecordWriter().write( - new Text(vertex.getVertexId().toString()), - new Text(vertex.getVertexValue().toString())); + new Text(vertex.getId().toString()), + new Text(vertex.getValue().toString())); } } Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java Tue Jul 24 23:37:42 2012 @@ -18,14 +18,13 @@ package org.apache.giraph.examples; +import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.EdgeListVertex; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; import org.apache.log4j.Logger; -import java.util.Iterator; - /** * Demonstrates the basic Pregel shortest paths implementation. */ @@ -50,37 +49,33 @@ public class SimpleShortestPathsVertex e * @return True if the source id */ private boolean isSource() { - return getVertexId().get() == + return getId().get() == getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT); } @Override - public void compute(Iterator msgIterator) { + public void compute(Iterable messages) { if (getSuperstep() == 0) { - setVertexValue(new DoubleWritable(Double.MAX_VALUE)); + setValue(new DoubleWritable(Double.MAX_VALUE)); } double minDist = isSource() ? 0d : Double.MAX_VALUE; - while (msgIterator.hasNext()) { - minDist = Math.min(minDist, msgIterator.next().get()); + for (DoubleWritable message : messages) { + minDist = Math.min(minDist, message.get()); } if (LOG.isDebugEnabled()) { - LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist + - " vertex value = " + getVertexValue()); + LOG.debug("Vertex " + getId() + " got minDist = " + minDist + + " vertex value = " + getValue()); } - if (minDist < getVertexValue().get()) { - setVertexValue(new DoubleWritable(minDist)); - for (Iterator edges = getOutEdgesIterator(); - edges.hasNext();) { - LongWritable targetVertexId = edges.next(); - FloatWritable edgeValue = getEdgeValue(targetVertexId); + if (minDist < getValue().get()) { + setValue(new DoubleWritable(minDist)); + for (Edge edge : getEdges()) { + double distance = minDist + edge.getValue().get(); if (LOG.isDebugEnabled()) { - LOG.debug("Vertex " + getVertexId() + " sent to " + - targetVertexId + " = " + - (minDist + edgeValue.get())); + LOG.debug("Vertex " + getId() + " sent to " + + edge.getTargetVertexId() + " = " + distance); } - sendMsg(targetVertexId, - new DoubleWritable(minDist + edgeValue.get())); + sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance)); } } voteToHalt(); Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Tue Jul 24 23:37:42 2012 @@ -18,10 +18,9 @@ package org.apache.giraph.examples; -import com.google.common.collect.Maps; -import org.apache.giraph.graph.BasicVertex; import org.apache.giraph.graph.BspUtils; import org.apache.giraph.graph.EdgeListVertex; +import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexReader; import org.apache.giraph.graph.VertexWriter; import org.apache.giraph.lib.TextVertexOutputFormat; @@ -35,8 +34,9 @@ import org.apache.hadoop.mapreduce.Recor import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; +import com.google.common.collect.Maps; + import java.io.IOException; -import java.util.Iterator; import java.util.Map; /** @@ -46,7 +46,7 @@ import java.util.Map; public class SimpleSuperstepVertex extends EdgeListVertex { @Override - public void compute(Iterator msgIterator) { + public void compute(Iterable messages) { if (getSuperstep() > 3) { voteToHalt(); } @@ -74,11 +74,10 @@ public class SimpleSuperstepVertex exten } @Override - public BasicVertex getCurrentVertex() + public Vertex getCurrentVertex() throws IOException, InterruptedException { - BasicVertex vertex = + Vertex vertex = BspUtils.createVertex(configuration); long tmpId = reverseIdOrder ? @@ -89,18 +88,18 @@ public class SimpleSuperstepVertex exten IntWritable vertexValue = new IntWritable((int) (vertexId.get() * 10)); Map edgeMap = Maps.newHashMap(); - long destVertexId = + long targetVertexId = (vertexId.get() + 1) % (inputSplit.getNumSplits() * totalRecords); float edgeValue = vertexId.get() * 100f; - edgeMap.put(new LongWritable(destVertexId), + edgeMap.put(new LongWritable(targetVertexId), new FloatWritable(edgeValue)); vertex.initialize(vertexId, vertexValue, edgeMap, null); ++recordsRead; if (LOG.isInfoEnabled()) { - LOG.info("next: Return vertexId=" + vertex.getVertexId().get() + - ", vertexValue=" + vertex.getVertexValue() + - ", destinationId=" + destVertexId + + LOG.info("next: Return vertexId=" + vertex.getId().get() + + ", vertexValue=" + vertex.getValue() + + ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue); } return vertex; @@ -137,11 +136,11 @@ public class SimpleSuperstepVertex exten } @Override - public void writeVertex(BasicVertex vertex) throws IOException, InterruptedException { + public void writeVertex(Vertex vertex) throws IOException, InterruptedException { getRecordWriter().write( - new Text(vertex.getVertexId().toString()), - new Text(vertex.getVertexValue().toString())); + new Text(vertex.getId().toString()), + new Text(vertex.getValue().toString())); } } Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java Tue Jul 24 23:37:42 2012 @@ -25,7 +25,7 @@ import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.giraph.graph.BasicVertex; +import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexWriter; import org.apache.giraph.lib.TextVertexOutputFormat; @@ -51,11 +51,11 @@ public class SimpleTextVertexOutputForma @Override public void writeVertex( - BasicVertex vertex) + Vertex vertex) throws IOException, InterruptedException { getRecordWriter().write( - new Text(vertex.getVertexId().toString()), - new Text(vertex.getVertexValue().toString())); + new Text(vertex.getId().toString()), + new Text(vertex.getValue().toString())); } } Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java Tue Jul 24 23:37:42 2012 @@ -18,17 +18,17 @@ package org.apache.giraph.examples; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.ArrayWritable; +import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.EdgeListVertex; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; -import java.util.Iterator; +import java.util.HashSet; import java.util.Map; -import java.util.TreeMap; import java.util.Set; -import java.util.HashSet; +import java.util.TreeMap; /** * Demonstrates triangle closing in simple, @@ -71,22 +71,20 @@ public class SimpleTriangleClosingVertex private Set recvSet = new HashSet(); @Override - public void compute(Iterator msgIterator) { + public void compute(Iterable messages) { if (getSuperstep() == 0) { // obtain list of all out-edges from THIS vertex - Iterator iterator = getOutEdgesIterator(); - while (iterator.hasNext()) { - sendMsgToAllEdges(iterator.next()); + for (Edge edge : getEdges()) { + sendMessageToAllEdges(edge.getTargetVertexId()); } } else { - while (msgIterator.hasNext()) { - IntWritable iw = msgIterator.next(); - int inId = iw.get(); + for (IntWritable message : messages) { + int inId = message.get(); if (recvSet.contains(inId)) { - int current = closeMap.get(iw) == null ? 0 : inId; - closeMap.put(iw, current + 1); + int current = closeMap.get(message) == null ? 0 : inId; + closeMap.put(message, current + 1); } - if (inId != getVertexId().get()) { + if (inId != getId().get()) { recvSet.add(inId); } } @@ -97,7 +95,7 @@ public class SimpleTriangleClosingVertex } IntArrayWritable result = new IntArrayWritable(); result.set(temp); - setVertexValue(result); + setValue(result); } voteToHalt(); } Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java Tue Jul 24 23:37:42 2012 @@ -18,14 +18,10 @@ package org.apache.giraph.examples; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Iterator; - import org.apache.giraph.examples.SimpleSuperstepVertex. - SimpleSuperstepVertexInputFormat; -import org.apache.giraph.graph.GiraphJob; + SimpleSuperstepVertexInputFormat; import org.apache.giraph.graph.EdgeListVertex; +import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.graph.WorkerContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -38,6 +34,9 @@ import org.apache.hadoop.mapreduce.Mappe import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import java.io.DataOutputStream; +import java.io.IOException; + /** * Fully runnable example of how to * emit worker data to HDFS during a graph @@ -52,15 +51,14 @@ public class SimpleVertexWithWorkerConte private static final int TESTLENGTH = 30; @Override - public void compute(Iterator msgIterator) - throws IOException { + public void compute(Iterable messages) throws IOException { long superstep = getSuperstep(); if (superstep < TESTLENGTH) { EmitterWorkerContext emitter = (EmitterWorkerContext) getWorkerContext(); - emitter.emit("vertexId=" + getVertexId() + + emitter.emit("vertexId=" + getId() + " superstep=" + superstep + "\n"); } else { voteToHalt(); Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java Tue Jul 24 23:37:42 2012 @@ -19,6 +19,7 @@ package org.apache.giraph.examples; import org.apache.giraph.aggregators.LongSumAggregator; +import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.EdgeListVertex; import org.apache.giraph.graph.WorkerContext; import org.apache.hadoop.io.FloatWritable; @@ -30,7 +31,6 @@ import org.apache.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.Iterator; /** * An example that simply uses its id, value, and edges to compute new data @@ -142,7 +142,7 @@ public class VerifyMessage { } @Override - public void compute(Iterator msgIterator) { + public void compute(Iterable messages) { LongSumAggregator sumAggregator = (LongSumAggregator) getAggregator(LongSumAggregator.class.getName()); if (getSuperstep() > SUPERSTEPS) { @@ -152,61 +152,57 @@ public class VerifyMessage { if (LOG.isDebugEnabled()) { LOG.debug("compute: " + sumAggregator); } - sumAggregator.aggregate(getVertexId().get()); + sumAggregator.aggregate(getId().get()); if (LOG.isDebugEnabled()) { LOG.debug("compute: sum = " + sumAggregator.getAggregatedValue().get() + - " for vertex " + getVertexId()); + " for vertex " + getId()); } float msgValue = 0.0f; - while (msgIterator.hasNext()) { - VerifiableMessage msg = msgIterator.next(); - msgValue += msg.value; + for (VerifiableMessage message : messages) { + msgValue += message.value; if (LOG.isDebugEnabled()) { - LOG.debug("compute: got msg = " + msg + - " for vertex id " + getVertexId() + - ", vertex value " + getVertexValue() + + LOG.debug("compute: got msg = " + message + + " for vertex id " + getId() + + ", vertex value " + getValue() + " on superstep " + getSuperstep()); } - if (msg.superstep != getSuperstep() - 1) { + if (message.superstep != getSuperstep() - 1) { throw new IllegalStateException( "compute: Impossible to not get a messsage from " + "the previous superstep, current superstep = " + getSuperstep()); } - if ((msg.sourceVertexId != getVertexId().get() - 1) && - (getVertexId().get() != 0)) { + if ((message.sourceVertexId != getId().get() - 1) && + (getId().get() != 0)) { throw new IllegalStateException( "compute: Impossible that this message didn't come " + "from the previous vertex and came from " + - msg.sourceVertexId); + message.sourceVertexId); } } - int vertexValue = getVertexValue().get(); - setVertexValue(new IntWritable(vertexValue + (int) msgValue)); + int vertexValue = getValue().get(); + setValue(new IntWritable(vertexValue + (int) msgValue)); if (LOG.isDebugEnabled()) { - LOG.debug("compute: vertex " + getVertexId() + - " has value " + getVertexValue() + + LOG.debug("compute: vertex " + getId() + + " has value " + getValue() + " on superstep " + getSuperstep()); } - for (Iterator edges = getOutEdgesIterator(); - edges.hasNext();) { - LongWritable targetVertexId = edges.next(); - FloatWritable edgeValue = getEdgeValue(targetVertexId); + for (Edge edge : getEdges()) { + FloatWritable newEdgeValue = new FloatWritable( + edge.getValue().get() + (float) vertexValue); if (LOG.isDebugEnabled()) { - LOG.debug("compute: vertex " + getVertexId() + - " sending edgeValue " + edgeValue + + LOG.debug("compute: vertex " + getId() + + " sending edgeValue " + edge.getValue() + " vertexValue " + vertexValue + - " total " + - (edgeValue.get() + (float) vertexValue) + - " to vertex " + targetVertexId + + " total " + newEdgeValue + + " to vertex " + edge.getTargetVertexId() + " on superstep " + getSuperstep()); } - edgeValue.set(edgeValue.get() + (float) vertexValue); - addEdge(targetVertexId, edgeValue); - sendMsg(targetVertexId, + addEdge(edge.getTargetVertexId(), newEdgeValue); + sendMessage(edge.getTargetVertexId(), new VerifiableMessage( - getSuperstep(), getVertexId().get(), edgeValue.get())); + getSuperstep(), getId().get(), newEdgeValue.get())); } } } Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java Tue Jul 24 23:37:42 2012 @@ -18,7 +18,7 @@ package org.apache.giraph.examples; -import org.apache.giraph.graph.BasicVertex; +import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexWriter; import org.apache.giraph.lib.TextVertexOutputFormat; import org.apache.hadoop.io.IntWritable; @@ -63,13 +63,13 @@ public class VertexWithComponentTextOutp } @Override - public void writeVertex(BasicVertex vertex) throws IOException, + public void writeVertex(Vertex vertex) throws IOException, InterruptedException { StringBuilder output = new StringBuilder(); - output.append(vertex.getVertexId().get()); + output.append(vertex.getId().get()); output.append('\t'); - output.append(vertex.getVertexValue().get()); + output.append(vertex.getValue().get()); getRecordWriter().write(new Text(output.toString()), null); } } Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java Tue Jul 24 23:37:42 2012 @@ -39,7 +39,7 @@ public interface BasicVertexResolver resolve(I vertexId, - BasicVertex vertex, + Vertex resolve(I vertexId, + Vertex vertex, VertexChanges vertexChanges, Iterable messages); @@ -57,5 +57,5 @@ public interface BasicVertexResolver instantiateVertex(); + Vertex instantiateVertex(); } Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Tue Jul 24 23:37:42 2012 @@ -18,12 +18,16 @@ package org.apache.giraph.graph; -import net.iharder.Base64; import org.apache.giraph.bsp.ApplicationState; import org.apache.giraph.bsp.BspInputFormat; import org.apache.giraph.bsp.CentralizedServiceMaster; import org.apache.giraph.bsp.SuperstepState; import org.apache.giraph.graph.GraphMapper.MapFunctions; +import org.apache.giraph.graph.partition.MasterGraphPartitioner; +import org.apache.giraph.graph.partition.PartitionOwner; +import org.apache.giraph.graph.partition.PartitionStats; +import org.apache.giraph.graph.partition.PartitionUtils; +import org.apache.giraph.utils.WritableUtils; import org.apache.giraph.zk.BspEvent; import org.apache.giraph.zk.PredicateLock; import org.apache.hadoop.fs.FSDataOutputStream; @@ -48,6 +52,8 @@ import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; +import net.iharder.Base64; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -65,12 +71,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import org.apache.giraph.graph.partition.MasterGraphPartitioner; -import org.apache.giraph.graph.partition.PartitionOwner; -import org.apache.giraph.graph.partition.PartitionStats; -import org.apache.giraph.graph.partition.PartitionUtils; -import org.apache.giraph.utils.WritableUtils; - /** * ZooKeeper-based implementation of {@link CentralizedService}. * @@ -1556,8 +1556,8 @@ public class BspServiceMaster> entry : inputSplitCache.entrySet()) { if (!entry.getValue().getVertices().isEmpty()) { - commService.sendPartitionReq(entry.getKey().getWorkerInfo(), + commService.sendPartitionRequest(entry.getKey().getWorkerInfo(), entry.getValue()); entry.getValue().getVertices().clear(); } @@ -447,20 +447,20 @@ public class BspServiceWorker readerVertex = + Vertex readerVertex = vertexReader.getCurrentVertex(); - if (readerVertex.getVertexId() == null) { + if (readerVertex.getId() == null) { throw new IllegalArgumentException( "loadVertices: Vertex reader returned a vertex " + "without an id! - " + readerVertex); } - if (readerVertex.getVertexValue() == null) { - readerVertex.setVertexValue( + if (readerVertex.getValue() == null) { + readerVertex.setValue( BspUtils.createVertexValue(getConfiguration())); } PartitionOwner partitionOwner = workerGraphPartitioner.getPartitionOwner( - readerVertex.getVertexId()); + readerVertex.getId()); Partition partition = inputSplitCache.get(partitionOwner); if (partition == null) { @@ -469,23 +469,23 @@ public class BspServiceWorker oldVertex = + Vertex oldVertex = partition.putVertex(readerVertex); if (oldVertex != null) { LOG.warn("readVertices: Replacing vertex " + oldVertex + " with " + readerVertex); } if (partition.getVertices().size() >= maxVerticesPerPartition) { - commService.sendPartitionReq(partitionOwner.getWorkerInfo(), + commService.sendPartitionRequest(partitionOwner.getWorkerInfo(), partition); partition.getVertices().clear(); } ++vertexCount; - edgeCount += readerVertex.getNumOutEdges(); + edgeCount += readerVertex.getNumEdges(); getContext().progress(); ++totalVerticesLoaded; - totalEdgesLoaded += readerVertex.getNumOutEdges(); + totalEdgesLoaded += readerVertex.getNumEdges(); // Update status every half a million vertices if ((totalVerticesLoaded % 500000) == 0) { String status = "readVerticesFromInputSplit: Loaded " + @@ -519,9 +519,9 @@ public class BspServiceWorker vertex, - Iterable messageIterator) { - vertex.putMessages(messageIterator); + public void assignMessagesToVertex(Vertex vertex, + Iterable messages) { + vertex.putMessages(messages); } @Override @@ -674,7 +674,7 @@ public class BspServiceWorker(); for (Partition partition : getPartitionMap().values()) { PartitionStats partitionStats = - new PartitionStats(partition.getPartitionId(), + new PartitionStats(partition.getId(), partition.getVertices().size(), 0, partition.getEdgeCount()); @@ -1056,8 +1056,8 @@ public class BspServiceWorker getPartition(I vertexIndex) { - PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex); + public Partition getPartition(I vertexId) { + PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId); return workerPartitionMap.get(partitionOwner.getPartitionId()); } @Override - public BasicVertex getVertex(I vertexIndex) { - PartitionOwner partitionOwner = getVertexPartitionOwner(vertexIndex); + public Vertex getVertex(I vertexId) { + PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId); if (workerPartitionMap.containsKey(partitionOwner.getPartitionId())) { return workerPartitionMap.get( - partitionOwner.getPartitionId()).getVertex(vertexIndex); + partitionOwner.getPartitionId()).getVertex(vertexId); } else { return null; } Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Tue Jul 24 23:37:42 2012 @@ -356,7 +356,7 @@ public class BspUtils { } /** - * Get the user's subclassed {@link BasicVertex} + * Get the user's subclassed {@link Vertex} * * @param Vertex id * @param Vertex data @@ -368,11 +368,11 @@ public class BspUtils { @SuppressWarnings({ "rawtypes", "unchecked" }) public static - Class> getVertexClass(Configuration conf) { - return (Class>) + Class> getVertexClass(Configuration conf) { + return (Class>) conf.getClass(GiraphJob.VERTEX_CLASS, null, - BasicVertex.class); + Vertex.class); } /** @@ -387,10 +387,10 @@ public class BspUtils { */ @SuppressWarnings("rawtypes") public static BasicVertex + E extends Writable, M extends Writable> Vertex createVertex(Configuration conf) { - Class> vertexClass = getVertexClass(conf); - BasicVertex vertex = + Class> vertexClass = getVertexClass(conf); + Vertex vertex = ReflectionUtils.newInstance(vertexClass, conf); return vertex; } @@ -404,8 +404,8 @@ public class BspUtils { */ @SuppressWarnings("unchecked") public static Class - getVertexIndexClass(Configuration conf) { - return (Class) conf.getClass(GiraphJob.VERTEX_INDEX_CLASS, + getVertexIdClass(Configuration conf) { + return (Class) conf.getClass(GiraphJob.VERTEX_ID_CLASS, WritableComparable.class); } @@ -418,16 +418,16 @@ public class BspUtils { */ @SuppressWarnings("rawtypes") public static - I createVertexIndex(Configuration conf) { - Class vertexClass = getVertexIndexClass(conf); + I createVertexId(Configuration conf) { + Class vertexIdClass = getVertexIdClass(conf); try { - return vertexClass.newInstance(); + return vertexIdClass.newInstance(); } catch (InstantiationException e) { throw new IllegalArgumentException( - "createVertexIndex: Failed to instantiate", e); + "createVertexId: Failed to instantiate", e); } catch (IllegalAccessException e) { throw new IllegalArgumentException( - "createVertexIndex: Illegally accessed", e); + "createVertexId: Illegally accessed", e); } } Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java Tue Jul 24 23:37:42 2012 @@ -18,17 +18,11 @@ package org.apache.giraph.graph; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - /** - * A complete edge, the destination vertex and the edge value. Can only be one + * A complete edge, the target vertex and the edge value. Can only be one * edge with a destination vertex id per edge map. * * @param Vertex index @@ -36,13 +30,11 @@ import java.io.IOException; */ @SuppressWarnings("rawtypes") public class Edge - implements WritableComparable>, Configurable { - /** Destination vertex id */ - private I destVertexId = null; + implements Comparable> { + /** Target vertex id */ + private I targetVertexId = null; /** Edge value */ - private E edgeValue = null; - /** Configuration - Used to instantiate classes */ - private Configuration conf = null; + private E value = null; /** * Constructor for reflection @@ -52,21 +44,21 @@ public class EdgecreateVertexIndex(getConf()); - destVertexId.readFields(input); - edgeValue = BspUtils.createEdgeValue(getConf()); - edgeValue.readFields(input); - } - - @Override - public void write(DataOutput output) throws IOException { - if (destVertexId == null) { - throw new IllegalStateException( - "write: Null destination vertex index"); - } - if (edgeValue == null) { - throw new IllegalStateException( - "write: Null edge value"); - } - destVertexId.write(output); - edgeValue.write(output); - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; + return "(TargetVertexId = " + targetVertexId + ", " + + "value = " + value + ")"; } @SuppressWarnings("unchecked") @Override public int compareTo(Edge edge) { - return destVertexId.compareTo(edge.getDestVertexId()); + return targetVertexId.compareTo(edge.getTargetVertexId()); } @Override @@ -151,12 +111,11 @@ public class Edge Vertex index value * @param Vertex value @@ -52,268 +48,134 @@ public abstract class EdgeListVertex { /** Class logger */ private static final Logger LOG = Logger.getLogger(EdgeListVertex.class); - /** Vertex id */ - private I vertexId = null; - /** Vertex value */ - private V vertexValue = null; - /** List of the dest edge indices */ - private List destEdgeIndexList; - /** List of the dest edge values */ - private List destEdgeValueList; + /** List of edges */ + private List> edgeList = Lists.newArrayList(); /** List of incoming messages from the previous superstep */ - private List msgList; + private List messageList = Lists.newArrayList(); @Override - public void initialize(I vertexId, V vertexValue, - Map edges, - Iterable messages) { - if (vertexId != null) { - setVertexId(vertexId); - } - if (vertexValue != null) { - setVertexValue(vertexValue); - } - if (edges != null && !edges.isEmpty()) { - destEdgeIndexList = Lists.newArrayListWithCapacity(edges.size()); - destEdgeValueList = Lists.newArrayListWithCapacity(edges.size()); - List sortedIndexList = new ArrayList(edges.keySet()); - Collections.sort(sortedIndexList, new VertexIdComparator()); - for (I index : sortedIndexList) { - destEdgeIndexList.add(index); - destEdgeValueList.add(edges.get(index)); + public void initialize(I id, V value, Map edges, Iterable messages) { + super.initialize(id, value); + if (edges != null) { + for (Map.Entry edge : edges.entrySet()) { + edgeList.add(new Edge(edge.getKey(), edge.getValue())); } - sortedIndexList.clear(); - } else { - destEdgeIndexList = Lists.newArrayListWithCapacity(0); - destEdgeValueList = Lists.newArrayListWithCapacity(0); } if (messages != null) { - msgList = Lists.newArrayListWithCapacity(Iterables.size(messages)); - Iterables.addAll(msgList, messages); - } else { - msgList = Lists.newArrayListWithCapacity(0); + Iterables.addAll(messageList, messages); } } @Override - public int hashCode() { - return vertexId.hashCode() * 37 + vertexValue.hashCode(); + public Iterable> getEdges() { + return edgeList; } @Override - public boolean equals(Object other) { - if (other instanceof EdgeListVertex) { - @SuppressWarnings("unchecked") - EdgeListVertex otherVertex = (EdgeListVertex) other; - if (!getVertexId().equals(otherVertex.getVertexId())) { + public final boolean addEdge(I targetVertexId, E value) { + for (Edge edge : getEdges()) { + if (edge.getTargetVertexId().equals(targetVertexId)) { + LOG.warn("addEdge: Vertex=" + getId() + + ": already added an edge value for target vertex id " + + targetVertexId); return false; } - if (!getVertexValue().equals(otherVertex.getVertexValue())) { - return false; - } - if (!ComparisonUtils.equal(getMessages(), - otherVertex.getMessages())) { - return false; - } - return ComparisonUtils.equal(getOutEdgesIterator(), - otherVertex.getOutEdgesIterator()); - } - return false; - } - - /** - * Comparator for the vertex id - */ - private class VertexIdComparator implements Comparator { - @SuppressWarnings("unchecked") - @Override - public int compare(I index1, I index2) { - return index1.compareTo(index2); } + edgeList.add(new Edge(targetVertexId, value)); + return true; } @Override - public final boolean addEdge(I targetVertexId, E edgeValue) { - int pos = Collections.binarySearch(destEdgeIndexList, - targetVertexId, - new VertexIdComparator()); - if (pos < 0) { - destEdgeIndexList.add(-1 * (pos + 1), targetVertexId); - destEdgeValueList.add(-1 * (pos + 1), edgeValue); - return true; - } else { - LOG.warn("addEdge: Vertex=" + vertexId + - ": already added an edge value for dest vertex id " + - targetVertexId); - return false; - } - } - - @Override - public long getSuperstep() { - return getGraphState().getSuperstep(); - } - - @Override - public final void setVertexId(I vertexId) { - this.vertexId = vertexId; - } - - @Override - public final I getVertexId() { - return vertexId; - } - - @Override - public final V getVertexValue() { - return vertexValue; - } - - @Override - public final void setVertexValue(V vertexValue) { - this.vertexValue = vertexValue; - } - - @Override - public E getEdgeValue(I targetVertexId) { - int pos = Collections.binarySearch(destEdgeIndexList, - targetVertexId, - new VertexIdComparator()); - if (pos < 0) { - return null; - } else { - return destEdgeValueList.get(pos); - } + public int getNumEdges() { + return edgeList.size(); } @Override - public boolean hasEdge(I targetVertexId) { - int pos = Collections.binarySearch(destEdgeIndexList, - targetVertexId, - new VertexIdComparator()); - if (pos < 0) { - return false; + public E removeEdge(I targetVertexId) { + for (Iterator> edges = edgeList.iterator(); edges.hasNext();) { + Edge edge = edges.next(); + if (edge.getTargetVertexId().equals(targetVertexId)) { + E edgeValue = edge.getValue(); + edges.remove(); + return edgeValue; + } } - return true; - } - - /** - * Get an iterator to the edges on this vertex. - * - * @return A sorted iterator, as defined by the sort-order - * of the vertex ids - */ - @Override - public Iterator getOutEdgesIterator() { - return destEdgeIndexList.iterator(); + return null; } @Override - public int getNumOutEdges() { - return destEdgeIndexList.size(); + void putMessages(Iterable messages) { + messageList.clear(); + Iterables.addAll(messageList, messages); } @Override - public E removeEdge(I targetVertexId) { - int pos = Collections.binarySearch(destEdgeIndexList, - targetVertexId, - new VertexIdComparator()); - if (pos < 0) { - return null; - } else { - destEdgeIndexList.remove(pos); - return destEdgeValueList.remove(pos); - } + public Iterable getMessages() { + return Iterables.unmodifiableIterable(messageList); } @Override - public final void sendMsgToAllEdges(M msg) { - if (msg == null) { - throw new IllegalArgumentException( - "sendMsgToAllEdges: Cannot send null message to all edges"); - } - for (I index : destEdgeIndexList) { - sendMsg(index, msg); - } + public int getNumMessages() { + return messageList.size(); } @Override public final void readFields(DataInput in) throws IOException { - vertexId = BspUtils.createVertexIndex(getConf()); + I vertexId = BspUtils.createVertexId(getConf()); vertexId.readFields(in); - boolean hasVertexValue = in.readBoolean(); - if (hasVertexValue) { - vertexValue = BspUtils.createVertexValue(getConf()); - vertexValue.readFields(in); - } - int edgeListCount = in.readInt(); - destEdgeIndexList = Lists.newArrayListWithCapacity(edgeListCount); - destEdgeValueList = Lists.newArrayListWithCapacity(edgeListCount); - for (int i = 0; i < edgeListCount; ++i) { - I destVertexId = BspUtils.createVertexIndex(getConf()); + V vertexValue = BspUtils.createVertexValue(getConf()); + vertexValue.readFields(in); + super.initialize(vertexId, vertexValue); + + int numEdges = in.readInt(); + edgeList = Lists.newArrayListWithCapacity(numEdges); + for (int i = 0; i < numEdges; ++i) { + I targetVertexId = BspUtils.createVertexId(getConf()); + targetVertexId.readFields(in); E edgeValue = BspUtils.createEdgeValue(getConf()); - destVertexId.readFields(in); edgeValue.readFields(in); - destEdgeIndexList.add(destVertexId); - destEdgeValueList.add(edgeValue); - } - int msgListSize = in.readInt(); - msgList = Lists.newArrayListWithCapacity(msgListSize); - for (int i = 0; i < msgListSize; ++i) { - M msg = BspUtils.createMessageValue(getConf()); - msg.readFields(in); - msgList.add(msg); + edgeList.add(new Edge(targetVertexId, edgeValue)); } - halt = in.readBoolean(); - } - @Override - public final void write(DataOutput out) throws IOException { - vertexId.write(out); - out.writeBoolean(vertexValue != null); - if (vertexValue != null) { - vertexValue.write(out); - } - out.writeInt(destEdgeIndexList.size()); - for (int i = 0; i < destEdgeIndexList.size(); ++i) { - destEdgeIndexList.get(i).write(out); - destEdgeValueList.get(i).write(out); + int numMessages = in.readInt(); + messageList = Lists.newArrayListWithCapacity(numMessages); + for (int i = 0; i < numMessages; ++i) { + M message = BspUtils.createMessageValue(getConf()); + message.readFields(in); + messageList.add(message); } - out.writeInt(msgList.size()); - for (M msg : msgList) { - msg.write(out); + + boolean halt = in.readBoolean(); + if (halt) { + voteToHalt(); + } else { + wakeUp(); } - out.writeBoolean(halt); } @Override - void putMessages(Iterable messages) { - msgList.clear(); - for (M message : messages) { - msgList.add(message); + public final void write(DataOutput out) throws IOException { + getId().write(out); + getValue().write(out); + + out.writeInt(edgeList.size()); + for (Edge edge : edgeList) { + edge.getTargetVertexId().write(out); + edge.getValue().write(out); } - } - @Override - public Iterable getMessages() { - return Iterables.unmodifiableIterable(msgList); - } + out.writeInt(messageList.size()); + for (M message : messageList) { + message.write(out); + } - @Override - public int getNumMessages() { - return msgList.size(); + out.writeBoolean(isHalted()); } @Override void releaseResources() { // Hint to GC to free the messages - msgList.clear(); - } - - @Override - public String toString() { - return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + - ",#edges=" + getNumOutEdges() + ")"; + messageList.clear(); } } Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1365352&r1=1365351&r2=1365352&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Jul 24 23:37:42 2012 @@ -61,7 +61,7 @@ public class GiraphJob { "giraph.graphPartitionerFactoryClass"; /** Vertex index class */ - public static final String VERTEX_INDEX_CLASS = "giraph.vertexIndexClass"; + public static final String VERTEX_ID_CLASS = "giraph.vertexIdClass"; /** Vertex value class */ public static final String VERTEX_VALUE_CLASS = "giraph.vertexValueClass"; /** Edge value class */ @@ -451,7 +451,7 @@ public class GiraphJob { * @param vertexClass Runs vertex computation */ public final void setVertexClass(Class vertexClass) { - getConfiguration().setClass(VERTEX_CLASS, vertexClass, BasicVertex.class); + getConfiguration().setClass(VERTEX_CLASS, vertexClass, Vertex.class); } /**