Return-Path: X-Original-To: apmail-incubator-giraph-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-giraph-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5DA058716 for ; Sat, 10 Sep 2011 00:40:48 +0000 (UTC) Received: (qmail 99526 invoked by uid 500); 10 Sep 2011 00:40:47 -0000 Delivered-To: apmail-incubator-giraph-commits-archive@incubator.apache.org Received: (qmail 99504 invoked by uid 500); 10 Sep 2011 00:40:47 -0000 Mailing-List: contact giraph-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: giraph-dev@incubator.apache.org Delivered-To: mailing list giraph-commits@incubator.apache.org Received: (qmail 99497 invoked by uid 99); 10 Sep 2011 00:40:46 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Sep 2011 00:40:46 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Sep 2011 00:40:43 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 0FD6923888E7; Sat, 10 Sep 2011 00:40:23 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1167420 - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/ Date: Sat, 10 Sep 2011 00:40:22 -0000 To: giraph-commits@incubator.apache.org From: aching@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110910004023.0FD6923888E7@eris.apache.org> Author: aching Date: Sat Sep 10 00:40:21 2011 New Revision: 1167420 URL: http://svn.apache.org/viewvc?rev=1167420&view=rev Log: GIRAPH-27: Mutable static global state in Vertex.java should be refactored. jake.mannix via aching. Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java Modified: incubator/giraph/trunk/CHANGELOG incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Modified: incubator/giraph/trunk/CHANGELOG URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1167420&r1=1167419&r2=1167420&view=diff ============================================================================== --- incubator/giraph/trunk/CHANGELOG (original) +++ incubator/giraph/trunk/CHANGELOG Sat Sep 10 00:40:21 2011 @@ -1,28 +1,31 @@ Giraph Change Log Release 0.70.0 - unreleased + + GIRAPH-27: Mutable static global state in Vertex.java should be + refactored. jake.mannix via aching. - GIRAPH-25 NPE in BspServiceMaster when failing a job (aching on behalf - of dvryaboy) + GIRAPH-25: NPE in BspServiceMaster when failing a job. dvryaboy via + aching. - GIRAPH-24 Job-level statistics reports one superstep greater than + GIRAPH-24: Job-level statistics reports one superstep greater than workers. (jghoman) - GIRAPH-18 Refactor BspServiceWorker::loadVertices(). (jghoman) + GIRAPH-18: Refactor BspServiceWorker::loadVertices(). (jghoman) - GIRAPH-14 Support for the Facebook Hadoop branch. (aching) + GIRAPH-14: Support for the Facebook Hadoop branch. (aching) - GIRAPH-16 Add Apache RAT to the verify build step. (omalley) + GIRAPH-16: Add Apache RAT to the verify build step. (omalley) - GIRAPH-17 Giraph doesn't give up properly after the maximum connect + GIRAPH-17: Giraph doesn't give up properly after the maximum connect attempts to ZooKeeper. (aching) - GIRAPH-2 Make the project homepage. (jghoman) + GIRAPH-2: Make the project homepage. (jghoman) - GIRAPH-9 Change Yahoo License Header to Apache License Header (hyunsik) + GIRAPH-9: Change Yahoo License Header to Apache License Header (hyunsik) - GIRAPH-6 Remove Yahoo-specific code from pom.xml. (jghoman) + GIRAPH-6: Remove Yahoo-specific code from pom.xml. (jghoman) - GIRAPH-5 Remove Yahoo directories after svn import from Yahoo! (aching) + GIRAPH-5: Remove Yahoo directories after svn import from Yahoo! (aching) - GIRAPH-3 Vertex:sentMsgToAllEdges should be sendMsg. (jghoman) \ No newline at end of file + GIRAPH-3: Vertex:sentMsgToAllEdges should be sendMsg. (jghoman) \ No newline at end of file Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java?rev=1167420&r1=1167419&r2=1167420&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedService.java Sat Sep 10 00:40:21 2011 @@ -20,7 +20,7 @@ package org.apache.giraph.bsp; import java.io.IOException; -import org.apache.giraph.graph.BasicVertex; +import org.apache.giraph.graph.Vertex; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -44,7 +44,7 @@ public interface CentralizedService getRepresentativeVertex(); + Vertex getRepresentativeVertex(); /** * Get the current global superstep of the application to work on. Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1167420&r1=1167419&r2=1167420&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Sat Sep 10 00:40:21 2011 @@ -1017,10 +1017,11 @@ end[HADOOP_FACEBOOK]*/ // Resolve all graph mutations for (I vertexIndex : resolveVertexIndexSet) { VertexResolver vertexResolver = - BspUtils.createVertexResolver(conf); + BspUtils.createVertexResolver( + conf, service.getGraphMapper().getGraphState()); VertexRange vertexRange = service.getVertexRange(service.getSuperstep() - 1, vertexIndex); - BasicVertex originalVertex = + Vertex originalVertex = vertexRange.getVertexMap().get(vertexIndex); List msgList = inMessages.get(vertexIndex); if (originalVertex != null) { @@ -1043,7 +1044,8 @@ end[HADOOP_FACEBOOK]*/ if (vertex != null) { ((MutableVertex) vertex).setVertexId(vertexIndex); - vertexRange.getVertexMap().put(vertex.getVertexId(), vertex); + vertexRange.getVertexMap().put(vertex.getVertexId(), + (Vertex) vertex); } else if (originalVertex != null) { vertexRange.getVertexMap().remove(originalVertex.getVertexId()); } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1167420&r1=1167419&r2=1167420&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Sat Sep 10 00:40:21 2011 @@ -1,4 +1,4 @@ -/* + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,14 +18,15 @@ package org.apache.giraph.graph; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; + import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.SortedMap; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - /** * Basic interface for writing a BSP application for computation. * @@ -35,11 +36,12 @@ import org.apache.hadoop.io.WritableComp * @param message data */ @SuppressWarnings("rawtypes") -public interface BasicVertex - extends AggregatorUsage { +public abstract class BasicVertex + implements AggregatorUsage { + /** Global graph state **/ + private GraphState graphState; + /** * Optionally defined by the user to be executed once on all workers * before application has started. @@ -47,26 +49,26 @@ public interface BasicVertex msgIterator) throws IOException; + public abstract void compute(Iterator msgIterator) throws IOException; /** * Retrieves the current superstep. * * @return Current superstep */ - long getSuperstep(); + public long getSuperstep() { + return getGraphState().getSuperstep(); + } /** * Get the vertex id */ - I getVertexId(); + public abstract I getVertexId(); /** * Get the vertex value (data stored with vertex) * * @return Vertex value */ - V getVertexValue(); + public abstract V getVertexValue(); /** * Set the vertex data (immediately visible in the computation) * * @param vertexValue Vertex data to be set */ - void setVertexValue(V vertexValue); + public abstract void setVertexValue(V vertexValue); /** * Get the total (all workers) number of vertices that @@ -109,7 +113,9 @@ public interface BasicVertex> getOutEdgeMap(); + public abstract SortedMap> getOutEdgeMap(); /** * Send a message to a vertex id. @@ -133,12 +141,19 @@ public interface BasicVertex getMsgList(); + public abstract List getMsgList(); + + /** + * Get the graph state for all workers. + * + * @return Graph state for all workers + */ + GraphState getGraphState() { + return graphState; + } + + /** + * Set the graph state for all workers + * + * @param graphState Graph state for all workers + */ + void setGraphState(GraphState graphState) { + this.graphState = graphState; + } + + /** + * Get the mapper context + * + * @return Mapper context + */ + public Mapper.Context getContext() { + return getGraphState().getContext(); + } + + @Override + public final Aggregator registerAggregator( + String name, + Class> aggregatorClass) + throws InstantiationException, IllegalAccessException { + return getGraphState().getGraphMapper().getAggregatorUsage(). + registerAggregator(name, aggregatorClass); + } + + @Override + public final Aggregator getAggregator(String name) { + return getGraphState().getGraphMapper().getAggregatorUsage(). + getAggregator(name); + } + + @Override + public final boolean useAggregator(String name) { + return getGraphState().getGraphMapper().getAggregatorUsage(). + useAggregator(name); + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1167420&r1=1167419&r2=1167420&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Sat Sep 10 00:40:21 2011 @@ -18,41 +18,39 @@ package org.apache.giraph.graph; -import java.io.IOException; -import java.security.InvalidParameterException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.net.InetAddress; -import java.net.UnknownHostException; - -import org.apache.log4j.Logger; +import org.apache.giraph.bsp.CentralizedService; +import org.apache.giraph.zk.BspEvent; +import org.apache.giraph.zk.PredicateLock; +import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; - +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; -import org.apache.giraph.bsp.CentralizedService; -import org.apache.giraph.zk.BspEvent; -import org.apache.giraph.zk.PredicateLock; -import org.apache.giraph.zk.ZooKeeperExt; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.InvalidParameterException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; /** * Zookeeper-based implementation of {@link CentralizedService}. @@ -631,7 +629,9 @@ public abstract class BspService < this.hostnamePartitionId = hostname + "_" + getTaskPartition(); this.representativeVertex = - BspUtils.createVertex(getConfiguration()); + BspUtils.createVertex( + getConfiguration(), + getGraphMapper().getGraphState()); this.checkpointFrequency = conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY, @@ -670,7 +670,12 @@ public abstract class BspService < return jobId; } - final public BasicVertex getRepresentativeVertex() { + /** + * Get the representative vertex + * + * @return Representative vertex for this service. + */ + final public Vertex getRepresentativeVertex() { return representativeVertex; } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1167420&r1=1167419&r2=1167420&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Sat Sep 10 00:40:21 2011 @@ -18,6 +18,28 @@ package org.apache.giraph.graph; +import net.iharder.Base64; +import org.apache.giraph.bsp.ApplicationState; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -36,33 +58,6 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; -import net.iharder.Base64; - -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; - -import org.apache.log4j.Logger; -import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.util.ReflectionUtils; - -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; - -import org.apache.giraph.bsp.ApplicationState; - /** * ZooKeeper-based implementation of {@link CentralizedServiceWorker}. */ @@ -387,16 +382,17 @@ public class BspServiceWorker< * @throws InterruptedException */ private List> readVerticesFromInputSplit( - InputSplit inputSplit) throws IOException, InterruptedException { - List> vertexList = new ArrayList>(); + InputSplit inputSplit) throws IOException, InterruptedException { + List> vertexList = + new ArrayList>(); VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(getConfiguration()); VertexReader vertexReader = vertexInputFormat.createVertexReader(inputSplit, getContext()); vertexReader.initialize(inputSplit, getContext()); Vertex readerVertex = - BspUtils.createVertex(getConfiguration()); - + BspUtils.createVertex( + getConfiguration(), getGraphMapper().getGraphState()); while (vertexReader.next(readerVertex)) { if (readerVertex.getVertexId() == null) { throw new IllegalArgumentException( @@ -422,7 +418,8 @@ public class BspServiceWorker< } } vertexList.add(readerVertex); - readerVertex = BspUtils.createVertex(getConfiguration()); + readerVertex = BspUtils.createVertex(getConfiguration(), + getGraphMapper().getGraphState()); getContext().progress(); } vertexReader.close(); @@ -510,8 +507,7 @@ public class BspServiceWorker< } VertexRange range = vertexRangeMap.get(currentVertexIndexMax); - SortedMap> vertexMap = - range.getVertexMap(); + SortedMap> vertexMap = range.getVertexMap(); if (vertexMap.put(vertex.getVertexId(), vertex) != null) { throw new IllegalStateException( "loadVertices: Already contains vertex " + @@ -1179,7 +1175,9 @@ public class BspServiceWorker< VertexRange vertexRange = getVertexRangeMap().get(maxIndex); for (int i = 0; i < vertexCount; ++i) { Vertex vertex = - BspUtils.createVertex(getConfiguration()); + BspUtils.createVertex( + getConfiguration(), + getGraphMapper().getGraphState()); vertex.readFields(dataStream); // Add the vertex if (vertexRange.getVertexMap().put(vertex.getVertexId(), vertex) @@ -1364,7 +1362,7 @@ public class BspServiceWorker< continue; } - SortedMap> vertexMap = + SortedMap> vertexMap = getVertexRangeMap().get(entry.getKey()).getVertexMap(); if (vertexMap.size() != 0) { throw new RuntimeException( @@ -1378,7 +1376,7 @@ public class BspServiceWorker< entry.getValue().size() + " vertices for max index " + entry.getKey()); } - for (BasicVertex vertex : entry.getValue()) { + for (Vertex vertex : entry.getValue()) { if (vertexMap.put(vertex.getVertexId(), vertex) != null) { throw new IllegalStateException( "exchangeVertexRanges: Vertex " + vertex + Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1167420&r1=1167419&r2=1167420&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspUtils.java Sat Sep 10 00:40:21 2011 @@ -152,7 +152,8 @@ public class BspUtils { */ @SuppressWarnings("rawtypes") public static VertexRangeBalancer + E extends Writable, M extends Writable> + VertexRangeBalancer createVertexRangeBalancer(Configuration conf) { Class> vertexRangeBalancerClass = getVertexRangeBalancerClass(conf); @@ -187,10 +188,14 @@ public class BspUtils { @SuppressWarnings("rawtypes") public static VertexResolver - createVertexResolver(Configuration conf) { + createVertexResolver(Configuration conf, + GraphState graphState) { Class> vertexResolverClass = getVertexResolverClass(conf); - return ReflectionUtils.newInstance(vertexResolverClass, conf); + VertexResolver resolver = + ReflectionUtils.newInstance(vertexResolverClass, conf); + resolver.setGraphState(graphState); + return resolver; } /** @@ -221,10 +226,14 @@ public class BspUtils { @SuppressWarnings("rawtypes") public static Vertex - createVertex(Configuration conf) { + createVertex(Configuration conf, + GraphState graphState) { Class> vertexClass = getVertexClass(conf); - return ReflectionUtils.newInstance(vertexClass, conf); + Vertex vertex = + ReflectionUtils.newInstance(vertexClass, conf); + vertex.setGraphState(graphState); + return vertex; } /** Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1167420&r1=1167419&r2=1167420&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Sat Sep 10 00:40:21 2011 @@ -18,15 +18,6 @@ package org.apache.giraph.graph; -import java.io.IOException; -import java.lang.reflect.Type; -import java.net.URL; -import java.net.URLDecoder; -import java.util.Enumeration; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.RPCCommunications; import org.apache.giraph.comm.ServerInterface; @@ -39,6 +30,15 @@ import org.apache.hadoop.io.WritableComp import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; +import java.io.IOException; +import java.lang.reflect.Type; +import java.net.URL; +import java.net.URLDecoder; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + /** * This mapper that will execute the BSP graph tasks. Since this mapper will * not be passing data by key-value pairs through the MR framework, the @@ -66,6 +66,11 @@ public class GraphMapper graphState = new GraphState(); /** What kinds of functions to run on this mapper */ public enum MapFunctions { @@ -103,6 +108,10 @@ public class GraphMapper getGraphState() { + return graphState; + } + /** * Default handler for uncaught exceptions. */ @@ -380,6 +389,7 @@ public class GraphMapper vertex : + for (Vertex vertex : entry.getValue().getVertexMap().values()) { + // Make sure every vertex has the current + // graphState before computing + vertex.setGraphState(graphState); if (vertex.isHalted() && !vertex.getMsgList().isEmpty()) { Vertex activatedVertex = Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java?rev=1167420&view=auto ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java (added) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java Sat Sep 10 00:40:21 2011 @@ -0,0 +1,75 @@ +package org.apache.giraph.graph; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; + +/* + * Global state of the graph. Should be treated as a singleton (but is kept + * as a regular bean to facilitate ease of unit testing) + * + * @param vertex id + * @param vertex data + * @param edge data + * @param message data + */ +@SuppressWarnings("rawtypes") +public class GraphState { + /** Graph-wide superstep */ + private long superstep = 0; + /** Graph-wide number of vertices */ + private long numVertices = -1; + /** Graph-wide number of edges */ + private long numEdges = -1; + /** Graph-wide map context */ + private Mapper.Context context = null; + /** Graph-wide BSP Mapper for this Vertex */ + private GraphMapper graphMapper = null; + + public long getSuperstep() { + return superstep; + } + + public GraphState setSuperstep(long superstep) { + this.superstep = superstep; + return this; + } + + public long getNumVertices() { + return numVertices; + } + + public GraphState setNumVertices(long numVertices) { + this.numVertices = numVertices; + return this; + } + + public long getNumEdges() { + return numEdges; + } + + public GraphState setNumEdges(long numEdges) { + this.numEdges = numEdges; + return this; + } + + public Mapper.Context getContext() { + return context; + } + + public GraphState setContext(Mapper.Context context) { + this.context = context; + return this; + } + + public GraphMapper getGraphMapper() { + return graphMapper; + } + + public GraphState setGraphMapper( + GraphMapper graphMapper) { + this.graphMapper = graphMapper; + return this; + } +} Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java?rev=1167420&r1=1167419&r2=1167420&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java Sat Sep 10 00:40:21 2011 @@ -28,17 +28,15 @@ import org.apache.hadoop.io.WritableComp * or mutate the graph. */ @SuppressWarnings("rawtypes") -public interface MutableVertex - extends BasicVertex, Writable { +public abstract class MutableVertex + extends BasicVertex implements Writable { /** * Set the vertex id * * @param id Vertex id is set to this (instantiated by the user) */ - void setVertexId(I id); + public abstract void setVertexId(I id); /** * Add an edge for this vertex (happens immediately) @@ -46,7 +44,7 @@ public interface MutableVertex edge); + public abstract boolean addEdge(Edge edge); /** * Create a vertex for use in addVertexRequest(). Still need to get the @@ -54,7 +52,12 @@ public interface MutableVertex instantiateVertex(); + public MutableVertex instantiateVertex() { + Vertex mutableVertex = + BspUtils.createVertex(getContext().getConfiguration(), + getGraphState()); + return mutableVertex; + } /** * Sends a request to create a vertex that will be available during the @@ -62,7 +65,11 @@ public interface MutableVertex vertex) throws IOException; + public void addVertexRequest(MutableVertex vertex) + throws IOException { + getGraphState().getGraphMapper().getWorkerCommunications(). + addVertexReq(vertex); +} /** * Request to remove a vertex from the graph @@ -70,7 +77,10 @@ public interface MutableVertex edge) throws IOException; + public void addEdgeRequest(I sourceVertexId, Edge edge) + throws IOException { + getGraphState().getGraphMapper().getWorkerCommunications(). + addEdgeReq(sourceVertexId, edge); + } /** * Request to remove an edge of a vertex from the graph @@ -88,5 +102,9 @@ public interface MutableVertex Message value */ @SuppressWarnings("rawtypes") -public abstract class Vertex< - I extends WritableComparable, - V extends Writable, - E extends Writable, - M extends Writable> - implements MutableVertex { +public abstract class Vertex + extends MutableVertex { /** Class logger */ private static final Logger LOG = Logger.getLogger(Vertex.class); - /** Class-wide superstep */ - private static long superstep = 0; - /** Class-wide number of vertices */ - private static long numVertices = -1; - /** Class-wide number of edges */ - private static long numEdges = -1; - /** Class-wide map context */ - private static Mapper.Context context = null; - /** Class-wide BSP Mapper for this Vertex */ - private static GraphMapper graphMapper = null; /** Vertex id */ private I vertexId = null; /** Vertex value */ @@ -119,32 +103,6 @@ public abstract class Vertex< return vertexId; } - /** - * Set the GraphMapper for this vertex (internal use). - * - * @param graphMapper Mapper to use for communication - */ - final static void - setGraphMapper(GraphMapper graphMapper) { - Vertex.graphMapper = graphMapper; - } - - /** - * Set the global superstep for all the vertices (internal use) - * - * @param superstep New superstep - */ - static void setSuperstep(long superstep) { - Vertex.superstep = superstep; - } - - @Override - public final long getSuperstep() { - return superstep; - } - @Override public final V getVertexValue() { return vertexValue; @@ -155,50 +113,11 @@ public abstract class Vertex< this.vertexValue = vertexValue; } - /** - * Set the total number of vertices from the last superstep. - * - * @param numVertices Aggregate vertices in the last superstep - */ - static void setNumVertices(long numVertices) { - Vertex.numVertices = numVertices; - } - - @Override - public final long getNumVertices() { - return numVertices; - } - - /** - * Set the total number of edges from the last superstep. - * - * @param numEdges Aggregate edges in the last superstep - */ - static void setNumEdges(long numEdges) { - Vertex.numEdges = numEdges; - } - - @Override - public final long getNumEdges() { - return numEdges; - } - @Override public final SortedMap> getOutEdgeMap() { return destEdgeMap; } - @SuppressWarnings("unchecked") - @Override - public final void sendMsg(I id, M msg) { - if (msg == null) { - throw new IllegalArgumentException( - "sendMsg: Cannot send null message to " + id); - } - ((GraphMapper) graphMapper). - getWorkerCommunications().sendMessageReq(id, msg); - } - @Override public final void sendMsgToAllEdges(M msg) { if (msg == null) { @@ -211,45 +130,6 @@ public abstract class Vertex< } @Override - public MutableVertex instantiateVertex() { - Vertex mutableVertex = - BspUtils.createVertex(getContext().getConfiguration()); - return mutableVertex; - } - - @SuppressWarnings("unchecked") - @Override - public void addVertexRequest(MutableVertex vertex) - throws IOException { - ((GraphMapper) graphMapper). - getWorkerCommunications().addVertexReq(vertex); - } - - @SuppressWarnings("unchecked") - @Override - public void removeVertexRequest(I vertexId) throws IOException { - ((GraphMapper) graphMapper). - getWorkerCommunications().removeVertexReq(vertexId); - } - - @SuppressWarnings("unchecked") - @Override - public void addEdgeRequest(I vertexIndex, - Edge edge) throws IOException { - ((GraphMapper) graphMapper). - getWorkerCommunications().addEdgeReq(vertexIndex, edge); - } - - @SuppressWarnings("unchecked") - @Override - public void removeEdgeRequest(I sourceVertexId, - I destVertexId) throws IOException { - ((GraphMapper) graphMapper). - getWorkerCommunications().removeEdgeReq(sourceVertexId, - destVertexId); - } - - @Override public final void voteToHalt() { halt = true; } @@ -262,12 +142,12 @@ public abstract class Vertex< @Override final public void readFields(DataInput in) throws IOException { vertexId = - BspUtils.createVertexIndex(getContext().getConfiguration()); + BspUtils.createVertexIndex(getContext().getConfiguration()); vertexId.readFields(in); boolean hasVertexValue = in.readBoolean(); if (hasVertexValue) { vertexValue = - BspUtils.createVertexValue(getContext().getConfiguration()); + BspUtils.createVertexValue(getContext().getConfiguration()); vertexValue.readFields(in); } long edgeMapSize = in.readLong(); @@ -280,7 +160,7 @@ public abstract class Vertex< long msgListSize = in.readLong(); for (long i = 0; i < msgListSize; ++i) { M msg = - BspUtils.createMessageValue(getContext().getConfiguration()); + BspUtils.createMessageValue(getContext().getConfiguration()); msg.readFields(in); msgList.add(msg); } @@ -306,37 +186,10 @@ public abstract class Vertex< } @Override - public final Aggregator registerAggregator( - String name, - Class> aggregatorClass) - throws InstantiationException, IllegalAccessException { - return graphMapper.getAggregatorUsage().registerAggregator( - name, aggregatorClass); - } - - @Override - public final Aggregator getAggregator(String name) { - return graphMapper.getAggregatorUsage().getAggregator(name); - } - - @Override - public final boolean useAggregator(String name) { - return graphMapper.getAggregatorUsage().useAggregator(name); - } - - @Override public List getMsgList() { return msgList; } - public final Mapper.Context getContext() { - return context; - } - - final static void setContext(Mapper.Context context) { - Vertex.context = context; - } - @Override public String toString() { return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java?rev=1167420&r1=1167419&r2=1167420&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java Sat Sep 10 00:40:21 2011 @@ -68,8 +68,8 @@ public class VertexRange> vertexMap = - new TreeMap>(); + private final SortedMap> vertexMap = + new TreeMap>(); /** Class logger */ private static final Logger LOG = Logger.getLogger(VertexRange.class); @@ -207,7 +207,7 @@ public class VertexRange> getVertexMap() { + public SortedMap> getVertexMap() { return vertexMap; } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1167420&r1=1167419&r2=1167420&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java Sat Sep 10 00:40:21 2011 @@ -18,14 +18,14 @@ package org.apache.giraph.graph; -import java.util.List; - import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.log4j.Logger; +import java.util.List; + /** * Default implementation of how to resolve vertex creation/removal, messages * to nonexistent vertices, etc. @@ -41,6 +41,9 @@ public class VertexResolver, Configurable { /** Configuration */ private Configuration conf = null; + + private GraphState graphState; + /** Class logger */ private static final Logger LOG = Logger.getLogger(VertexResolver.class); @@ -107,7 +110,7 @@ public class VertexResolver instantiateVertex() { - return BspUtils.createVertex(getConf()); + return BspUtils.createVertex(getConf(), graphState); } @Override @@ -119,4 +122,8 @@ public class VertexResolver graphState) { + this.graphState = graphState; + } } Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1167420&r1=1167419&r2=1167420&view=diff ============================================================================== --- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original) +++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Sat Sep 10 00:40:21 2011 @@ -18,35 +18,41 @@ package org.apache.giraph; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; - -import java.util.List; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; +import junit.framework.Test; +import junit.framework.TestSuite; import org.apache.giraph.examples.GeneratedVertexReader; import org.apache.giraph.examples.SimpleCombinerVertex; import org.apache.giraph.examples.SimpleFailVertex; import org.apache.giraph.examples.SimpleMsgVertex; import org.apache.giraph.examples.SimplePageRankVertex; -import org.apache.giraph.examples.SimpleShortestPathsVertex; import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat; +import org.apache.giraph.examples.SimpleShortestPathsVertex; import org.apache.giraph.examples.SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat; import org.apache.giraph.examples.SimpleSumCombiner; import org.apache.giraph.examples.SimpleSuperstepVertex; import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat; import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat; +import org.apache.giraph.graph.BspUtils; import org.apache.giraph.graph.GiraphJob; -import junit.framework.Test; -import junit.framework.TestSuite; +import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexInputFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.List; /** * Unit test for many simple BSP applications. @@ -87,16 +93,22 @@ public class TestBspBasic extends BspCas InvocationTargetException, SecurityException, NoSuchMethodException { System.out.println("testInstantiateVertex: java.class.path=" + System.getProperty("java.class.path")); - java.lang.reflect.Constructor ctor = - SimpleSuperstepVertex.class.getConstructor(); - assertNotNull(ctor); - SimpleSuperstepVertex test = - (SimpleSuperstepVertex) ctor.newInstance(); + GiraphJob job = new GiraphJob(getCallingMethodName()); + job.setVertexClass(SimpleSuperstepVertex.class); + job.setVertexInputFormatClass( + SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat.class); + GraphState gs = + new GraphState(); + Vertex vertex = + BspUtils. + createVertex(job.getConfiguration(), gs); System.out.println("testInstantiateVertex: superstep=" + - test.getSuperstep()); - SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat inputFormat = - SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat - .class.newInstance(); + vertex.getSuperstep()); + VertexInputFormat + inputFormat = + BspUtils. + createVertexInputFormat(job.getConfiguration()); List splitArray = inputFormat.getSplits( new JobContext(new Configuration(), new JobID()), 1);