giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1399090 [2/3] - in /giraph/trunk: ./ giraph-formats-contrib/src/main/java/org/apache/giraph/io/hbase/ giraph/src/main/java/org/apache/giraph/ giraph/src/main/java/org/apache/giraph/bsp/ giraph/src/main/java/org/apache/giraph/comm/ giraph/s...
Date Wed, 17 Oct 2012 04:33:18 GMT
Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Wed Oct 17 04:33:16 2012
@@ -32,6 +32,7 @@ import org.apache.giraph.comm.messages.M
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.SequentialFileMessageStore;
 import org.apache.giraph.comm.messages.SimpleMessageStore;
+import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.graph.VertexResolver;
@@ -115,7 +116,7 @@ public class NettyWorkerServer<I extends
   }
 
   @Override
-  public void prepareSuperstep() {
+  public void prepareSuperstep(GraphState<I, V, E, M> graphState) {
     serverData.prepareSuperstep();
 
     Set<I> resolveVertexIndexSet = Sets.newHashSet();
@@ -146,8 +147,7 @@ public class NettyWorkerServer<I extends
     // Resolve all graph mutations
     for (I vertexIndex : resolveVertexIndexSet) {
       VertexResolver<I, V, E, M> vertexResolver =
-          conf.createVertexResolver(
-              service.getGraphMapper().getGraphState());
+          conf.createVertexResolver(graphState);
       Vertex<I, V, E, M> originalVertex =
           service.getVertex(vertexIndex);
 
@@ -164,7 +164,7 @@ public class NettyWorkerServer<I extends
           vertexIndex, originalVertex, mutations,
           serverData.getCurrentMessageStore().
               hasMessagesForVertex(vertexIndex));
-      service.getGraphMapper().getGraphState().getContext().progress();
+      graphState.getContext().progress();
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("prepareSuperstep: Resolved vertex index " +

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/AddressRequestIdGenerator.java Wed Oct 17 04:33:16 2012
@@ -20,36 +20,34 @@ package org.apache.giraph.comm.netty.han
 
 import com.google.common.collect.Maps;
 import java.net.InetSocketAddress;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Generate different request ids based on the address of the well known
- * port on the workers
+ * port on the workers.  Thread-safe.
  */
 public class AddressRequestIdGenerator {
   /** Address request generator map */
-  private final Map<InetSocketAddress, Long> addressRequestGeneratorMap =
-      Maps.newHashMap();
+  private final ConcurrentMap<InetSocketAddress, AtomicLong>
+  addressRequestGeneratorMap = Maps.newConcurrentMap();
 
   /**
-   * Get the next request id for a given destination.  Not thread-safe.
+   * Get the next request id for a given destination.  Thread-safe.
    *
    * @param address Address of the worker (consistent during a superstep)
    * @return Valid request id
    */
   public Long getNextRequestId(InetSocketAddress address) {
-    Long requestGenerator = addressRequestGeneratorMap.get(address);
+    AtomicLong requestGenerator = addressRequestGeneratorMap.get(address);
     if (requestGenerator == null) {
-      requestGenerator = Long.valueOf(0);
-      if (addressRequestGeneratorMap.put(address, requestGenerator) != null) {
-        throw new IllegalStateException("getNextRequestId: Illegal put for " +
-            "address " + address);
+      requestGenerator = new AtomicLong(0);
+      AtomicLong oldRequestGenerator =
+          addressRequestGeneratorMap.putIfAbsent(address, requestGenerator);
+      if (oldRequestGenerator != null) {
+        requestGenerator = oldRequestGenerator;
       }
-      return requestGenerator;
     }
-
-    requestGenerator = requestGenerator + 1;
-    addressRequestGeneratorMap.put(address, requestGenerator);
-    return requestGenerator;
+    return requestGenerator.getAndIncrement();
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Wed Oct 17 04:33:16 2012
@@ -44,6 +44,20 @@ public class SimpleSuperstepVertex exten
     EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
   @Override
   public void compute(Iterable<IntWritable> messages) {
+    // Some checks for additional testing
+    if (getTotalNumVertices() < 1) {
+      throw new IllegalStateException("compute: Illegal total vertices " +
+          getTotalNumVertices());
+    }
+    if (getTotalNumEdges() < 0) {
+      throw new IllegalStateException("compute: Illegal total edges " +
+          getTotalNumEdges());
+    }
+    if (isHalted()) {
+      throw new IllegalStateException("compute: Impossible to be halted - " +
+          isHalted());
+    }
+
     if (getSuperstep() > 3) {
       voteToHalt();
     }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AggregatorWrapper.java Wed Oct 17 04:33:16 2012
@@ -94,7 +94,7 @@ public class AggregatorWrapper<A extends
    *
    * @param value Value to be aggregated
    */
-  public void aggregateCurrent(A value) {
+  public synchronized void aggregateCurrent(A value) {
     changed = true;
     currentAggregator.aggregate(value);
   }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Oct 17 04:33:16 2012
@@ -1382,26 +1382,23 @@ public class BspServiceMaster<I extends 
    * @param superstep superstep for which to run the master.compute()
    */
   private void runMasterCompute(long superstep) {
-    GraphState<I, V, E, M> graphState = getGraphMapper().getGraphState();
     // The master.compute() should run logically before the workers, so
     // increase the superstep counter it uses by one
-    graphState.setSuperstep(superstep + 1);
-    graphState.setTotalNumVertices(vertexCounter.getValue());
-    graphState.setTotalNumEdges(edgeCounter.getValue());
-    graphState.setContext(getContext());
-    graphState.setGraphMapper(getGraphMapper());
+    GraphState<I, V, E, M> graphState =
+        new GraphState<I, V, E, M>(superstep + 1, vertexCounter.getValue(),
+            edgeCounter.getValue(), getContext(), getGraphMapper(), null);
     masterCompute.setGraphState(graphState);
     if (superstep == INPUT_SUPERSTEP) {
       try {
         masterCompute.initialize();
       } catch (InstantiationException e) {
-        LOG.fatal("map: MasterCompute.initialize failed in instantiation", e);
+        LOG.fatal("runMasterCompute: Failed in instantiation", e);
         throw new RuntimeException(
-            "map: MasterCompute.initialize failed in instantiation", e);
+            "runMasterCompute: Failed in instantiation", e);
       } catch (IllegalAccessException e) {
-        LOG.fatal("map: MasterCompute.initialize failed in access", e);
+        LOG.fatal("runMasterCompute: Failed in access", e);
         throw new RuntimeException(
-            "map: MasterCompute.initialize failed in access", e);
+            "runMasterCompute: Failed in access", e);
       }
     }
     masterCompute.compute();

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Oct 17 04:33:16 2012
@@ -18,30 +18,40 @@
 
 package org.apache.giraph.graph;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.WorkerClientServer;
-import org.apache.giraph.comm.netty.NettyWorkerClientServer;
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.WorkerServer;
+import org.apache.giraph.comm.netty.NettyWorkerClient;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerServer;
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionExchange;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.graph.partition.PartitionStore;
 import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
+import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -55,7 +65,6 @@ import org.json.JSONObject;
 
 import net.iharder.Base64;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutput;
@@ -64,7 +73,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -92,20 +100,16 @@ public class BspServiceWorker<I extends 
   private final WorkerInfo workerInfo;
   /** Worker graph partitioner */
   private final WorkerGraphPartitioner<I, V, E, M> workerGraphPartitioner;
-  /** Input split vertex cache (only used when loading from input split) */
-  private final Map<PartitionOwner, Partition<I, V, E, M>>
-  inputSplitCache = new HashMap<PartitionOwner, Partition<I, V, E, M>>();
-  /** Communication service */
-  private final WorkerClientServer<I, V, E, M> commService;
+
+  /** IPC Client */
+  private final WorkerClient<I, V, E, M> workerClient;
+  /** IPC Server */
+  private final WorkerServer<I, V, E, M> workerServer;
   /** Master info */
   private WorkerInfo masterInfo = new WorkerInfo();
   /** Have the partition exchange children (workers) changed? */
   private final BspEvent partitionExchangeChildrenChanged;
-  /** Regulates the size of outgoing Collections of vertices read
-   * by the local worker during INPUT_SUPERSTEP that are to be
-   * transfered from <code>inputSplitCache</code> to the owner
-   * of their initial, master-assigned Partition.*/
-  private GiraphTransferRegulator transferRegulator;
+
   /** Worker Context */
   private final WorkerContext workerContext;
   /** Total vertices loaded */
@@ -130,7 +134,6 @@ public class BspServiceWorker<I extends 
    * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
    * @param context Mapper context
    * @param graphMapper Graph mapper
-   * @param graphState Global graph state
    * @throws IOException
    * @throws InterruptedException
    */
@@ -138,44 +141,39 @@ public class BspServiceWorker<I extends 
     String serverPortList,
     int sessionMsecTimeout,
     Mapper<?, ?, ?, ?>.Context context,
-    GraphMapper<I, V, E, M> graphMapper,
-    GraphState<I, V, E, M> graphState)
+    GraphMapper<I, V, E, M> graphMapper)
     throws IOException, InterruptedException {
     super(serverPortList, sessionMsecTimeout, context, graphMapper);
     partitionExchangeChildrenChanged = new PredicateLock(context);
     registerBspEvent(partitionExchangeChildrenChanged);
-    transferRegulator =
-        new GiraphTransferRegulator(getConfiguration());
-    inputSplitMaxVertices =
-        getConfiguration().getLong(
-            GiraphConfiguration.INPUT_SPLIT_MAX_VERTICES,
-            GiraphConfiguration.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
+    inputSplitMaxVertices = getConfiguration().getInputSplitMaxVertices();
     workerGraphPartitioner =
         getGraphPartitionerFactory().createWorkerGraphPartitioner();
-    commService =  new NettyWorkerClientServer<I, V, E, M>(
-        context, getConfiguration(), this);
+    workerServer = new NettyWorkerServer<I, V, E, M>(getConfiguration(),
+        this, context);
+    workerClient = new NettyWorkerClient<I, V, E, M>(context,
+        getConfiguration(), this);
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("BspServiceWorker: maxVerticesPerTransfer = " +
-          transferRegulator.getMaxVerticesPerTransfer());
-      LOG.info("BspServiceWorker: maxEdgesPerTransfer = " +
-          transferRegulator.getMaxEdgesPerTransfer());
-    }
 
-    workerInfo = new WorkerInfo(
-        getHostname(), getTaskPartition(), commService.getPort());
 
-    graphState.setWorkerCommunications(commService);
+    workerInfo = new WorkerInfo(
+        getHostname(), getTaskPartition(), workerServer.getPort());
     this.workerContext =
-        getConfiguration().createWorkerContext(graphMapper.getGraphState());
+        getConfiguration().createWorkerContext(null);
 
     aggregatorHandler = new WorkerAggregatorHandler();
   }
 
+  @Override
   public WorkerContext getWorkerContext() {
     return workerContext;
   }
 
+  @Override
+  public WorkerClient<I, V, E, M> getWorkerClient() {
+    return workerClient;
+  }
+
   /**
    * Intended to check the health of the node.  For instance, can it ssh,
    * dmesg, etc. For now, does nothing.
@@ -265,7 +263,6 @@ public class BspServiceWorker<I extends 
             " InputSplits are finished.");
       }
       if (reservedInputSplits == splitOrganizer.getPathListSize()) {
-        transferRegulator = null; // don't need this anymore
         return null;
       }
       getContext().progress();
@@ -286,6 +283,8 @@ public class BspServiceWorker<I extends 
    * ensure the input split cache is flushed prior to marking the last input
    * split complete.
    *
+   * Use one or more threads to do the loading.
+   *
    * @return Statistics of the vertices loaded
    * @throws IOException
    * @throws IllegalAccessException
@@ -297,217 +296,44 @@ public class BspServiceWorker<I extends 
   private VertexEdgeCount loadVertices() throws IOException,
     ClassNotFoundException, InterruptedException, InstantiationException,
     IllegalAccessException, KeeperException {
-    String inputSplitPath = null;
     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
-    while ((inputSplitPath = reserveInputSplit()) != null) {
-      vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
-          loadVerticesFromInputSplit(inputSplitPath));
-    }
-
-    // Flush the remaining cached vertices
-    for (Entry<PartitionOwner, Partition<I, V, E, M>> entry :
-      inputSplitCache.entrySet()) {
-      if (!entry.getValue().getVertices().isEmpty()) {
-        getContext().progress();
-        commService.sendPartitionRequest(entry.getKey().getWorkerInfo(),
-            entry.getValue());
-      }
-    }
-    inputSplitCache.clear();
-    commService.flush();
 
-    return vertexEdgeCount;
-  }
-
-  /**
-   * Mark an input split path as completed by this worker.  This notifies
-   * the master and the other workers that this input split has not only
-   * been reserved, but also marked processed.
-   *
-   * @param inputSplitPath Path to the input split.
-   */
-  private void markInputSplitPathFinished(String inputSplitPath) {
-    String inputSplitFinishedPath =
-        inputSplitPath + INPUT_SPLIT_FINISHED_NODE;
-    try {
-      getZkExt().createExt(inputSplitFinishedPath,
-          null,
-          Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT,
-          true);
-    } catch (KeeperException.NodeExistsException e) {
-      LOG.warn("loadVertices: " + inputSplitFinishedPath +
-          " already exists!");
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "loadVertices: KeeperException on " +
-              inputSplitFinishedPath, e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "loadVertices: InterruptedException on " +
-              inputSplitFinishedPath, e);
-    }
-  }
-
-  /**
-   * Extract vertices from input split, saving them into a mini cache of
-   * partitions.  Periodically flush the cache of vertices when a limit is
-   * reached in readVerticeFromInputSplit.
-   * Mark the input split finished when done.
-   *
-   * @param inputSplitPath ZK location of input split
-   * @return Mapping of vertex indices and statistics, or null if no data read
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   * @throws InstantiationException
-   * @throws IllegalAccessException
-   */
-  private VertexEdgeCount loadVerticesFromInputSplit(String inputSplitPath)
-    throws IOException, ClassNotFoundException, InterruptedException,
-    InstantiationException, IllegalAccessException {
-    InputSplit inputSplit = getInputSplitForVertices(inputSplitPath);
-    VertexEdgeCount vertexEdgeCount =
-        readVerticesFromInputSplit(inputSplit);
+    GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
+        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
+        null);
+    int numThreads = getConfiguration().getNumInputSplitsThreads();
+    ExecutorService inputSplitsExecutor =
+        Executors.newFixedThreadPool(numThreads,
+            new ThreadFactoryBuilder().setNameFormat("load-%d").build());
+    List<Future<VertexEdgeCount>> threadsFutures =
+        Lists.newArrayListWithCapacity(numThreads);
     if (LOG.isInfoEnabled()) {
-      LOG.info("loadVerticesFromInputSplit: Finished loading " +
-          inputSplitPath + " " + vertexEdgeCount);
+      LOG.info("loadVertices: Using " + numThreads + " threads.");
     }
-    markInputSplitPathFinished(inputSplitPath);
-    return vertexEdgeCount;
-  }
-
-  /**
-   * Talk to ZooKeeper to convert the input split path to the actual
-   * InputSplit containing the vertices to read.
-   *
-   * @param inputSplitPath Location in ZK of input split
-   * @return instance of InputSplit containing vertices to read
-   * @throws IOException
-   * @throws ClassNotFoundException
-   */
-  private InputSplit getInputSplitForVertices(String inputSplitPath)
-    throws IOException, ClassNotFoundException {
-    byte[] splitList;
-    try {
-      splitList = getZkExt().getData(inputSplitPath, false, null);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "loadVertices: KeeperException on " + inputSplitPath, e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "loadVertices: IllegalStateException on " + inputSplitPath, e);
-    }
-    getContext().progress();
-
-    DataInputStream inputStream =
-        new DataInputStream(new ByteArrayInputStream(splitList));
-    Text.readString(inputStream); // location data unused here, skip
-    String inputSplitClass = Text.readString(inputStream);
-    InputSplit inputSplit = (InputSplit)
-        ReflectionUtils.newInstance(
-            getConfiguration().getClassByName(inputSplitClass),
-            getConfiguration());
-    ((Writable) inputSplit).readFields(inputStream);
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("getInputSplitForVertices: Reserved " + inputSplitPath +
-          " from ZooKeeper and got input split '" +
-          inputSplit.toString() + "'");
-    }
-    return inputSplit;
-  }
-
-  /**
-   * Read vertices from input split.  If testing, the user may request a
-   * maximum number of vertices to be read from an input split.
-   *
-   * @param inputSplit Input split to process with vertex reader
-   * @return List of vertices.
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private VertexEdgeCount readVerticesFromInputSplit(
-      InputSplit inputSplit) throws IOException, InterruptedException {
-    VertexInputFormat<I, V, E, M> vertexInputFormat =
-        getConfiguration().createVertexInputFormat();
-    VertexReader<I, V, E, M> vertexReader =
-        vertexInputFormat.createVertexReader(inputSplit, getContext());
-    vertexReader.initialize(inputSplit, getContext());
-    transferRegulator.clearCounters();
-    while (vertexReader.nextVertex()) {
-      Vertex<I, V, E, M> readerVertex =
-          vertexReader.getCurrentVertex();
-      if (readerVertex.getId() == null) {
-        throw new IllegalArgumentException(
-            "readVerticesFromInputSplit: Vertex reader returned a vertex " +
-                "without an id!  - " + readerVertex);
-      }
-      if (readerVertex.getValue() == null) {
-        readerVertex.setValue(getConfiguration().createVertexValue());
-      }
-      readerVertex.setConf(getConfiguration());
-      readerVertex.setGraphState(getGraphMapper().getGraphState());
-      PartitionOwner partitionOwner =
-          workerGraphPartitioner.getPartitionOwner(
-              readerVertex.getId());
-      Partition<I, V, E, M> partition =
-          inputSplitCache.get(partitionOwner);
-      if (partition == null) {
-        partition = new Partition<I, V, E, M>(
-            getConfiguration(),
-            partitionOwner.getPartitionId(),
-            getContext());
-        inputSplitCache.put(partitionOwner, partition);
-      }
-      Vertex<I, V, E, M> oldVertex =
-          partition.putVertex(readerVertex);
-      if (oldVertex != null) {
-        LOG.warn("readVertices: Replacing vertex " + oldVertex +
-            " with " + readerVertex);
-      }
-      getContext().progress(); // do this before potential data transfer
-      transferRegulator.incrementCounters(partitionOwner, readerVertex);
-      if (transferRegulator.transferThisPartition(partitionOwner)) {
-        commService.sendPartitionRequest(partitionOwner.getWorkerInfo(),
-            partition);
-        inputSplitCache.remove(partitionOwner);
-      }
-      ++totalVerticesLoaded;
-      totalEdgesLoaded += readerVertex.getNumEdges();
-
-      // Update status every 250k vertices
-      if ((totalVerticesLoaded % 250000) == 0) {
-        String status = "readVerticesFromInputSplit: Loaded " +
-            totalVerticesLoaded + " vertices and " +
-            totalEdgesLoaded + " edges " +
-            MemoryUtils.getRuntimeMemoryStats() + " " +
-            getGraphMapper().getMapFunctions().toString() +
-            " - Attempt=" + getApplicationAttempt() +
-            ", Superstep=" + getSuperstep();
-        if (LOG.isInfoEnabled()) {
-          LOG.info(status);
-        }
-        getContext().setStatus(status);
-      }
-
-      // For sampling, or to limit outlier input splits, the number of
-      // records per input split can be limited
-      if (inputSplitMaxVertices > 0 &&
-        transferRegulator.getTotalVertices() >=
-        inputSplitMaxVertices) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("readVerticesFromInputSplit: Leaving the input " +
-              "split early, reached maximum vertices " +
-              transferRegulator.getTotalVertices());
-        }
-        break;
-      }
+    for (int i = 0; i < numThreads; ++i) {
+      Callable<VertexEdgeCount> inputSplitsCallable =
+          new InputSplitsCallable<I, V, E, M>(
+              getContext(),
+              graphState,
+              getConfiguration(),
+              this,
+              inputSplitsPath,
+              getWorkerInfo(),
+              getZkExt());
+      threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable));
+    }
+
+    // Wait until all the threads are done to wait on all requests
+    for (Future<VertexEdgeCount> threadFuture : threadsFutures) {
+      VertexEdgeCount threadVertexEdgeCount =
+          ProgressableUtils.getFutureResult(threadFuture, getContext());
+      vertexEdgeCount =
+          vertexEdgeCount.incrVertexEdgeCount(threadVertexEdgeCount);
     }
-    vertexReader.close();
 
-    return new VertexEdgeCount(transferRegulator.getTotalVertices(),
-      transferRegulator.getTotalEdges());
+    workerClient.waitAllRequests();
+    inputSplitsExecutor.shutdown();
+    return vertexEdgeCount;
   }
 
   @Override
@@ -516,7 +342,7 @@ public class BspServiceWorker<I extends 
   }
 
   @Override
-  public void setup() {
+  public FinishedSuperstepStats setup() {
     // Unless doing a restart, prepare for computation:
     // 1. Start superstep INPUT_SUPERSTEP (no computation)
     // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
@@ -525,7 +351,7 @@ public class BspServiceWorker<I extends 
     // 5. Wait for superstep INPUT_SUPERSTEP to complete.
     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
       setCachedSuperstep(getRestartedSuperstep());
-      return;
+      return new FinishedSuperstepStats(false, -1, -1);
     }
 
     JSONObject jobState = getJobState();
@@ -542,7 +368,7 @@ public class BspServiceWorker<I extends 
                 getApplicationAttempt());
           }
           setRestartedSuperstep(getSuperstep());
-          return;
+          return new FinishedSuperstepStats(false, -1, -1);
         }
       } catch (JSONException e) {
         throw new RuntimeException(
@@ -552,15 +378,18 @@ public class BspServiceWorker<I extends 
     }
 
     // Add the partitions for that this worker owns
+    GraphState<I, V, E, M> graphState =
+        new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0,
+            getContext(), getGraphMapper(), null);
     Collection<? extends PartitionOwner> masterSetPartitionOwners =
-        startSuperstep();
+        startSuperstep(graphState);
     workerGraphPartitioner.updatePartitionOwners(
         getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
 
 /*if[HADOOP_NON_SECURE]
-    commService.setup();
+    workerClient.setup();
 else[HADOOP_NON_SECURE]*/
-    commService.setup(getConfiguration().authenticate());
+    workerClient.setup(getConfiguration().authenticate());
 /*end[HADOOP_NON_SECURE]*/
 
     // Ensure the InputSplits are ready for processing before processing
@@ -670,13 +499,14 @@ else[HADOOP_NON_SECURE]*/
           new PartitionStats(partition.getId(),
               partition.getVertices().size(),
               0,
-              partition.getEdgeCount());
+              partition.getEdgeCount(),
+              0);
       partitionStatsList.add(partitionStats);
     }
     workerGraphPartitioner.finalizePartitionStats(
         partitionStatsList, getPartitionStore());
 
