Return-Path: X-Original-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-hama-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 14514988C for ; Fri, 18 May 2012 15:35:58 +0000 (UTC) Received: (qmail 11272 invoked by uid 500); 18 May 2012 15:35:57 -0000 Delivered-To: apmail-incubator-hama-commits-archive@incubator.apache.org Received: (qmail 11206 invoked by uid 500); 18 May 2012 15:35:57 -0000 Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hama-dev@incubator.apache.org Delivered-To: mailing list hama-commits@incubator.apache.org Received: (qmail 11194 invoked by uid 99); 18 May 2012 15:35:57 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 May 2012 15:35:57 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 May 2012 15:35:53 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 92C58238897A; Fri, 18 May 2012 15:35:31 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1340131 [2/2] - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/sync/ core/src/main/java/org/apache/hama/util/ core/src/test/java/org/apache/hama/bsp/ examples/src/main/java/org/a... Date: Fri, 18 May 2012 15:35:30 -0000 To: hama-commits@incubator.apache.org From: tjungblut@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120518153531.92C58238897A@eris.apache.org> Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java?rev=1340131&r1=1340130&r2=1340131&view=diff ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java (original) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java Fri May 18 15:35:28 2012 @@ -23,12 +23,14 @@ public class MinAggregator extends Abstr int min = Integer.MAX_VALUE; + @Override public void aggregate(IntWritable value) { if (value.get() < min) { min = value.get(); } } + @Override public IntWritable getValue() { return new IntWritable(min); } Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1340131&r1=1340130&r2=1340131&view=diff ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri May 18 15:35:28 2012 @@ -23,24 +23,24 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hama.bsp.BSPPeer; -public abstract class Vertex implements VertexInterface { +public abstract class Vertex + implements VertexInterface { - private M value; - private String vertexID; - protected GraphJobRunner runner; - protected BSPPeer peer; - public List edges; + private MSG_TYPE value; + private ID_TYPE vertexID; + protected GraphJobRunner runner; + protected BSPPeer, VertexArrayWritable, Writable, Writable, Writable> peer; + public List> edges; public Configuration getConf() { return peer.getConfiguration(); } @Override - public String getVertexID() { + public ID_TYPE getVertexID() { return vertexID; } @@ -49,17 +49,17 @@ public abstract class Vertex e, MSG_TYPE msg) + throws IOException { MapWritable message = new MapWritable(); - message.put(new Text(e.getName()), msg); - - peer.send(e.getDestVertexID(), message); + message.put(e.getDestinationVertexID(), msg); + peer.send(e.getDestinationPeerName(), message); } @Override - public void sendMessageToNeighbors(M msg) throws IOException { - final List outEdges = this.getOutEdges(); - for (Edge e : outEdges) { + public void sendMessageToNeighbors(MSG_TYPE msg) throws IOException { + final List> outEdges = this.getOutEdges(); + for (Edge e : outEdges) { sendMessage(e, msg); } } @@ -70,21 +70,21 @@ public abstract class Vertex getOutEdges() { + public List> getOutEdges() { return edges; } @Override - public M getValue() { + public MSG_TYPE getValue() { return value; } @Override - public void setValue(M value) { + public void setValue(MSG_TYPE value) { this.value = value; } - public void setVertexID(String vertexID) { + public void setVertexID(ID_TYPE vertexID) { this.vertexID = vertexID; } @@ -97,8 +97,8 @@ public abstract class Vertex { +/** + * The vertex interface. + * + * @param this type must be writable and should also implement equals + * and hashcode. + * @param the type used for messaging, usually the value of a vertex. + * @param the type used for storing edge values, usually the + * value of an edge. + */ +public interface VertexInterface { /** * Used to setup a vertex. @@ -32,22 +41,23 @@ public interface VertexInterface messages) throws IOException; + public void compute(Iterator messages) throws IOException; /** @return a list of outgoing edges of this vertex in the input graph. */ - public List getOutEdges(); + public List> getOutEdges(); /** Sends a message to another vertex. */ - public void sendMessage(Edge e, MSGTYPE msg) throws IOException; + public void sendMessage(Edge e, MSG_TYPE msg) + throws IOException; /** Sends a message to neighbors */ - public void sendMessageToNeighbors(MSGTYPE msg) throws IOException; + public void sendMessageToNeighbors(MSG_TYPE msg) throws IOException; /** @return the superstep number of the current superstep (starting from 0). */ public long getSuperstepCount(); @@ -57,13 +67,13 @@ public interface VertexInterface { +public class VertexWritable implements + WritableComparable>, Configurable { - public String name; - public int weight; + /** + * This field is static because it doesn't need to be an instance variable. It + * is written in upper case, because it is considered constant per launched + * process. + */ + public static Configuration CONFIGURATION; + + VERTEX_ID vertexId; + VERTEX_VALUE value; + Class idCls; + Class valCls; public VertexWritable() { super(); } - public VertexWritable(String name) { - super(); - this.name = name; - this.weight = 0; + @SuppressWarnings("unchecked") + public VertexWritable(VERTEX_ID name, Class idCls) { + this.vertexId = name; + this.value = (VERTEX_VALUE) new IntWritable(0); + this.idCls = idCls; + this.valCls = org.apache.hadoop.util.ReflectionUtils.getClass(value); } + @SuppressWarnings("unchecked") public VertexWritable(int weight, String name) { - super(); - this.name = name; - this.weight = weight; + this.vertexId = (VERTEX_ID) new Text(name); + this.value = (VERTEX_VALUE) new IntWritable(weight); + this.idCls = org.apache.hadoop.util.ReflectionUtils.getClass(vertexId); + this.valCls = org.apache.hadoop.util.ReflectionUtils.getClass(value); + } + + @SuppressWarnings("unchecked") + public VertexWritable(String name) { + this.vertexId = (VERTEX_ID) new Text(name); + this.value = (VERTEX_VALUE) NullWritable.get(); + this.idCls = org.apache.hadoop.util.ReflectionUtils.getClass(vertexId); + this.valCls = org.apache.hadoop.util.ReflectionUtils.getClass(value); + } + + public VertexWritable(VERTEX_VALUE weight, VERTEX_ID name, + Class idCls, Class valCls) { + this.vertexId = name; + this.value = weight; + this.idCls = idCls; + this.valCls = valCls; } - public String getName() { - return name; + public VERTEX_ID getVertexId() { + return vertexId; } - public int getWeight() { - return weight; + public VERTEX_VALUE getVertexValue() { + return value; } @Override public String toString() { - return getName(); + return getVertexId().toString(); } + @SuppressWarnings("unchecked") @Override public void readFields(DataInput in) throws IOException { - this.name = in.readUTF(); - this.weight = in.readInt(); + try { + idCls = (Class) CONFIGURATION.getClassByName(in.readUTF()); + valCls = (Class) CONFIGURATION.getClassByName(in.readUTF()); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + vertexId = (VERTEX_ID) ObjectWritable.readObject(in, CONFIGURATION); + value = (VERTEX_VALUE) ObjectWritable.readObject(in, CONFIGURATION); } @Override public void write(DataOutput out) throws IOException { - out.writeUTF(name); - out.writeInt(weight); + out.writeUTF(idCls.getName()); + out.writeUTF(valCls.getName()); + ObjectWritable.writeObject(out, vertexId, idCls, CONFIGURATION); + ObjectWritable.writeObject(out, value, valCls, CONFIGURATION); } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((vertexId == null) ? 0 : vertexId.hashCode()); return result; } @@ -87,19 +130,32 @@ public class VertexWritable implements W return false; if (getClass() != obj.getClass()) return false; - VertexWritable other = (VertexWritable) obj; - if (name == null) { - if (other.name != null) + @SuppressWarnings("unchecked") + VertexWritable other = (VertexWritable) obj; + if (vertexId == null) { + if (other.vertexId != null) return false; - } else if (!name.equals(other.name)) + } else if (!vertexId.equals(other.vertexId)) return false; return true; } + @SuppressWarnings("unchecked") + @Override + public int compareTo(VertexWritable o) { + VertexWritable that = o; + return ((Comparable>) this.vertexId) + .compareTo((VertexWritable) that.vertexId); + } + + @Override + public void setConf(Configuration conf) { + VertexWritable.CONFIGURATION = conf; + } + @Override - public int compareTo(VertexWritable o) { - VertexWritable that = (VertexWritable) o; - return this.name.compareTo(that.name); + public Configuration getConf() { + return CONFIGURATION; } }