-    finishSuperstep(partitionStatsList);
+    return finishSuperstep(graphState, partitionStatsList);
   }
 
   /**
@@ -759,7 +589,8 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
-  public Collection<? extends PartitionOwner> startSuperstep() {
+  public Collection<? extends PartitionOwner> startSuperstep(
+      GraphState<I, V, E, M> graphState) {
     // Algorithm:
     // 1. Communication service will combine message from previous
     //    superstep
@@ -767,7 +598,7 @@ else[HADOOP_NON_SECURE]*/
     // 3. Wait until the partition assignment is complete and get it
     // 4. Get the aggregator values from the previous superstep
     if (getSuperstep() != INPUT_SUPERSTEP) {
-      commService.prepareSuperstep();
+      workerServer.prepareSuperstep(graphState);
     }
 
     registerHealth(getSuperstep());
@@ -825,35 +656,35 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
-  public boolean finishSuperstep(List<PartitionStats> partitionStatsList) {
+  public FinishedSuperstepStats finishSuperstep(
+      GraphState<I, V, E, M> graphState,
+      List<PartitionStats> partitionStatsList) {
     // This barrier blocks until success (or the master signals it to
     // restart).
     //
     // Master will coordinate the barriers and aggregate "doneness" of all
     // the vertices.  Each worker will:
-    // 1. Flush the unsent messages
+    // 1. Ensure that the requests are complete
     // 2. Execute user postSuperstep() if necessary.
     // 3. Save aggregator values that are in use.
     // 4. Report the statistics (vertices, edges, messages, etc.)
     //    of this worker
     // 5. Let the master know it is finished.
     // 6. Wait for the master's global stats, and check if done
-
-    getContext().setStatus("Flushing started: " +
-        getGraphMapper().getMapFunctions().toString() +
-        " - Attempt=" + getApplicationAttempt() +
-        ", Superstep=" + getSuperstep());
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Waiting on all requests, superstep " +
+          getSuperstep() + " " +
+          MemoryUtils.getRuntimeMemoryStats());
+    }
+    workerClient.waitAllRequests();
 
     long workerSentMessages = 0;
-    try {
-      commService.flush();
-      workerSentMessages = commService.resetMessageCount();
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "finishSuperstep: flush failed", e);
+    for (PartitionStats partitionStats : partitionStatsList) {
+      workerSentMessages += partitionStats.getMessagesSentCount();
     }
 
     if (getSuperstep() != INPUT_SUPERSTEP) {
+      getWorkerContext().setGraphState(graphState);
       getWorkerContext().postSuperstep();
       getContext().progress();
     }
@@ -903,11 +734,13 @@ else[HADOOP_NON_SECURE]*/
       throw new IllegalStateException("Creating " + finishedWorkerPath +
           " failed with InterruptedException", e);
     }
-    getContext().setStatus("finishSuperstep: (waiting for rest " +
-        "of workers) " +
-        getGraphMapper().getMapFunctions().toString() +
-        " - Attempt=" + getApplicationAttempt() +
-        ", Superstep=" + getSuperstep());
+
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "finishSuperstep: (waiting for rest " +
+            "of workers) " +
+            getGraphMapper().getMapFunctions().toString() +
+            " - Attempt=" + getApplicationAttempt() +
+            ", Superstep=" + getSuperstep());
 
     String superstepFinishedNode =
         getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
@@ -937,10 +770,10 @@ else[HADOOP_NON_SECURE]*/
         getGraphMapper().getMapFunctions().toString() +
         " - Attempt=" + getApplicationAttempt() +
         ", Superstep=" + getSuperstep());
-    getGraphMapper().getGraphState().
-        setTotalNumEdges(globalStats.getEdgeCount()).
-        setTotalNumVertices(globalStats.getVertexCount());
-    return globalStats.getHaltComputation();
+    return new FinishedSuperstepStats(
+        globalStats.getHaltComputation(),
+        globalStats.getVertexCount(),
+        globalStats.getEdgeCount());
   }
 
   /**
@@ -956,6 +789,8 @@ else[HADOOP_NON_SECURE]*/
       return;
     }
 
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "saveVertices: Starting to save vertices");
     VertexOutputFormat<I, V, E> vertexOutputFormat =
         getConfiguration().createVertexOutputFormat();
     VertexWriter<I, V, E> vertexWriter =
@@ -970,11 +805,13 @@ else[HADOOP_NON_SECURE]*/
       getContext().progress();
     }
     vertexWriter.close(getContext());
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "saveVertices: Done saving vertices");
   }
 
   @Override
   public void cleanup() throws IOException, InterruptedException {
-    commService.closeConnections();
+    workerClient.closeConnections();
     setCachedSuperstep(getSuperstep() - 1);
     saveVertices();
     // All worker processes should denote they are done by adding special
@@ -1018,15 +855,16 @@ else[HADOOP_NON_SECURE]*/
     // Preferably would shut down the service only after
     // all clients have disconnected (or the exceptions on the
     // client side ignored).
-    commService.close();
+    workerServer.close();
   }
 
   @Override
   public void storeCheckpoint() throws IOException {
-    getContext().setStatus("storeCheckpoint: Starting checkpoint " +
-        getGraphMapper().getMapFunctions().toString() +
-        " - Attempt=" + getApplicationAttempt() +
-        ", Superstep=" + getSuperstep());
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "storeCheckpoint: Starting checkpoint " +
+            getGraphMapper().getMapFunctions().toString() +
+            " - Attempt=" + getApplicationAttempt() +
+            ", Superstep=" + getSuperstep());
 
     // Algorithm:
     // For each partition, dump vertices and messages
@@ -1115,13 +953,14 @@ else[HADOOP_NON_SECURE]*/
       throw new IllegalStateException("Creating " + workerWroteCheckpoint +
           " failed with KeeperException", e);
     } catch (InterruptedException e) {
-      throw new IllegalStateException("Creating " + workerWroteCheckpoint +
+      throw new IllegalStateException("Creating " +
+          workerWroteCheckpoint +
           " failed with InterruptedException", e);
     }
   }
 
   @Override
-  public void loadCheckpoint(long superstep) {
+  public VertexEdgeCount loadCheckpoint(long superstep) {
     try {
       // clear old message stores
       getServerData().getIncomingMessageStore().clearAll();
@@ -1209,16 +1048,14 @@ else[HADOOP_NON_SECURE]*/
     }
 
     // Load global statistics
+    GlobalStats globalStats = null;
     String finalizedCheckpointPath =
         getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
     try {
       DataInputStream finalizedStream =
           getFs().open(new Path(finalizedCheckpointPath));
-      GlobalStats globalStats = new GlobalStats();
+      globalStats = new GlobalStats();
       globalStats.readFields(finalizedStream);
-      getGraphMapper().getGraphState().
-          setTotalNumEdges(globalStats.getEdgeCount()).
-          setTotalNumVertices(globalStats.getVertexCount());
     } catch (IOException e) {
       throw new IllegalStateException(
           "loadCheckpoint: Failed to load global statistics", e);
@@ -1227,10 +1064,12 @@ else[HADOOP_NON_SECURE]*/
     // Communication service needs to setup the connections prior to
     // processing vertices
 /*if[HADOOP_NON_SECURE]
-    commService.setup();
+    workerClient.setup();
 else[HADOOP_NON_SECURE]*/
-    commService.setup(getConfiguration().authenticate());
+    workerClient.setup(getConfiguration().authenticate());
 /*end[HADOOP_NON_SECURE]*/
+    return new VertexEdgeCount(globalStats.getVertexCount(),
+        globalStats.getEdgeCount());
   }
 
   /**
@@ -1245,6 +1084,9 @@ else[HADOOP_NON_SECURE]*/
         new ArrayList<Entry<WorkerInfo, List<Integer>>>(
             workerPartitionMap.entrySet());
     Collections.shuffle(randomEntryList);
+    WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor =
+        new NettyWorkerClientRequestProcessor<I, V, E, M>(getContext(),
+            getConfiguration(), this);
     for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
       randomEntryList) {
       for (Integer partitionId : workerPartitionList.getValue()) {
@@ -1261,14 +1103,16 @@ else[HADOOP_NON_SECURE]*/
               workerPartitionList.getKey() + " partition " +
               partitionId);
         }
-        getGraphMapper().getGraphState().getWorkerCommunications().
-            sendPartitionRequest(workerPartitionList.getKey(),
-                partition);
+        workerClientRequestProcessor.sendPartitionRequest(
+            workerPartitionList.getKey(),
+            partition);
       }
     }
 
+
     try {
-      getGraphMapper().getGraphState().getWorkerCommunications().flush();
+      workerClientRequestProcessor.flush();
+      workerClient.waitAllRequests();
     } catch (IOException e) {
       throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
     }
@@ -1307,7 +1151,7 @@ else[HADOOP_NON_SECURE]*/
     PartitionExchange partitionExchange =
         workerGraphPartitioner.updatePartitionOwners(
             getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
-    commService.fixPartitionIdToSocketAddrMap();
+    workerClient.openConnections(getPartitionOwners());
 
     Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
         partitionExchange.getSendWorkerPartitionMap();
@@ -1457,7 +1301,7 @@ else[HADOOP_NON_SECURE]*/
 
   @Override
   public ServerData<I, V, E, M> getServerData() {
-    return commService.getServerData();
+    return workerServer.getServerData();
   }
 
   @Override

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java?rev=1399090&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/ComputeCallable.java Wed Oct 17 04:33:16 2012
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.TimedLogger;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+/**
+ * Compute as many vertex partitions as possible.  Every thread will have its
+ * own instance of WorkerClientRequestProcessor to send requests.  Note that
+ * the partition ids are used in the partitionIdQueue rather than the actual
+ * partitions since that would cause the partitions to be loaded into memory
+ * when using the out-of-core graph partition store.  We should only load on
+ * demand.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class ComputeCallable<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable> implements Callable {
+  /** Class logger */
+  private static final Logger LOG  = Logger.getLogger(ComputeCallable.class);
+  /** Class time object */
+  private static final Time TIME = SystemTime.getInstance();
+  /** Context */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Graph state (note that it is recreated in call() for locality) */
+  private GraphState<I, V, E, M> graphState;
+  /** Thread-safe queue of all partition ids */
+  private final BlockingQueue<Integer> partitionIdQueue;
+  /** Message store */
+  private final MessageStoreByPartition<I, M> messageStore;
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  /** Worker (for NettyWorkerClientRequestProcessor) */
+  private final CentralizedServiceWorker<I, V, E, M> serviceWorker;
+  /** Dump some progress every 30 seconds */
+  private final TimedLogger timedLogger = new TimedLogger(30 * 1000, LOG);
+  /** Sends the messages (unique per Callable) */
+  private WorkerClientRequestProcessor<I, V, E, M>
+  workerClientRequestProcessor;
+  /** Get the start time in nanos */
+  private final long startNanos = TIME.getNanoseconds();
+
+  /**
+   * Constructor
+   *
+   * @param context Context
+   * @param graphState Current graph state (use to create own graph state)
+   * @param messageStore Message store
+   * @param partitionIdQueue Queue of partition ids (thread-safe)
+   * @param configuration Configuration
+   * @param serviceWorker Service worker
+   */
+  public ComputeCallable(
+      Mapper<?, ?, ?, ?>.Context context, GraphState<I, V, E, M> graphState,
+      MessageStoreByPartition<I, M> messageStore,
+      BlockingQueue<Integer> partitionIdQueue,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      CentralizedServiceWorker<I, V, E, M> serviceWorker) {
+    this.context = context;
+    this.configuration = configuration;
+    this.partitionIdQueue = partitionIdQueue;
+    this.messageStore = messageStore;
+    this.serviceWorker = serviceWorker;
+    // Will be replaced later in call() for locality
+    this.graphState = graphState;
+  }
+
+  @Override
+  public Collection<PartitionStats> call() {
+    // Thread initialization (for locality)
+    this.workerClientRequestProcessor =
+        new NettyWorkerClientRequestProcessor<I, V, E, M>(
+            context, configuration, serviceWorker);
+    this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
+        graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
+        context, graphState.getGraphMapper(), workerClientRequestProcessor);
+
+    List<PartitionStats> partitionStatsList = Lists.newArrayList();
+    while (!partitionIdQueue.isEmpty()) {
+      Integer partitionId = partitionIdQueue.poll();
+      if (partitionId == null) {
+        break;
+      }
+
+      Partition<I, V, E, M> partition =
+          serviceWorker.getPartitionStore().getPartition(partitionId);
+      try {
+        PartitionStats partitionStats = computePartition(partition);
+        partitionStatsList.add(partitionStats);
+        partitionStats.addMessagesSentCount(
+            workerClientRequestProcessor.resetMessageCount());
+        timedLogger.info("call: Completed " +
+            partitionStatsList.size() + " partitions, " +
+            partitionIdQueue.size() + " remaining " +
+            MemoryUtils.getRuntimeMemoryStats());
+      } catch (IOException e) {
+        throw new IllegalStateException("call: Caught unexpected IOException," +
+            " failing.", e);
+      }
+    }
+
+    if (LOG.isInfoEnabled()) {
+      float seconds = TIME.getNanosecondsSince(startNanos) /
+          Time.NS_PER_SECOND_AS_FLOAT;
+      LOG.info("call: Computation took " + seconds + " secs for "  +
+          partitionStatsList.size() + " partitions on superstep " +
+          graphState.getSuperstep() + ".  Flushing started");
+    }
+    try {
+      workerClientRequestProcessor.flush();
+    } catch (IOException e) {
+      throw new IllegalStateException("call: Flushing failed.", e);
+    }
+    return partitionStatsList;
+  }
+
+  /**
+   * Compute a single partition
+   *
+   * @param partition Partition to compute
+   * @return Partition stats for this computed partition
+   */
+  private PartitionStats computePartition(Partition<I, V, E, M> partition)
+    throws IOException {
+    PartitionStats partitionStats =
+        new PartitionStats(partition.getId(), 0, 0, 0, 0);
+    // Make sure this is thread-safe across runs
+    synchronized (partition) {
+      for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
+        // Make sure every vertex has this thread's
+        // graphState before computing
+        vertex.setGraphState(graphState);
+        Collection<M> messages =
+            messageStore.getVertexMessages(vertex.getId());
+        messageStore.clearVertexMessages(vertex.getId());
+        if (vertex.isHalted() && !messages.isEmpty()) {
+          vertex.wakeUp();
+        }
+        if (!vertex.isHalted()) {
+          context.progress();
+          vertex.compute(messages);
+        }
+        if (vertex.isHalted()) {
+          partitionStats.incrFinishedVertexCount();
+        }
+        partitionStats.incrVertexCount();
+        partitionStats.addEdgeCount(vertex.getNumEdges());
+      }
+
+      messageStore.clearPartition(partition.getId());
+    }
+    return partitionStats;
+  }
+}
+

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java?rev=1399090&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java Wed Oct 17 04:33:16 2012
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+/**
+ * Immutable results of finishSuperstep()
+ */
+public class FinishedSuperstepStats extends VertexEdgeCount {
+  /** Are all the graph vertices halted? */
+  private final boolean allVerticesHalted;
+
+  /**
+   * Constructor.
+   *
+   * @param allVerticesHalted Are all the vertices halted
+   * @param numVertices Number of vertices
+   * @param numEdges Number of edges
+   */
+  public FinishedSuperstepStats(boolean allVerticesHalted,
+                                long numVertices,
+                                long numEdges) {
+    super(numVertices, numEdges);
+    this.allVerticesHalted = allVerticesHalted;
+  }
+
+  public boolean getAllVerticesHalted() {
+    return allVerticesHalted;
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java Wed Oct 17 04:33:16 2012
@@ -18,17 +18,19 @@
 
 package org.apache.giraph.graph;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
-import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.ReflectionUtils;
-import org.apache.giraph.utils.TimedLogger;
 import org.apache.giraph.zk.ZooKeeperManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -36,9 +38,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Appender;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.URL;
@@ -47,6 +52,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.log4j.PatternLayout;
 
 /**
  * This mapper that will execute the BSP graph tasks.  Since this mapper will
@@ -84,11 +93,10 @@ public class GraphMapper<I extends Writa
   private boolean done = false;
   /** What kind of functions is this mapper doing? */
   private MapFunctions mapFunctions = MapFunctions.UNKNOWN;
-  /**
-   * Graph state for all vertices that is used for the duration of
-   * this mapper.
-   */
-  private GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>();
+  /** Total number of vertices in the graph (at this time) */
+  private long numVertices = -1;
+  /** Total number of edges in the graph (at this time) */
+  private long numEdges = -1;
 
   /** What kinds of functions to run on this mapper */
   public enum MapFunctions {
@@ -137,10 +145,6 @@ public class GraphMapper<I extends Writa
     return serviceWorker.getWorkerContext();
   }
 
-  public final GraphState<I, V, E, M> getGraphState() {
-    return graphState;
-  }
-
   /**
    * Default handler for uncaught exceptions.
    */
@@ -263,7 +267,6 @@ public class GraphMapper<I extends Writa
   public void setup(Context context)
     throws IOException, InterruptedException {
     context.setStatus("setup: Beginning mapper setup.");
-    graphState.setContext(context);
     // Setting the default handler for uncaught exceptions.
     Thread.setDefaultUncaughtExceptionHandler(
         new OverrideExceptionHandler());
@@ -282,6 +285,16 @@ public class GraphMapper<I extends Writa
     if (LOG.isInfoEnabled()) {
       LOG.info("setup: Set log level to " + logLevel);
     }
+    // Sets pattern layout for all appenders
+    if (conf.useLogThreadLayout()) {
+      PatternLayout layout =
+          new PatternLayout("%-7p %d [%t] %c %x - %m%n");
+      Enumeration<Appender> appenderEnum =
+          Logger.getRootLogger().getAllAppenders();
+      while (appenderEnum.hasMoreElements()) {
+        appenderEnum.nextElement().setLayout(layout);
+      }
+    }
 
     // Do some initial setup (possibly starting up a Zookeeper service)
     context.setStatus("setup: Initializing Zookeeper services.");
@@ -326,6 +339,11 @@ public class GraphMapper<I extends Writa
       zkManager.onlineZooKeeperServers();
       serverPortList = zkManager.getZooKeeperServerPortString();
     }
+    if (zkManager != null && zkManager.runsZooKeeper()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("setup: Chosen to run ZooKeeper...");
+      }
+    }
     context.setStatus("setup: Connected to Zookeeper service " +
         serverPortList);
     this.mapFunctions = determineMapFunctions(conf, zkManager);
@@ -345,12 +363,10 @@ public class GraphMapper<I extends Writa
           LOG.info("setup: Starting up BspServiceMaster " +
               "(master thread)...");
         }
-        serviceMaster = new BspServiceMaster<I, V, E, M>(serverPortList,
-                sessionMsecTimeout,
-                context,
-                this);
+        serviceMaster = new BspServiceMaster<I, V, E, M>(
+            serverPortList, sessionMsecTimeout, context, this);
         masterThread = new MasterThread<I, V, E, M>(
-                (BspServiceMaster<I, V, E, M>) serviceMaster, context);
+            (BspServiceMaster<I, V, E, M>) serviceMaster, context);
         masterThread.start();
       }
       if ((mapFunctions == MapFunctions.WORKER_ONLY) ||
@@ -363,12 +379,10 @@ public class GraphMapper<I extends Writa
             serverPortList,
             sessionMsecTimeout,
             context,
-            this,
-            graphState);
+            this);
         if (LOG.isInfoEnabled()) {
           LOG.info("setup: Registering health of this worker...");
         }
-        serviceWorker.setup();
       }
     } catch (IOException e) {
       LOG.error("setup: Caught exception just before end of setup", e);
@@ -394,9 +408,6 @@ public class GraphMapper<I extends Writa
     if (done) {
       return;
     }
-    if ((serviceWorker != null) && (graphState.getTotalNumVertices() == 0)) {
-      return;
-    }
 
     if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
         (mapFunctions == MapFunctions.MASTER_ONLY)) {
@@ -407,14 +418,23 @@ public class GraphMapper<I extends Writa
     }
 
     if (mapAlreadyRun) {
-      throw new RuntimeException("In BSP, map should have only been" +
+      throw new RuntimeException("map: In BSP, map should have only been" +
           " run exactly once, (already run)");
     }
     mapAlreadyRun = true;
 
-    graphState.setSuperstep(serviceWorker.getSuperstep()).
-      setContext(context).setGraphMapper(this);
+    FinishedSuperstepStats inputSuperstepStats =
+        serviceWorker.setup();
+    numVertices = inputSuperstepStats.getVertexCount();
+    numEdges = inputSuperstepStats.getEdgeCount();
+    if (inputSuperstepStats.getVertexCount() == 0) {
+      LOG.warn("map: No vertices in the graph, exiting.");
+      return;
+    }
 
+    serviceWorker.getWorkerContext().setGraphState(
+        new GraphState<I, V, E, M>(serviceWorker.getSuperstep(),
+            numVertices, numEdges, context, this, null));
     try {
       serviceWorker.getWorkerContext().preApplication();
     } catch (InstantiationException e) {
@@ -430,19 +450,18 @@ public class GraphMapper<I extends Writa
 
     List<PartitionStats> partitionStatsList =
         new ArrayList<PartitionStats>();
+
+    int numThreads = conf.getNumComputeThreads();
+    FinishedSuperstepStats finishedSuperstepStats = null;
     do {
-      long superstep = serviceWorker.getSuperstep();
+      final long superstep = serviceWorker.getSuperstep();
 
-      graphState.setSuperstep(superstep);
+      GraphState<I, V, E, M> graphState =
+          new GraphState<I, V, E, M>(superstep, numVertices, numEdges,
+              context, this, null);
 
       Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
-          serviceWorker.startSuperstep();
-      if (zkManager != null && zkManager.runsZooKeeper()) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("map: Chosen to run ZooKeeper...");
-        }
-        context.setStatus("map: Running Zookeeper Server");
-      }
+          serviceWorker.startSuperstep(graphState);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("map: " + MemoryUtils.getRuntimeMemoryStats());
@@ -459,8 +478,12 @@ public class GraphMapper<I extends Writa
         if (LOG.isInfoEnabled()) {
           LOG.info("map: Loading from checkpoint " + superstep);
         }
-        serviceWorker.loadCheckpoint(
+        VertexEdgeCount vertexEdgeCount = serviceWorker.loadCheckpoint(
             serviceWorker.getRestartedSuperstep());
+        numVertices = vertexEdgeCount.getVertexCount();
+        numEdges = vertexEdgeCount.getEdgeCount();
+        graphState = new GraphState<I, V, E, M>(superstep, numVertices,
+            numEdges, context, this, null);
       } else if (serviceWorker.checkpointFrequencyMet(superstep)) {
         serviceWorker.storeCheckpoint();
       }
@@ -471,51 +494,61 @@ public class GraphMapper<I extends Writa
 
       MessageStoreByPartition<I, M> messageStore =
           serviceWorker.getServerData().getCurrentMessageStore();
-
       partitionStatsList.clear();
-      TimedLogger partitionLogger = new TimedLogger(15000, LOG);
-      int completedPartitions = 0;
-      for (Partition<I, V, E, M> partition :
-        serviceWorker.getPartitionStore().getPartitions()) {
-        PartitionStats partitionStats =
-            new PartitionStats(partition.getId(), 0, 0, 0);
-        for (Vertex<I, V, E, M> vertex : partition.getVertices()) {
-          // Make sure every vertex has the current
-          // graphState before computing
-          vertex.setGraphState(graphState);
-
-          Collection<M> messages =
-              messageStore.getVertexMessages(vertex.getId());
-          messageStore.clearVertexMessages(vertex.getId());
-
-          if (vertex.isHalted() && !messages.isEmpty()) {
-            vertex.wakeUp();
-          }
-          if (!vertex.isHalted()) {
-            context.progress();
-            vertex.compute(messages);
-          }
-          if (vertex.isHalted()) {
-            partitionStats.incrFinishedVertexCount();
-          }
-          partitionStats.incrVertexCount();
-          partitionStats.addEdgeCount(vertex.getNumEdges());
-        }
-
-        messageStore.clearPartition(partition.getId());
-
-        partitionStatsList.add(partitionStats);
-        ++completedPartitions;
-        partitionLogger.info("map: Completed " + completedPartitions + " of " +
-            serviceWorker.getPartitionStore().getNumPartitions() +
-            " partitions " + MemoryUtils.getRuntimeMemoryStats());
-      }
-    } while (!serviceWorker.finishSuperstep(partitionStatsList));
+      int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("map: " + numPartitions +
+            " partitions to process in superstep " + superstep + " with " +
+            numThreads + " thread(s)");
+      }
+
+      if (numPartitions > 0) {
+        List<Future<Collection<PartitionStats>>> partitionFutures =
+            Lists.newArrayListWithCapacity(numPartitions);
+        BlockingQueue<Integer> computePartitionIdQueue =
+            new ArrayBlockingQueue<Integer>(numPartitions);
+        for (Integer partitionId :
+            serviceWorker.getPartitionStore().getPartitionIds()) {
+          computePartitionIdQueue.add(partitionId);
+        }
+
+        ExecutorService partitionExecutor =
+            Executors.newFixedThreadPool(numThreads,
+                new ThreadFactoryBuilder().setNameFormat("compute-%d").build());
+        for (int i = 0; i < numThreads; ++i) {
+          ComputeCallable<I, V, E, M> computeCallable =
+              new ComputeCallable<I, V, E, M>(
+                  context,
+                  graphState,
+                  messageStore,
+                  computePartitionIdQueue,
+                  conf,
+                  serviceWorker);
+          partitionFutures.add(partitionExecutor.submit(computeCallable));
+        }
+
+        // Wait until all the threads are done to wait on all requests
+        for (Future<Collection<PartitionStats>> partitionFuture :
+            partitionFutures) {
+          Collection<PartitionStats> stats =
+              ProgressableUtils.getFutureResult(partitionFuture, context);
+          partitionStatsList.addAll(stats);
+        }
+        partitionExecutor.shutdown();
+      }
+
+      finishedSuperstepStats =
+          serviceWorker.finishSuperstep(graphState, partitionStatsList);
+      numVertices = finishedSuperstepStats.getVertexCount();
+      numEdges = finishedSuperstepStats.getEdgeCount();
+    } while (!finishedSuperstepStats.getAllVerticesHalted());
     if (LOG.isInfoEnabled()) {
-      LOG.info("map: BSP application done " +
-          "(global vertices marked done)");
+      LOG.info("map: BSP application done (global vertices marked done)");
     }
 
+    serviceWorker.getWorkerContext().setGraphState(
+        new GraphState<I, V, E, M>(serviceWorker.getSuperstep(),
+            numVertices, numEdges, context, this, null));
     serviceWorker.getWorkerContext().postApplication();
     context.progress();
   }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphState.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphState.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphState.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphState.java Wed Oct 17 04:33:16 2012
@@ -17,14 +17,13 @@
  */
 package org.apache.giraph.graph;
 
-import org.apache.giraph.comm.WorkerClientServer;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
 /**
- * Global state of the graph.  Should be treated as a singleton (but is kept
- * as a regular bean to facilitate ease of unit testing)
+ * Immutable global state of the graph.
  *
  * @param <I> Vertex id
  * @param <V> Vertex data
@@ -35,107 +34,73 @@ import org.apache.hadoop.mapreduce.Mappe
 public class GraphState<I extends WritableComparable, V extends Writable,
 E extends Writable, M extends Writable> {
   /** Graph-wide superstep */
-  private long superstep = 0;
+  private final long superstep;
   /** Graph-wide number of vertices */
-  private long numVertices = -1;
+  private final long numVertices;
   /** Graph-wide number of edges */
-  private long numEdges = -1;
+  private final long numEdges;
   /** Graph-wide map context */
-  private Mapper.Context context;
+  private final Mapper.Context context;
   /** Graph-wide BSP Mapper for this Vertex */
-  private GraphMapper<I, V, E, M> graphMapper;
-  /** Graph-wide worker communications */
-  private WorkerClientServer<I, V, E, M> workerCommunications;
+  private final GraphMapper<I, V, E, M> graphMapper;
+  /** Handles requests */
+  private final WorkerClientRequestProcessor<I, V, E, M>
+  workerClientRequestProcessor;
+
+  /**
+   * Constructor
+   *
+   * @param superstep Current superstep
+   * @param numVertices Current graph-wide vertices
+   * @param numEdges Current graph-wide edges
+   * @param context Context
+   * @param graphMapper Graph mapper
+   * @param workerClientRequestProcessor Handles all communication
+   */
+  public GraphState(
+      long superstep, long numVertices,
+      long numEdges, Mapper.Context context,
+      GraphMapper<I, V, E, M> graphMapper,
+      WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor) {
+    this.superstep = superstep;
+    this.numVertices = numVertices;
+    this.numEdges = numEdges;
+    this.context = context;
+    this.graphMapper = graphMapper;
+    this.workerClientRequestProcessor = workerClientRequestProcessor;
+  }
 
   public long getSuperstep() {
     return superstep;
   }
 
-  /**
-   * Set the current superstep.
-   *
-   * @param superstep Current superstep to use.
-   * @return Returns this object.
-   */
-  public GraphState<I, V, E, M> setSuperstep(long superstep) {
-    this.superstep = superstep;
-    return this;
-  }
-
   public long getTotalNumVertices() {
     return numVertices;
   }
 
-  /**
-   * Set the current number of vertices.
-   *
-   * @param numVertices Current number of vertices.
-   * @return Returns this object.
-   */
-  public GraphState<I, V, E, M> setTotalNumVertices(long numVertices) {
-    this.numVertices = numVertices;
-    return this;
-  }
-
   public long getTotalNumEdges() {
     return numEdges;
   }
 
-  /**
-   * Set the current number of edges.
-   *
-   * @param numEdges Current number of edges.
-   * @return Returns this object.
-   */
-  public GraphState<I, V, E, M> setTotalNumEdges(long numEdges) {
-    this.numEdges = numEdges;
-    return this;
-  }
-
   public Mapper.Context getContext() {
     return context;
   }
 
-  /**
-   * Set the current context.
-   *
-   * @param context Current context.
-   * @return Returns this object.
-   */
-  public GraphState<I, V, E, M> setContext(Mapper.Context context) {
-    this.context = context;
-    return this;
-  }
-
   public GraphMapper<I, V, E, M> getGraphMapper() {
     return graphMapper;
   }
 
-  /**
-   * Set the current graph mapper.
-   *
-   * @param graphMapper Current graph mapper.
-   * @return Returns this object.
-   */
-  public GraphState<I, V, E, M> setGraphMapper(
-      GraphMapper<I, V, E, M> graphMapper) {
-    this.graphMapper = graphMapper;
-    return this;
+  public WorkerClientRequestProcessor<I, V, E, M>
+  getWorkerClientRequestProcessor() {
+    return workerClientRequestProcessor;
   }
 
-  /**
-   * Set the current worker communications.
-   *
-   * @param workerCommunications Current worker communications.
-   * @return Returns this object.
-   */
-  public GraphState<I, V, E, M> setWorkerCommunications(
-      WorkerClientServer<I, V, E, M> workerCommunications) {
-    this.workerCommunications = workerCommunications;
-    return this;
-  }
+  @Override
+  public String toString() {
+    return "(superstep=" + superstep + ",numVertices=" + numVertices + "," +
+        "numEdges=" + numEdges + ",context=" + context +
+        ",graphMapper=" + graphMapper +
+        ",workerClientRequestProcessor=" + workerClientRequestProcessor + ")";
 
-  public WorkerClientServer<I, V, E, M> getWorkerCommunications() {
-    return workerCommunications;
   }
 }

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java?rev=1399090&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java Wed Oct 17 04:33:16 2012
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Load as many input splits as possible.
+ * Every thread will have its own instance of WorkerClientRequestProcessor
+ * to send requests.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class InputSplitsCallable<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Callable {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(InputSplitsCallable.class);
+  /** Class time object */
+  private static final Time TIME = SystemTime.getInstance();
+  /** Context */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Graph state */
+  private final GraphState<I, V, E, M> graphState;
+  /** Handles IPC communication */
+  private final WorkerClientRequestProcessor<I, V, E, M>
+  workerClientRequestProcessor;
+  /**
+   * Stores and processes the list of InputSplits advertised
+   * in a tree of child znodes by the master.
+   */
+  private final InputSplitPathOrganizer splitOrganizer;
+  /** Location of input splits */
+  private final String inputSplitsPath;
+  /** ZooKeeperExt handle */
+  private final ZooKeeperExt zooKeeperExt;
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M>
+  configuration;
+  /** Total vertices loaded */
+  private long totalVerticesLoaded = 0;
+  /** Total edges loaded */
+  private long totalEdgesLoaded = 0;
+  /** Input split max vertices (-1 denotes all) */
+  private final long inputSplitMaxVertices;
+  /** Bsp service worker (only use thread-safe methods) */
+  private final BspServiceWorker<I, V, E, M> bspServiceWorker;
+  /** Get the start time in nanos */
+  private final long startNanos = TIME.getNanoseconds();
+
+  /**
+   * Constructor.
+   *
+   * @param context Context
+   * @param graphState Graph state
+   * @param configuration Configuration
+   * @param bspServiceWorker service worker
+   * @param inputSplitsPath Path of the input splits
+   * @param workerInfo This worker's info
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   */
+  public InputSplitsCallable(
+      Mapper<?, ?, ?, ?>.Context context, GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      String inputSplitsPath,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt)  {
+    this.zooKeeperExt = zooKeeperExt;
+    this.context = context;
+    this.workerClientRequestProcessor =
+        new NettyWorkerClientRequestProcessor<I, V, E, M>(
+            context, configuration, bspServiceWorker);
+    this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
+        graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
+        context, graphState.getGraphMapper(), workerClientRequestProcessor);
+    this.inputSplitsPath = inputSplitsPath;
+    try {
+      splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt,
+          inputSplitsPath, workerInfo.getHostname(), workerInfo.getPort());
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "InputSplitsCallable: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "InputSplitsCallable: InterruptedException", e);
+    }
+    this.configuration = configuration;
+    inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
+    this.bspServiceWorker = bspServiceWorker;
+  }
+
+  @Override
+  public VertexEdgeCount call() {
+    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
+    String inputSplitPath = null;
+    int inputSplitsProcessed = 0;
+    try {
+      while ((inputSplitPath = reserveInputSplit()) != null) {
+        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
+            loadVerticesFromInputSplit(inputSplitPath,
+                graphState));
+        context.progress();
+        ++inputSplitsProcessed;
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException("call: KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("call: InterruptedException", e);
+    } catch (IOException e) {
+      throw new IllegalStateException("call: IOException", e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("call: ClassNotFoundException", e);
+    } catch (InstantiationException e) {
+      throw new IllegalStateException("call: InstantiationException", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException("call: IllegalAccessException", e);
+    }
+
+    if (LOG.isInfoEnabled()) {
+      float seconds = TIME.getNanosecondsSince(startNanos) /
+          Time.NS_PER_SECOND_AS_FLOAT;
+      float verticesPerSecond = vertexEdgeCount.getVertexCount() / seconds;
+      float edgesPerSecond = vertexEdgeCount.getEdgeCount() / seconds;
+      LOG.info("call: Loaded " + inputSplitsProcessed + " " +
+          "input splits in " + seconds + " secs, " + vertexEdgeCount +
+          " " + verticesPerSecond + " vertices/sec, " +
+          edgesPerSecond + " edges/sec");
+    }
+    try {
+      workerClientRequestProcessor.flush();
+    } catch (IOException e) {
+      throw new IllegalStateException("call: Flushing failed.", e);
+    }
+    return vertexEdgeCount;
+  }
+
+  /**
+   * Try to reserve an InputSplit for loading.  While InputSplits exist that
+   * are not finished, wait until they are.
+   *
+   * NOTE: iterations on the InputSplit list only halt for each worker when it
+   * has scanned the entire list once and found every split marked RESERVED.
+   * When a worker fails, its Ephemeral RESERVED znodes will disappear,
+   * allowing other iterating workers to claim its previously read splits.
+   * Only when the last worker left iterating on the list fails can a danger
+   * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
+   * causes job failure, this is OK. As the failure model evolves, this
+   * behavior might need to change.
+   *
+   * @return reserved InputSplit or null if no unfinished InputSplits exist
+   * @throws org.apache.zookeeper.KeeperException
+   * @throws InterruptedException
+   */
+  private String reserveInputSplit()
+    throws KeeperException, InterruptedException {
+    String reservedInputSplitPath = null;
+    Stat reservedStat = null;
+    while (true) {
+      int reservedInputSplits = 0;
+      for (String nextSplitToClaim : splitOrganizer) {
+        context.progress();
+        String tmpInputSplitReservedPath =
+            nextSplitToClaim + BspServiceWorker.INPUT_SPLIT_RESERVED_NODE;
+        reservedStat =
+            zooKeeperExt.exists(tmpInputSplitReservedPath, true);
+        if (reservedStat == null) {
+          try {
+            // Attempt to reserve this InputSplit
+            zooKeeperExt.createExt(tmpInputSplitReservedPath,
+                null,
+                ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL,
+                false);
+            reservedInputSplitPath = nextSplitToClaim;
+            if (LOG.isInfoEnabled()) {
+              float percentFinished =
+                  reservedInputSplits * 100.0f /
+                      splitOrganizer.getPathListSize();
+              LOG.info("reserveInputSplit: Reserved input " +
+                  "split path " + reservedInputSplitPath +
+                  ", overall roughly " +
+                  + percentFinished +
+                  "% input splits reserved");
+            }
+            return reservedInputSplitPath;
+          } catch (KeeperException.NodeExistsException e) {
+            LOG.info("reserveInputSplit: Couldn't reserve " +
+                "(already reserved) inputSplit" +
+                " at " + tmpInputSplitReservedPath);
+          } catch (KeeperException e) {
+            throw new IllegalStateException(
+                "reserveInputSplit: KeeperException on reserve", e);
+          } catch (InterruptedException e) {
+            throw new IllegalStateException(
+                "reserveInputSplit: InterruptedException " +
+                    "on reserve", e);
+          }
+        } else {
+          ++reservedInputSplits;
+        }
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("reserveInputSplit: reservedPath = " +
+            reservedInputSplitPath + ", " + reservedInputSplits +
+            " of " + splitOrganizer.getPathListSize() +
+            " InputSplits are finished.");
+      }
+      if (reservedInputSplits == splitOrganizer.getPathListSize()) {
+        return null;
+      }
+      context.progress();
+      // Wait for either a reservation to go away or a notification that
+      // an InputSplit has finished.
+      context.progress();
+      bspServiceWorker.getInputSplitsStateChangedEvent().waitMsecs(
+          60 * 1000);
+      bspServiceWorker.getInputSplitsStateChangedEvent().reset();
+    }
+  }
+
+  /**
+   * Mark an input split path as completed by this worker.  This notifies
+   * the master and the other workers that this input split has not only
+   * been reserved, but also marked processed.
+   *
+   * @param inputSplitPath Path to the input split.
+   */
+  private void markInputSplitPathFinished(String inputSplitPath) {
+    String inputSplitFinishedPath =
+        inputSplitPath + BspServiceWorker.INPUT_SPLIT_FINISHED_NODE;
+    try {
+      zooKeeperExt.createExt(inputSplitFinishedPath,
+          null,
+          ZooDefs.Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("loadVertices: " + inputSplitFinishedPath +
+          " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "loadVertices: KeeperException on " +
+              inputSplitFinishedPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "loadVertices: InterruptedException on " +
+              inputSplitFinishedPath, e);
+    }
+  }
+
+  /**
+   * Extract vertices from input split, saving them into a mini cache of
+   * partitions.  Periodically flush the cache of vertices when a limit is
+   * reached in readVerticesFromInputSplit.
+   * Mark the input split finished when done.
+   *
+   * @param inputSplitPath ZK location of input split
+   * @param graphState Current graph state
+   * @return Mapping of vertex indices and statistics, or null if no data read
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws InstantiationException
+   * @throws IllegalAccessException
+   */
+  private VertexEdgeCount loadVerticesFromInputSplit(
+      String inputSplitPath,
+      GraphState<I, V, E, M> graphState)
+    throws IOException, ClassNotFoundException, InterruptedException,
+      InstantiationException, IllegalAccessException {
+    InputSplit inputSplit = getInputSplitForVertices(inputSplitPath);
+    VertexEdgeCount vertexEdgeCount =
+        readVerticesFromInputSplit(inputSplit, graphState);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadVerticesFromInputSplit: Finished loading " +
+          inputSplitPath + " " + vertexEdgeCount);
+    }
+    markInputSplitPathFinished(inputSplitPath);
+    return vertexEdgeCount;
+  }
+
+  /**
+   * Talk to ZooKeeper to convert the input split path to the actual
+   * InputSplit containing the vertices to read.
+   *
+   * @param inputSplitPath Location in ZK of input split
+   * @return instance of InputSplit containing vertices to read
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  private InputSplit getInputSplitForVertices(String inputSplitPath)
+    throws IOException, ClassNotFoundException {
+    byte[] splitList;
+    try {
+      splitList = zooKeeperExt.getData(inputSplitPath, false, null);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "loadVertices: KeeperException on " + inputSplitPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "loadVertices: IllegalStateException on " + inputSplitPath, e);
+    }
+    context.progress();
+
+    DataInputStream inputStream =
+        new DataInputStream(new ByteArrayInputStream(splitList));
+    Text.readString(inputStream); // location data unused here, skip
+    String inputSplitClass = Text.readString(inputStream);
+    InputSplit inputSplit = (InputSplit)
+        ReflectionUtils.newInstance(
+            configuration.getClassByName(inputSplitClass),
+            configuration);
+    ((Writable) inputSplit).readFields(inputStream);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getInputSplitForVertices: Reserved " + inputSplitPath +
+          " from ZooKeeper and got input split '" +
+          inputSplit.toString() + "'");
+    }
+    return inputSplit;
+  }
+
+  /**
+   * Read vertices from input split.  If testing, the user may request a
+   * maximum number of vertices to be read from an input split.
+   *
+   * @param inputSplit Input split to process with vertex reader
+   * @param graphState Current graph state
+   * @return Vertices and edges loaded from this input split
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private VertexEdgeCount readVerticesFromInputSplit(
+      InputSplit inputSplit,
+      GraphState<I, V, E, M> graphState)
+    throws IOException, InterruptedException {
+    VertexInputFormat<I, V, E, M> vertexInputFormat =
+        configuration.createVertexInputFormat();
+    VertexReader<I, V, E, M> vertexReader =
+        vertexInputFormat.createVertexReader(inputSplit, context);
+    vertexReader.initialize(inputSplit, context);
+    long inputSplitVerticesLoaded = 0;
+    long inputSplitEdgesLoaded = 0;
+    while (vertexReader.nextVertex()) {
+      Vertex<I, V, E, M> readerVertex =
+          vertexReader.getCurrentVertex();
+      if (readerVertex.getId() == null) {
+        throw new IllegalArgumentException(
+            "readVerticesFromInputSplit: Vertex reader returned a vertex " +
+                "without an id!  - " + readerVertex);
+      }
+      if (readerVertex.getValue() == null) {
+        readerVertex.setValue(configuration.createVertexValue());
+      }
+      readerVertex.setConf(configuration);
+      readerVertex.setGraphState(graphState);
+
+      PartitionOwner partitionOwner =
+          bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
+      graphState.getWorkerClientRequestProcessor().sendVertexRequest(
+          partitionOwner, readerVertex);
+      context.progress(); // do this before potential data transfer
+      ++inputSplitVerticesLoaded;
+      inputSplitEdgesLoaded += readerVertex.getNumEdges();
+
+      // Update status every 250k vertices
+      if (((inputSplitVerticesLoaded + totalVerticesLoaded) % 250000) == 0) {
+        LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
+            "readVerticesFromInputSplit: Loaded " +
+                (inputSplitVerticesLoaded + totalVerticesLoaded) +
+                " vertices " +
+                (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
+                MemoryUtils.getRuntimeMemoryStats());
+      }
+
+      // For sampling, or to limit outlier input splits, the number of
+      // records per input split can be limited
+      if (inputSplitMaxVertices > 0 &&
+          inputSplitVerticesLoaded >= inputSplitMaxVertices) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("readVerticesFromInputSplit: Leaving the input " +
+              "split early, reached maximum vertices " +
+              inputSplitVerticesLoaded);
+        }
+        break;
+      }
+    }
+    vertexReader.close();
+    totalVerticesLoaded += inputSplitVerticesLoaded;
+    totalEdgesLoaded += inputSplitEdgesLoaded;
+    return new VertexEdgeCount(
+        inputSplitVerticesLoaded, inputSplitEdgesLoaded);
+  }
+}
+

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MutableVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MutableVertex.java?rev=1399090&r1=1399089&r2=1399090&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MutableVertex.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MutableVertex.java Wed Oct 17 04:33:16 2012
@@ -68,7 +68,7 @@ public abstract class MutableVertex<I ex
     throws IOException {
     Vertex<I, V, E, M> vertex = getConf().createVertex();
     vertex.initialize(id, value, edges);
-    getGraphState().getWorkerCommunications().addVertexRequest(vertex);
+    getGraphState().getWorkerClientRequestProcessor().addVertexRequest(vertex);
   }
 
   /**
@@ -89,7 +89,8 @@ public abstract class MutableVertex<I ex
    * @param vertexId Id of the vertex to be removed.
    */
   public void removeVertexRequest(I vertexId) throws IOException {
-    getGraphState().getWorkerCommunications().removeVertexRequest(vertexId);
+    getGraphState().getWorkerClientRequestProcessor().
+        removeVertexRequest(vertexId);
   }
 
   /**
@@ -101,7 +102,7 @@ public abstract class MutableVertex<I ex
    */
   public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
     throws IOException {
-    getGraphState().getWorkerCommunications().
+    getGraphState().getWorkerClientRequestProcessor().
         addEdgeRequest(sourceVertexId, edge);
   }
 
@@ -114,7 +115,7 @@ public abstract class MutableVertex<I ex
    */
   public void removeEdgeRequest(I sourceVertexId, I targetVertexId)
     throws IOException {
-    getGraphState().getWorkerCommunications().
+    getGraphState().getWorkerClientRequestProcessor().
         removeEdgeRequest(sourceVertexId, targetVertexId);
   }
 }



Mime
View raw message