giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [16/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:31 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
new file mode 100644
index 0000000..eb35723
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
@@ -0,0 +1,772 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.bsp.CentralizedServiceMaster;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
+import org.apache.giraph.metrics.GiraphTimer;
+import org.apache.giraph.metrics.GiraphTimerContext;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.zk.ZooKeeperManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This mapper that will execute the BSP graph tasks.  Since this mapper will
+ * not be passing data by key-value pairs through the MR framework, the
+ * types are irrelevant.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class GraphMapper<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable> extends
+    Mapper<Object, Object, Object, Object> implements
+    ResetSuperstepMetricsObserver {
+  // Make giraph-site.xml a default resource so its settings are picked up
+  // by every Hadoop Configuration instance created afterwards.
+  static {
+    Configuration.addDefaultResource("giraph-site.xml");
+  }
+
+  /** Name of metric for superstep time in msec */
+  public static final String TIMER_SUPERSTEP_TIME = "superstep-time-ms";
+  /** Name of metric for compute on all vertices in msec */
+  public static final String TIMER_COMPUTE_ALL = "compute-all-ms";
+  /** Name of metric for time from begin compute to first message sent */
+  public static final String TIMER_TIME_TO_FIRST_MSG =
+      "time-to-first-message-ms";
+  /** Name of metric for time from first message till last message flushed */
+  public static final String TIMER_COMMUNICATION_TIME = "communication-time-ms";
+
+  /** Time instance used for timing in this class */
+  private static final Time TIME = SystemTime.get();
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(GraphMapper.class);
+  /** Coordination service worker (assigned in setup() for worker roles) */
+  private CentralizedServiceWorker<I, V, E, M> serviceWorker;
+  /** Coordination service master (assigned in setup() for master roles) */
+  private CentralizedServiceMaster<I, V, E, M> serviceMaster;
+  /** Coordination service master thread */
+  private Thread masterThread = null;
+  /** The map should be run exactly once, or else there is a problem. */
+  private boolean mapAlreadyRun = false;
+  /** Manages the ZooKeeper servers if necessary (dynamic startup) */
+  private ZooKeeperManager zkManager;
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+  /** Already complete? Set in setup() when computation already finished. */
+  private boolean done = false;
+  /** What kind of functions is this mapper doing? */
+  private MapFunctions mapFunctions = MapFunctions.UNKNOWN;
+  /** Total number of vertices in the graph (at this time); -1 until known */
+  private long numVertices = -1;
+  /** Total number of edges in the graph (at this time); -1 until known */
+  private long numEdges = -1;
+
+  // Per-Job Metrics
+  /** Timer for WorkerContext#preApplication() */
+  private GiraphTimer wcPreAppTimer;
+  /** Timer for WorkerContext#postApplication() */
+  private GiraphTimer wcPostAppTimer;
+
+  // Per-Superstep Metrics
+  /** Time for how long superstep took */
+  private GiraphTimer superstepTimer;
+  /** Time for all compute() calls in a superstep */
+  private GiraphTimer computeAll;
+  /** Time from starting compute to sending first message */
+  private GiraphTimer timeToFirstMessage;
+  /** Context for timing time to first message above (null once recorded) */
+  private GiraphTimerContext timeToFirstMessageTimerContext;
+  /** Time from first sent message till last message flushed. */
+  private GiraphTimer communicationTimer;
+  /** Context for timing communication time above (null once recorded) */
+  private GiraphTimerContext communicationTimerContext;
+  /** Timer for WorkerContext#preSuperstep() */
+  private GiraphTimer wcPreSuperstepTimer;
+
+  /** What kinds of functions to run on this mapper */
+  public enum MapFunctions {
+    /** Undecided yet */
+    UNKNOWN,
+    /** Only be the master */
+    MASTER_ONLY,
+    /** Only be the master and ZooKeeper */
+    MASTER_ZOOKEEPER_ONLY,
+    /** Only be the worker */
+    WORKER_ONLY,
+    /** Do master, worker, and ZooKeeper */
+    ALL,
+    /** Do master and worker */
+    ALL_EXCEPT_ZOOKEEPER
+  }
+
+  /**
+   * Get the map function enum.
+   *
+   * @return Map functions (roles) assigned to this mapper during setup();
+   *         UNKNOWN before setup() has decided.
+   */
+  public MapFunctions getMapFunctions() {
+    return mapFunctions;
+  }
+
+  /**
+   * Get master aggregator usage, a subset of the master service
+   * functionality.
+   *
+   * @return Master aggregator usage interface
+   */
+  public final MasterAggregatorUsage getMasterAggregatorUsage() {
+    return serviceMaster.getAggregatorHandler();
+  }
+
+  /**
+   * Get the worker context held by the worker service.
+   *
+   * @return Worker context of this mapper's worker service
+   */
+  public final WorkerContext getWorkerContext() {
+    return serviceWorker.getWorkerContext();
+  }
+
+  /**
+   * Default handler for uncaught exceptions: log the failure fatally and
+   * terminate the task JVM immediately rather than leaving a half-dead
+   * mapper behind.
+   */
+  class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+      LOG.fatal(
+          "uncaughtException: OverrideExceptionHandler on thread " +
+              t.getName() + ", msg = " +  e.getMessage() + ", exiting...", e);
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Set the concrete, user-defined choices about generic methods
+   * (validated earlier in GiraphRunner) into the Configuration.
+   *
+   * @param conf the Configuration object for this job run.
+   */
+  public void determineClassTypes(Configuration conf) {
+    Class<? extends Vertex<I, V, E, M>> vertexClass =
+        BspUtils.<I, V, E, M>getVertexClass(conf);
+    // Resolve the actual <I, V, E, M> type arguments of the user's Vertex
+    // subclass; the returned list is in declaration order.
+    List<Class<?>> classList = ReflectionUtils.<Vertex>getTypeArguments(
+        Vertex.class, vertexClass);
+    Type vertexIndexType = classList.get(0);
+    Type vertexValueType = classList.get(1);
+    Type edgeValueType = classList.get(2);
+    Type messageValueType = classList.get(3);
+    conf.setClass(GiraphConstants.VERTEX_ID_CLASS,
+        (Class<?>) vertexIndexType,
+        WritableComparable.class);
+    conf.setClass(GiraphConstants.VERTEX_VALUE_CLASS,
+        (Class<?>) vertexValueType,
+        Writable.class);
+    conf.setClass(GiraphConstants.EDGE_VALUE_CLASS,
+        (Class<?>) edgeValueType,
+        Writable.class);
+    conf.setClass(GiraphConstants.MESSAGE_VALUE_CLASS,
+        (Class<?>) messageValueType,
+        Writable.class);
+  }
+
+  /**
+   * Copied from JobConf to get the location of this jar.  Workaround for
+   * things like Oozie map-reduce jobs.
+   *
+   * @param myClass Class to search the class loader path for to locate
+   *        the relevant jar file
+   * @return Location of the jar file containing myClass, or null if no
+   *         containing jar was found on the classpath
+   */
+  private static String findContainingJar(Class<?> myClass) {
+    ClassLoader loader = myClass.getClassLoader();
+    String classFile =
+        myClass.getName().replaceAll("\\.", "/") + ".class";
+    try {
+      for (Enumeration<?> itr = loader.getResources(classFile);
+          itr.hasMoreElements();) {
+        URL url = (URL) itr.nextElement();
+        if ("jar".equals(url.getProtocol())) {
+          String toReturn = url.getPath();
+          if (toReturn.startsWith("file:")) {
+            toReturn = toReturn.substring("file:".length());
+          }
+          toReturn = URLDecoder.decode(toReturn, "UTF-8");
+          // Strip the "!/path/inside/jar" suffix, keeping the jar path.
+          return toReturn.replaceAll("!.*$", "");
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return null;
+  }
+
+  /**
+   * Figure out what functions this mapper should do.  Basic logic is as
+   * follows:
+   * 1) If not split master, everyone does the everything and/or running
+   *    ZooKeeper.
+   * 2) If split master/worker, masters also run ZooKeeper (if it's not
+   *    given to us).
+   *
+   * @param conf Configuration to use
+   * @param zkManager ZooKeeper manager to help determine whether to run
+   *        ZooKeeper
+   * @return Functions that this mapper should do.
+   */
+  private static MapFunctions determineMapFunctions(
+      ImmutableClassesGiraphConfiguration conf,
+      ZooKeeperManager zkManager) {
+    boolean splitMasterWorker = conf.getSplitMasterWorker();
+    int taskPartition = conf.getTaskPartition();
+    boolean zkAlreadyProvided = conf.getZookeeperList() != null;
+    MapFunctions functions = MapFunctions.UNKNOWN;
+    // What functions should this mapper do?
+    if (!splitMasterWorker) {
+      if ((zkManager != null) && zkManager.runsZooKeeper()) {
+        functions = MapFunctions.ALL;
+      } else {
+        functions = MapFunctions.ALL_EXCEPT_ZOOKEEPER;
+      }
+    } else {
+      if (zkAlreadyProvided) {
+        int masterCount = conf.getZooKeeperServerCount();
+        if (taskPartition < masterCount) {
+          functions = MapFunctions.MASTER_ONLY;
+        } else {
+          functions = MapFunctions.WORKER_ONLY;
+        }
+      } else {
+        if ((zkManager != null) && zkManager.runsZooKeeper()) {
+          functions = MapFunctions.MASTER_ZOOKEEPER_ONLY;
+        } else {
+          functions = MapFunctions.WORKER_ONLY;
+        }
+      }
+    }
+    return functions;
+  }
+
+  /**
+   * One-time task initialization: install the default uncaught-exception
+   * handler, build the immutable configuration, tune log4j, initialize
+   * metrics, bring up (or connect to) ZooKeeper, decide this task's role,
+   * and start the master and/or worker services for that role.
+   *
+   * @param context Mapper context from Hadoop
+   * @throws IOException on ZooKeeper manager setup failure
+   * @throws InterruptedException if interrupted while waiting for the
+   *         ZooKeeper quorum to come up
+   */
+  @Override
+  public void setup(Context context)
+    throws IOException, InterruptedException {
+    context.setStatus("setup: Beginning mapper setup.");
+
+    // Setting the default handler for uncaught exceptions.
+    Thread.setDefaultUncaughtExceptionHandler(
+        new OverrideExceptionHandler());
+
+    determineClassTypes(context.getConfiguration());
+    conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+        context.getConfiguration());
+
+    // Hadoop security needs this property to be set
+    if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
+      conf.set("mapreduce.job.credentials.binary",
+          System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+    }
+
+    // Set the log level
+    String logLevel = conf.getLocalLevel();
+    if (!Logger.getRootLogger().getLevel().equals(Level.toLevel(logLevel))) {
+      Logger.getRootLogger().setLevel(Level.toLevel(logLevel));
+      if (LOG.isInfoEnabled()) {
+        LOG.info("setup: Set log level to " + logLevel);
+      }
+    } else {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("setup: Log level remains at " + logLevel);
+      }
+    }
+    // Sets pattern layout for all appenders
+    if (conf.useLogThreadLayout()) {
+      PatternLayout layout =
+          new PatternLayout("%-7p %d [%t] %c %x - %m%n");
+      Enumeration<Appender> appenderEnum =
+          Logger.getRootLogger().getAllAppenders();
+      while (appenderEnum.hasMoreElements()) {
+        appenderEnum.nextElement().setLayout(layout);
+      }
+    }
+
+    // Set up GiraphMetrics
+    GiraphMetrics.init(conf);
+    GiraphMetrics.get().addSuperstepResetObserver(this);
+    initJobMetrics();
+    MemoryUtils.initMetrics();
+
+    // Do some initial setup (possibly starting up a Zookeeper service)
+    context.setStatus("setup: Initializing Zookeeper services.");
+    if (!conf.getLocalTestMode()) {
+      Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
+      String zkClasspath = null;
+      if (fileClassPaths == null) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Distributed cache is empty. Assuming fatjar.");
+        }
+        String jarFile = context.getJar();
+        if (jarFile == null) {
+          jarFile = findContainingJar(getClass());
+        }
+        // NOTE(review): findContainingJar() can return null, which would
+        // NPE on the next line -- confirm this cannot happen in practice.
+        zkClasspath = jarFile.replaceFirst("file:", "");
+      } else {
+        StringBuilder sb = new StringBuilder();
+        sb.append(fileClassPaths[0]);
+
+        for (int i = 1; i < fileClassPaths.length; i++) {
+          sb.append(":");
+          sb.append(fileClassPaths[i]);
+        }
+        zkClasspath = sb.toString();
+      }
+
+      if (LOG.isInfoEnabled()) {
+        LOG.info("setup: classpath @ " + zkClasspath + " for job " +
+            context.getJobName());
+      }
+      conf.setZooKeeperJar(zkClasspath);
+    }
+    String serverPortList = conf.getZookeeperList();
+    if (serverPortList == null) {
+      // No external quorum configured: manage ZooKeeper ourselves.
+      zkManager = new ZooKeeperManager(context, conf);
+      context.setStatus("setup: Setting up Zookeeper manager.");
+      zkManager.setup();
+      if (zkManager.computationDone()) {
+        done = true;
+        return;
+      }
+      zkManager.onlineZooKeeperServers();
+      serverPortList = zkManager.getZooKeeperServerPortString();
+    }
+    if (zkManager != null && zkManager.runsZooKeeper()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("setup: Chosen to run ZooKeeper...");
+      }
+    }
+    context.setStatus("setup: Connected to Zookeeper service " +
+        serverPortList);
+    this.mapFunctions = determineMapFunctions(conf, zkManager);
+
+    // Sometimes it takes a while to get multiple ZooKeeper servers up
+    if (conf.getZooKeeperServerCount() > 1) {
+      Thread.sleep(GiraphConstants.DEFAULT_ZOOKEEPER_INIT_LIMIT *
+          GiraphConstants.DEFAULT_ZOOKEEPER_TICK_TIME);
+    }
+    int sessionMsecTimeout = conf.getZooKeeperSessionTimeout();
+    try {
+      if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
+          (mapFunctions == MapFunctions.MASTER_ONLY) ||
+          (mapFunctions == MapFunctions.ALL) ||
+          (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("setup: Starting up BspServiceMaster " +
+              "(master thread)...");
+        }
+        serviceMaster = new BspServiceMaster<I, V, E, M>(
+            serverPortList, sessionMsecTimeout, context, this);
+        masterThread = new MasterThread<I, V, E, M>(
+            (BspServiceMaster<I, V, E, M>) serviceMaster, context);
+        masterThread.start();
+      }
+      if ((mapFunctions == MapFunctions.WORKER_ONLY) ||
+          (mapFunctions == MapFunctions.ALL) ||
+          (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("setup: Starting up BspServiceWorker...");
+        }
+        serviceWorker = new BspServiceWorker<I, V, E, M>(
+            serverPortList,
+            sessionMsecTimeout,
+            context,
+            this);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("setup: Registering health of this worker...");
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("setup: Caught exception just before end of setup", e);
+      if (zkManager != null) {
+        zkManager.offlineZooKeeperServers(
+            ZooKeeperManager.State.FAILED);
+      }
+      throw new RuntimeException(
+          "setup: Offlining servers due to exception...", e);
+    }
+
+    context.setStatus(getMapFunctions().toString() + " starting...");
+  }
+
+  /**
+   * Initialize job-level metrics used by this class.  These timers wrap
+   * the WorkerContext pre/post-application hooks and live for the whole
+   * job, unlike the per-superstep timers set in newSuperstep().
+   */
+  private void initJobMetrics() {
+    GiraphMetricsRegistry jobMetrics = GiraphMetrics.get().perJob();
+    wcPreAppTimer = new GiraphTimer(jobMetrics, "worker-context-pre-app",
+        TimeUnit.MILLISECONDS);
+    wcPostAppTimer = new GiraphTimer(jobMetrics, "worker-context-post-app",
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Re-register the per-superstep timers against the fresh superstep
+   * metrics registry.  All of these timers report in milliseconds,
+   * matching their "-ms"-suffixed metric names.
+   *
+   * @param superstepMetrics Metrics registry for the new superstep
+   */
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+    superstepTimer = new GiraphTimer(superstepMetrics,
+        TIMER_SUPERSTEP_TIME, TimeUnit.MILLISECONDS);
+    computeAll = new GiraphTimer(superstepMetrics,
+        TIMER_COMPUTE_ALL, TimeUnit.MILLISECONDS);
+    // Fixed: was TimeUnit.MICROSECONDS, contradicting the metric name
+    // "time-to-first-message-ms"; report in msec like the other timers.
+    timeToFirstMessage = new GiraphTimer(superstepMetrics,
+        TIMER_TIME_TO_FIRST_MSG, TimeUnit.MILLISECONDS);
+    communicationTimer = new GiraphTimer(superstepMetrics,
+        TIMER_COMMUNICATION_TIME, TimeUnit.MILLISECONDS);
+    wcPreSuperstepTimer = new GiraphTimer(superstepMetrics,
+        "worker-context-pre-superstep", TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Notification from Vertex that a message has been sent.  Stops the
+   * time-to-first-message timer exactly once per superstep and starts
+   * the communication timer in its place.
+   */
+  public void notifySentMessages() {
+    // We are tracking the time between when the compute started and the first
+    // message get sent. We use null to flag that we have already recorded it.
+    // The unsynchronized read avoids taking the lock on every message;
+    // the synchronized re-check ensures the stop happens only once.
+    GiraphTimerContext tmp = timeToFirstMessageTimerContext;
+    if (tmp != null) {
+      synchronized (timeToFirstMessage) {
+        if (timeToFirstMessageTimerContext != null) {
+          timeToFirstMessageTimerContext.stop();
+          timeToFirstMessageTimerContext = null;
+          communicationTimerContext = communicationTimer.time();
+        }
+      }
+    }
+  }
+
+  /**
+   * Notification of last message flushed. Comes when we finish the superstep
+   * and are done waiting for all messages to send.
+   */
+  public void notifyFinishedCommunication() {
+    // Cheap unsynchronized peek first; only take the lock when there is
+    // an open communication timer context to stop.
+    if (communicationTimerContext == null) {
+      return;
+    }
+    synchronized (communicationTimer) {
+      GiraphTimerContext openContext = communicationTimerContext;
+      if (openContext != null) {
+        openContext.stop();
+        communicationTimerContext = null;
+      }
+    }
+  }
+
+  /**
+   * Runs the BSP superstep loop for worker tasks; master-only and
+   * ZooKeeper-only tasks return immediately.  Each iteration exchanges
+   * partitions, applies checkpoint policy, computes all local partitions
+   * with a thread pool, and synchronizes with the master until all
+   * vertices have halted.
+   *
+   * @param key Unused map input key (no key-value data flows through MR)
+   * @param value Unused map input value
+   * @param context Mapper context from Hadoop
+   * @throws IOException on worker superstep failure
+   * @throws InterruptedException if interrupted during computation
+   */
+  @Override
+  public void map(Object key, Object value, Context context)
+    throws IOException, InterruptedException {
+    // map() only does computation
+    // 1) Run checkpoint per frequency policy.
+    // 2) For every vertex on this mapper, run the compute() function
+    // 3) Wait until all messaging is done.
+    // 4) Check if all vertices are done.  If not goto 2).
+    // 5) Dump output.
+    if (done) {
+      return;
+    }
+
+    GiraphMetrics.get().
+        resetSuperstepMetrics(BspService.INPUT_SUPERSTEP);
+
+    if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
+        (mapFunctions == MapFunctions.MASTER_ONLY)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("map: No need to do anything when not a worker");
+      }
+      return;
+    }
+
+    if (mapAlreadyRun) {
+      throw new RuntimeException("map: In BSP, map should have only been" +
+          " run exactly once, (already run)");
+    }
+    mapAlreadyRun = true;
+
+    // Input superstep: load vertices/edges and record graph-wide counts.
+    FinishedSuperstepStats inputSuperstepStats =
+        serviceWorker.setup();
+    numVertices = inputSuperstepStats.getVertexCount();
+    numEdges = inputSuperstepStats.getEdgeCount();
+    if (inputSuperstepStats.getVertexCount() == 0) {
+      LOG.warn("map: No vertices in the graph, exiting.");
+      return;
+    }
+
+    WorkerAggregatorUsage aggregatorUsage =
+        serviceWorker.getAggregatorHandler();
+    serviceWorker.getWorkerContext().setGraphState(
+        new GraphState<I, V, E, M>(serviceWorker.getSuperstep(),
+            numVertices, numEdges, context, this, null, aggregatorUsage));
+
+    workerContextPreApp(context);
+
+    List<PartitionStats> partitionStatsList =
+        new ArrayList<PartitionStats>();
+
+    int numComputeThreads = conf.getNumComputeThreads();
+    FinishedSuperstepStats finishedSuperstepStats = null;
+    do {
+      final long superstep = serviceWorker.getSuperstep();
+      GiraphMetrics.get().resetSuperstepMetrics(superstep);
+
+      GiraphTimerContext superstepTimerContext = superstepTimer.time();
+
+      GraphState<I, V, E, M> graphState =
+          new GraphState<I, V, E, M>(superstep, numVertices, numEdges,
+              context, this, null, aggregatorUsage);
+
+      Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
+          serviceWorker.startSuperstep(graphState);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("map: " + MemoryUtils.getRuntimeMemoryStats());
+      }
+      context.progress();
+
+      serviceWorker.exchangeVertexPartitions(masterAssignedPartitionOwners);
+
+      context.progress();
+
+      // Might need to restart from another superstep
+      // (manually or automatic), or store a checkpoint
+      if (serviceWorker.getRestartedSuperstep() == superstep) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("map: Loading from checkpoint " + superstep);
+        }
+        VertexEdgeCount vertexEdgeCount = serviceWorker.loadCheckpoint(
+            serviceWorker.getRestartedSuperstep());
+        numVertices = vertexEdgeCount.getVertexCount();
+        numEdges = vertexEdgeCount.getEdgeCount();
+        graphState = new GraphState<I, V, E, M>(superstep, numVertices,
+            numEdges, context, this, null, aggregatorUsage);
+      } else if (serviceWorker.checkpointFrequencyMet(superstep)) {
+        serviceWorker.storeCheckpoint();
+      }
+
+      serviceWorker.prepareSuperstep();
+
+      serviceWorker.getWorkerContext().setGraphState(graphState);
+      GiraphTimerContext perSuperstepTimer = wcPreSuperstepTimer.time();
+      serviceWorker.getWorkerContext().preSuperstep();
+      perSuperstepTimer.stop();
+      context.progress();
+
+      MessageStoreByPartition<I, M> messageStore =
+          serviceWorker.getServerData().getCurrentMessageStore();
+      int numPartitions = serviceWorker.getPartitionStore().getNumPartitions();
+      // Never start more compute threads than there are partitions.
+      int numThreads =
+          Math.min(numComputeThreads, numPartitions);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("map: " + numPartitions + " partitions to process with " +
+            numThreads + " compute thread(s), originally " +
+            numComputeThreads + " thread(s) on superstep " + superstep);
+      }
+      partitionStatsList.clear();
+      if (numPartitions > 0) {
+        List<Future<Collection<PartitionStats>>> partitionFutures =
+            Lists.newArrayListWithCapacity(numPartitions);
+        BlockingQueue<Integer> computePartitionIdQueue =
+            new ArrayBlockingQueue<Integer>(numPartitions);
+        for (Integer partitionId :
+            serviceWorker.getPartitionStore().getPartitionIds()) {
+          computePartitionIdQueue.add(partitionId);
+        }
+
+        GiraphTimerContext computeAllTimerContext = computeAll.time();
+        timeToFirstMessageTimerContext = timeToFirstMessage.time();
+
+        ExecutorService partitionExecutor =
+            Executors.newFixedThreadPool(numThreads,
+                new ThreadFactoryBuilder().setNameFormat("compute-%d").build());
+        // Each callable drains partition ids from the shared queue.
+        for (int i = 0; i < numThreads; ++i) {
+          ComputeCallable<I, V, E, M> computeCallable =
+              new ComputeCallable<I, V, E, M>(
+                  context,
+                  graphState,
+                  messageStore,
+                  computePartitionIdQueue,
+                  conf,
+                  serviceWorker);
+          partitionFutures.add(partitionExecutor.submit(computeCallable));
+        }
+
+        // Wait until all the threads are done to wait on all requests
+        for (Future<Collection<PartitionStats>> partitionFuture :
+            partitionFutures) {
+          Collection<PartitionStats> stats =
+              ProgressableUtils.getFutureResult(partitionFuture, context);
+          partitionStatsList.addAll(stats);
+        }
+        partitionExecutor.shutdown();
+
+        computeAllTimerContext.stop();
+      }
+
+      finishedSuperstepStats =
+          serviceWorker.finishSuperstep(graphState, partitionStatsList);
+      numVertices = finishedSuperstepStats.getVertexCount();
+      numEdges = finishedSuperstepStats.getEdgeCount();
+
+      superstepTimerContext.stop();
+      if (conf.metricsEnabled()) {
+        GiraphMetrics.get().perSuperstep().printSummary();
+      }
+
+    } while (!finishedSuperstepStats.getAllVerticesHalted());
+    if (LOG.isInfoEnabled()) {
+      LOG.info("map: BSP application done (global vertices marked done)");
+    }
+
+    serviceWorker.getWorkerContext().setGraphState(
+        new GraphState<I, V, E, M>(serviceWorker.getSuperstep(),
+            numVertices, numEdges, context, this, null, aggregatorUsage));
+    GiraphTimerContext postAppTimerContext = wcPostAppTimer.time();
+    serviceWorker.getWorkerContext().postApplication();
+    postAppTimerContext.stop();
+    context.progress();
+  }
+
+  /**
+   * Call to the WorkerContext before application begins.
+   *
+   * @param progressable thing to call progress on.
+   * @throws RuntimeException if the user's WorkerContext fails in
+   *         preApplication() with an instantiation or access error
+   */
+  private void workerContextPreApp(Progressable progressable) {
+    GiraphTimerContext preAppTimerContext = wcPreAppTimer.time();
+    try {
+      serviceWorker.getWorkerContext().preApplication();
+    } catch (InstantiationException e) {
+      LOG.fatal("map: preApplication failed in instantiation", e);
+      throw new RuntimeException(
+          "map: preApplication failed in instantiation", e);
+    } catch (IllegalAccessException e) {
+      LOG.fatal("map: preApplication failed in access", e);
+      throw new RuntimeException(
+          "map: preApplication failed in access", e);
+    }
+    preAppTimerContext.stop();
+    progressable.progress();
+  }
+
+  @Override
+  public void cleanup(Context context)
+    throws IOException, InterruptedException {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("cleanup: Starting for " + getMapFunctions());
+    }
+    if (done) {
+      return;
+    }
+
+    if (serviceWorker != null) {
+      serviceWorker.cleanup();
+    }
+    try {
+      if (masterThread != null) {
+        masterThread.join();
+      }
+    } catch (InterruptedException e) {
+      // cleanup phase -- just log the error
+      LOG.error("cleanup: Master thread couldn't join");
+    }
+    if (zkManager != null) {
+      zkManager.offlineZooKeeperServers(
+          ZooKeeperManager.State.FINISHED);
+    }
+  }
+
+  /**
+   * Drive the full mapper lifecycle: setup, the map loop, then cleanup.
+   * RuntimeExceptions are caught here so that ZooKeeper output can be
+   * dumped and the worker can perform failure cleanup before the original
+   * exception is rethrown, notifying the master quickly on failure.
+   *
+   * @param context Mapper context from Hadoop
+   * @throws IOException on I/O failure in setup/map/cleanup
+   * @throws InterruptedException if interrupted
+   */
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    // Notify the master quicker if there is worker failure rather than
+    // waiting for ZooKeeper to timeout and delete the ephemeral znodes
+    try {
+      setup(context);
+      while (context.nextKeyValue()) {
+        map(context.getCurrentKey(),
+            context.getCurrentValue(),
+            context);
+      }
+      cleanup(context);
+      // Checkstyle exception due to needing to dump ZooKeeper failure
+      // on exception
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (RuntimeException e) {
+      // CHECKSTYLE: resume IllegalCatch
+      if (mapFunctions == MapFunctions.UNKNOWN ||
+          mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) {
+        // ZooKeeper may have had an issue
+        if (zkManager != null) {
+          zkManager.logZooKeeperOutput(Level.WARN);
+        }
+      }
+      try {
+        if (mapFunctions == MapFunctions.WORKER_ONLY) {
+          serviceWorker.failureCleanup();
+        }
+      // Checkstyle exception due to needing to get the original
+      // exception on failure
+      // CHECKSTYLE: stop IllegalCatch
+      } catch (RuntimeException e1) {
+      // CHECKSTYLE: resume IllegalCatch
+        // Fixed typo in log message: "expection" -> "exception".
+        LOG.error("run: Worker failure failed on another RuntimeException, " +
+            "original exception will be rethrown", e1);
+      }
+      throw new IllegalStateException(
+          "run: Caught an unrecoverable exception " + e.getMessage(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
new file mode 100644
index 0000000..5b1e0b0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Immutable global state of the graph.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class GraphState<I extends WritableComparable, V extends Writable,
+E extends Writable, M extends Writable> {
+  /** Graph-wide superstep */
+  private final long superstep;
+  /** Graph-wide number of vertices */
+  private final long numVertices;
+  /** Graph-wide number of edges */
+  private final long numEdges;
+  /** Graph-wide map context */
+  private final Mapper.Context context;
+  /** Graph-wide BSP Mapper for this Vertex */
+  private final GraphMapper<I, V, E, M> graphMapper;
+  /** Handles requests */
+  private final WorkerClientRequestProcessor<I, V, E, M>
+  workerClientRequestProcessor;
+  /** Worker aggregator usage */
+  private final WorkerAggregatorUsage workerAggregatorUsage;
+
+  /**
+   * Constructor
+   *
+   * @param superstep Current superstep
+   * @param numVertices Current graph-wide vertices
+   * @param numEdges Current graph-wide edges
+   * @param context Context
+   * @param graphMapper Graph mapper
+   * @param workerClientRequestProcessor Handles all communication
+   * @param workerAggregatorUsage Aggregator usage
+   *
+   */
+  public GraphState(
+      long superstep, long numVertices,
+      long numEdges, Mapper.Context context,
+      GraphMapper<I, V, E, M> graphMapper,
+      WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor,
+      WorkerAggregatorUsage workerAggregatorUsage) {
+    this.superstep = superstep;
+    this.numVertices = numVertices;
+    this.numEdges = numEdges;
+    this.context = context;
+    this.graphMapper = graphMapper;
+    this.workerClientRequestProcessor = workerClientRequestProcessor;
+    this.workerAggregatorUsage = workerAggregatorUsage;
+  }
+
+  public long getSuperstep() {
+    return superstep;
+  }
+
+  public long getTotalNumVertices() {
+    return numVertices;
+  }
+
+  public long getTotalNumEdges() {
+    return numEdges;
+  }
+
+  public Mapper.Context getContext() {
+    return context;
+  }
+
+  public GraphMapper<I, V, E, M> getGraphMapper() {
+    return graphMapper;
+  }
+
+  public WorkerClientRequestProcessor<I, V, E, M>
+  getWorkerClientRequestProcessor() {
+    return workerClientRequestProcessor;
+  }
+
+  public WorkerAggregatorUsage getWorkerAggregatorUsage() {
+    return workerAggregatorUsage;
+  }
+
+  @Override
+  public String toString() {
+    return "(superstep=" + superstep + ",numVertices=" + numVertices + "," +
+        "numEdges=" + numEdges + ",context=" + context +
+        ",graphMapper=" + graphMapper +
+        ",workerClientRequestProcessor=" + workerClientRequestProcessor + ")";
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/GraphStateAware.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphStateAware.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphStateAware.java
new file mode 100644
index 0000000..76cef43
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphStateAware.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
/**
 * Interface specifying that the class can be configured with a GraphState.
 * Implementations store the {@link GraphState} handed to
 * {@link #setGraphState(GraphState)} and return it from
 * {@link #getGraphState()}.
 *
 * @param <I> Vertex ID object
 * @param <V> Vertex Value object
 * @param <E> Edge object
 * @param <M> Message object
 */
public interface GraphStateAware<I extends WritableComparable,
    V extends Writable, E extends Writable, M extends Writable> {
  /**
   * Set the graph state.
   *
   * @param graphState Graph state saved.
   */
  void setGraphState(GraphState<I, V, E, M> graphState);

  /**
   * Get the graph state stored.
   *
   * @return GraphState stored.
   */
  GraphState<I, V, E, M> getGraphState();
}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/HashMapVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/HashMapVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/HashMapVertex.java
new file mode 100644
index 0000000..3e7503b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/HashMapVertex.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
/**
 * User applications can subclass {@link HashMapVertex}, which stores
 * the outbound edges in a HashMap, for efficient edge random-access.  Note
 * that {@link EdgeListVertex} is much more memory efficient for static graphs.
 * User applications which need to implement their own
 * in-memory data structures should subclass {@link MutableVertex}.
 *
 * Package access will prevent users from accessing internal methods.
 *
 * @param <I> Vertex index value
 * @param <V> Vertex value
 * @param <E> Edge value
 * @param <M> Message value
 */
@SuppressWarnings("rawtypes")
public abstract class HashMapVertex<I extends WritableComparable,
    V extends Writable, E extends Writable, M extends Writable>
    extends MutableVertex<I, V, E, M> {
  /** Class logger */
  private static final Logger LOG = Logger.getLogger(HashMapVertex.class);
  /** Map of target vertices and their edge values */
  protected Map<I, E> edgeMap = new HashMap<I, E>();

  /**
   * Replace all edges with the given ones; any previous edges are discarded.
   * If the iterable contains duplicate target ids, the last value wins.
   *
   * @param edges Edges to store on this vertex
   */
  @Override
  public void setEdges(Iterable<Edge<I, E>> edges) {
    edgeMap.clear();
    for (Edge<I, E> edge : edges) {
      edgeMap.put(edge.getTargetVertexId(), edge.getValue());
    }
  }

  /**
   * Add an edge to this vertex.  NOTE: even when this returns false, the
   * new edge value has already replaced the previous one (HashMap.put
   * overwrites before its prior value is inspected).
   *
   * @param edge Edge to add
   * @return false if an edge to that target already existed, true otherwise
   */
  @Override
  public boolean addEdge(Edge<I, E> edge) {
    if (edgeMap.put(edge.getTargetVertexId(), edge.getValue()) != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("addEdge: Vertex=" + getId() +
            ": already added an edge value for target vertex id " +
            edge.getTargetVertexId());
      }
      return false;
    } else {
      return true;
    }
  }

  @Override
  public boolean hasEdge(I targetVertexId) {
    return edgeMap.containsKey(targetVertexId);
  }

  /**
   * Get an iterator to the edges on this vertex.
   *
   * NOTE(review): an earlier version of this javadoc claimed the iterator is
   * sorted by vertex id, but the backing store is a {@link HashMap}, so the
   * iteration order is undefined.
   *
   * @return An iterator over this vertex's edges, in no particular order
   */
  @Override
  public Iterable<Edge<I, E>> getEdges() {
    return Iterables.transform(edgeMap.entrySet(),
        new Function<Map.Entry<I, E>, Edge<I, E>>() {

          @Override
          public Edge<I, E> apply(Map.Entry<I, E> edge) {
            return new Edge<I, E>(edge.getKey(), edge.getValue());
          }
        });
  }

  @Override
  public E getEdgeValue(I targetVertexId) {
    return edgeMap.get(targetVertexId);
  }

  @Override
  public int getNumEdges() {
    return edgeMap.size();
  }

  /**
   * Remove the edge to the given target, if present.
   *
   * @param targetVertexId Target vertex id of the edge to remove
   * @return 1 if an edge was removed, 0 otherwise (a HashMap can hold at
   *         most one edge per target)
   */
  @Override
  public int removeEdges(I targetVertexId) {
    return edgeMap.remove(targetVertexId) != null ? 1 : 0;
  }

  @Override
  public final void sendMessageToAllEdges(M message) {
    for (I targetVertexId : edgeMap.keySet()) {
      sendMessage(targetVertexId, message);
    }
  }

  /**
   * Deserialize this vertex: id, value, edge count, (target id, edge value)
   * pairs, then the halted flag.  Must mirror {@link #write(DataOutput)}.
   */
  @Override
  public final void readFields(DataInput in) throws IOException {
    I vertexId = getConf().createVertexId();
    vertexId.readFields(in);
    V vertexValue = getConf().createVertexValue();
    vertexValue.readFields(in);
    initialize(vertexId, vertexValue);

    int numEdges = in.readInt();
    // presized map avoids rehashing while edges stream in
    edgeMap = Maps.newHashMapWithExpectedSize(numEdges);
    for (int i = 0; i < numEdges; ++i) {
      I targetVertexId = getConf().createVertexId();
      targetVertexId.readFields(in);
      E edgeValue = getConf().createEdgeValue();
      edgeValue.readFields(in);
      edgeMap.put(targetVertexId, edgeValue);
    }

    // restores the halted flag (helper inherited from the vertex hierarchy)
    readHaltBoolean(in);
  }

  /**
   * Serialize this vertex in the format expected by
   * {@link #readFields(DataInput)}: id, value, edge count, edges, halted flag.
   */
  @Override
  public final void write(DataOutput out) throws IOException {
    getId().write(out);
    getValue().write(out);

    out.writeInt(edgeMap.size());
    for (Map.Entry<I, E> edge : edgeMap.entrySet()) {
      edge.getKey().write(out);
      edge.getValue().write(out);
    }

    out.writeBoolean(isHalted());
  }

}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
new file mode 100644
index 0000000..23be1c4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitEvents.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Simple container of input split events.
+ */
+public class InputSplitEvents {
+  /** Input splits are ready for consumption by workers */
+  private final BspEvent allReadyChanged;
+  /** Input split reservation or finished notification and synchronization */
+  private final BspEvent stateChanged;
+  /** Input splits are done being processed by workers */
+  private final BspEvent allDoneChanged;
+  /** Input split done by a worker finished notification and synchronization */
+  private final BspEvent doneStateChanged;
+
+  /**
+   * Constructor.
+   *
+   * @param progressable {@link Progressable} to report progress
+   */
+  public InputSplitEvents(Progressable progressable) {
+    allReadyChanged = new PredicateLock(progressable);
+    stateChanged = new PredicateLock(progressable);
+    allDoneChanged = new PredicateLock(progressable);
+    doneStateChanged = new PredicateLock(progressable);
+  }
+
+  /**
+   * Get event for input splits all ready
+   *
+   * @return {@link BspEvent} for input splits all ready
+   */
+  public BspEvent getAllReadyChanged() {
+    return allReadyChanged;
+  }
+
+  /**
+   * Get event for input splits state
+   *
+   * @return {@link BspEvent} for input splits state
+   */
+  public BspEvent getStateChanged() {
+    return stateChanged;
+  }
+
+  /**
+   * Get event for input splits all done
+   *
+   * @return {@link BspEvent} for input splits all done
+   */
+  public BspEvent getAllDoneChanged() {
+    return allDoneChanged;
+  }
+
+  /**
+   * Get event for input split done
+   *
+   * @return {@link BspEvent} for input split done
+   */
+  public BspEvent getDoneStateChanged() {
+    return doneStateChanged;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java
new file mode 100644
index 0000000..3e9e56d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPathOrganizer.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import com.google.common.collect.Lists;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Iterator;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Utility class to extract the list of InputSplits from the
+ * ZooKeeper tree of "claimable splits" the master created,
+ * and to sort the list to favor local data blocks.
+ *
+ * This class provides an Iterator for the list the worker will
+ * claim splits from, making all sorting and data-code locality
+ * processing done here invisible to callers. The aim is to cut
+ * down on the number of ZK reads workers perform before locating
+ * an unclaimed InputSplit.
+ */
+public class InputSplitPathOrganizer implements Iterable<String> {
+  /** The worker's local ZooKeeperExt ref */
+  private final ZooKeeperExt zooKeeper;
+  /** The List of InputSplit znode paths */
+  private final List<String> pathList;
+  /** The worker's hostname */
+  private final String hostName;
+  /** The adjusted base offset by which to iterate on the path list */
+  private int baseOffset;
+
+  /**
+   * Constructor
+   *
+   * @param zooKeeper the worker's ZkExt
+   * @param zkPathList the path to read from
+   * @param hostName the worker's host name (for matching)
+   * @param port the port number for this worker
+   */
+  public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper,
+    final String zkPathList, final String hostName, final int port)
+    throws KeeperException, InterruptedException {
+    this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true),
+        hostName, port);
+  }
+
+  /**
+   * Constructor
+   *
+   * @param zooKeeper the worker's ZkExt
+   * @param inputSplitPathList path of input splits to read from
+   * @param hostName the worker's host name (for matching)
+   * @param port the port number for this worker
+   */
+  public InputSplitPathOrganizer(
+      final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList,
+      final String hostName, final int port)
+    throws KeeperException, InterruptedException {
+    this.zooKeeper = zooKeeper;
+    this.pathList = Lists.newArrayList(inputSplitPathList);
+    this.hostName = hostName;
+    this.baseOffset = 0; // set later after switching out local paths
+    prioritizeLocalInputSplits(port);
+  }
+
+ /**
+  * Re-order list of InputSplits so files local to this worker node's
+  * disk are the first it will iterate over when attempting to claim
+  * a split to read. This will increase locality of data reads with greater
+  * probability as the % of total nodes in the cluster hosting data and workers
+  * BOTH increase towards 100%. Replication increases our chances of a "hit."
+  *
+  * @param port the port number for hashing unique iteration indexes for all
+  *             workers, even those sharing the same host node.
+  */
+  private void prioritizeLocalInputSplits(final int port) {
+    List<String> sortedList = new ArrayList<String>();
+    String hosts = null;
+    for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) {
+      final String path = iterator.next();
+      try {
+        hosts = getLocationsFromZkInputSplitData(path);
+      } catch (IOException ioe) {
+        hosts = null; // no problem, just don't sort this entry
+      } catch (KeeperException ke) {
+        hosts = null;
+      } catch (InterruptedException ie) {
+        hosts = null;
+      }
+      if (hosts != null && hosts.contains(hostName)) {
+        sortedList.add(path); // collect the local block
+        iterator.remove(); // remove local block from list
+      }
+    }
+    // shuffle the local blocks in case several workers exist on this host
+    Collections.shuffle(sortedList);
+    // determine the hash-based offset for this worker to iterate from
+    // and place the local blocks into the list at that index, if any
+    final int temp = hostName.hashCode() + (19 * port);
+    if (pathList.size() != 0) {
+      baseOffset = Math.abs(temp % pathList.size());
+    }
+    // re-insert local paths at "adjusted index zero" for caller to iterate on
+    pathList.addAll(baseOffset, sortedList);
+  }
+
+  /**
+   * Utility for extracting locality data from an InputSplit ZNode.
+   *
+   * @param zkSplitPath the input split path to attempt to read
+   * ZNode locality data from for this InputSplit.
+   * @return a String of hostnames from ZNode data, or throws
+   */
+  private String getLocationsFromZkInputSplitData(String zkSplitPath)
+    throws IOException, KeeperException, InterruptedException {
+    byte[] locationData = zooKeeper.getData(zkSplitPath, false, null);
+    DataInputStream inputStream =
+      new DataInputStream(new ByteArrayInputStream(locationData));
+    // only read the "first" entry in the znode data, the locations
+    return Text.readString(inputStream);
+  }
+
+  /**
+   * Utility accessor for Input Split znode path list size
+   *
+   * @return the size of <code>this.pathList</code>
+   */
+  public int getPathListSize() {
+    return this.pathList.size();
+  }
+
+  /**
+   * Iterator for the pathList
+   *
+   * @return an iterator for our list of input split paths
+   */
+  public Iterator<String> iterator() {
+    return new PathListIterator();
+  }
+
+  /**
+   * Iterator for path list that handles the locality and hash offsetting.
+   */
+  public class PathListIterator implements Iterator<String> {
+    /** the current iterator index */
+    private int currentIndex = 0;
+
+    /**
+     *  Do we have more list to iterate upon?
+     *
+     *  @return true if more path strings are available
+     */
+    @Override
+    public boolean hasNext() {
+      return currentIndex < pathList.size();
+    }
+
+    /**
+     * Return the next pathList element
+     *
+     * @return the next input split path
+     */
+    @Override
+    public String next() {
+      return pathList.get((baseOffset + currentIndex++) % pathList.size());
+    }
+
+    /** Just a placeholder; should not do anything! */
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove is not allowed.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
new file mode 100644
index 0000000..4cf005e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitPaths.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
/**
 * Simple container of input split paths for coordination via ZooKeeper.
 * All four znode paths are derived once, in the constructor, by appending
 * the given node names to the base path.
 */
public class InputSplitPaths {
  /** Znode path where the master writes the input splits */
  private final String splitsPath;
  /** Znode path marking the input splits as ready for workers */
  private final String splitsAllReadyPath;
  /** Znode path under which finished input splits are recorded */
  private final String splitsDonePath;
  /** Znode path telling workers every input split is processed */
  private final String splitsAllDonePath;

  /**
   * Constructor.
   *
   * @param basePath Base path
   * @param dir Input splits path
   * @param doneDir Input split done path
   * @param allReadyNode Input splits all ready path
   * @param allDoneNode Input splits all done path
   */
  public InputSplitPaths(String basePath,
                         String dir,
                         String doneDir,
                         String allReadyNode,
                         String allDoneNode) {
    this.splitsPath = basePath + dir;
    this.splitsAllReadyPath = basePath + allReadyNode;
    this.splitsDonePath = basePath + doneDir;
    this.splitsAllDonePath = basePath + allDoneNode;
  }

  /**
   * Get path to the input splits.
   *
   * @return Path to input splits
   */
  public String getPath() {
    return splitsPath;
  }

  /**
   * Get path to the input splits all ready.
   *
   * @return Path to input splits all ready
   */
  public String getAllReadyPath() {
    return splitsAllReadyPath;
  }

  /**
   * Get path to the input splits done.
   *
   * @return Path to input splits done
   */
  public String getDonePath() {
    return splitsDonePath;
  }

  /**
   * Get path to the input splits all done.
   *
   * @return Path to input splits all done
   */
  public String getAllDonePath() {
    return splitsAllDonePath;
  }
}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
new file mode 100644
index 0000000..722b85e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitsCallable.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.Times;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * Abstract base class for loading vertex/edge input splits.
+ * Every thread will have its own instance of WorkerClientRequestProcessor
+ * to send requests.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class InputSplitsCallable<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements Callable<VertexEdgeCount> {
+  /** Name of counter for vertices loaded */
+  public static final String COUNTER_VERTICES_LOADED = "vertices-loaded";
+  /** Name of counter for edges loaded */
+  public static final String COUNTER_EDGES_LOADED = "edges-loaded";
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
+  /** Class time object */
+  private static final Time TIME = SystemTime.get();
+  /** Configuration */
+  protected final ImmutableClassesGiraphConfiguration<I, V, E, M>
+  configuration;
+  /** Context */
+  protected final Mapper<?, ?, ?, ?>.Context context;
+  /** Graph state */
+  private final GraphState<I, V, E, M> graphState;
+  /** Handles IPC communication */
+  private final WorkerClientRequestProcessor<I, V, E, M>
+  workerClientRequestProcessor;
+  /**
+   * Stores and processes the list of InputSplits advertised
+   * in a tree of child znodes by the master.
+   */
+  private final InputSplitPathOrganizer splitOrganizer;
+  /** ZooKeeperExt handle */
+  private final ZooKeeperExt zooKeeperExt;
+  /** Get the start time in nanos */
+  private final long startNanos = TIME.getNanoseconds();
+  /** ZooKeeper input split reserved node. */
+  private final String inputSplitReservedNode;
+  /** ZooKeeper input split finished node. */
+  private final String inputSplitFinishedNode;
+  /** Input split events. */
+  private final InputSplitEvents inputSplitEvents;
+
+  // CHECKSTYLE: stop ParameterNumberCheck
+  /**
+   * Constructor.
+   *
+   * @param context Context
+   * @param graphState Graph state
+   * @param configuration Configuration
+   * @param bspServiceWorker service worker
+   * @param inputSplitPathList List of the paths of the input splits
+   * @param workerInfo This worker's info
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   * @param inputSplitReservedNode Path to input split reserved
+   * @param inputSplitFinishedNode Path to input split finished
+   * @param inputSplitEvents Input split events
+   */
+  public InputSplitsCallable(
+      Mapper<?, ?, ?, ?>.Context context,
+      GraphState<I, V, E, M> graphState,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      BspServiceWorker<I, V, E, M> bspServiceWorker,
+      List<String> inputSplitPathList,
+      WorkerInfo workerInfo,
+      ZooKeeperExt zooKeeperExt,
+      String inputSplitReservedNode,
+      String inputSplitFinishedNode,
+      InputSplitEvents inputSplitEvents) {
+    this.zooKeeperExt = zooKeeperExt;
+    this.context = context;
+    this.workerClientRequestProcessor =
+        new NettyWorkerClientRequestProcessor<I, V, E, M>(
+            context, configuration, bspServiceWorker);
+    // Copy the graph state so this thread sends requests through its own
+    // WorkerClientRequestProcessor (see class javadoc).
+    this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
+        graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
+        context, graphState.getGraphMapper(), workerClientRequestProcessor,
+        null);
+    try {
+      splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt,
+          inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort());
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "InputSplitsCallable: KeeperException", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status before converting to unchecked.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "InputSplitsCallable: InterruptedException", e);
+    }
+    this.configuration = configuration;
+    this.inputSplitReservedNode = inputSplitReservedNode;
+    this.inputSplitFinishedNode = inputSplitFinishedNode;
+    this.inputSplitEvents = inputSplitEvents;
+  }
+  // CHECKSTYLE: resume ParameterNumberCheck
+
+  /**
+   * Load vertices/edges from the given input split.
+   *
+   * @param inputSplit Input split to load
+   * @param graphState Graph state
+   * @return Count of vertices and edges loaded
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  protected abstract VertexEdgeCount readInputSplit(
+      InputSplit inputSplit,
+      GraphState<I, V, E, M> graphState)
+    throws IOException, InterruptedException;
+
+  @Override
+  public VertexEdgeCount call() {
+    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
+    String inputSplitPath;
+    int inputSplitsProcessed = 0;
+    try {
+      // Keep claiming and loading splits until none remain unfinished.
+      while ((inputSplitPath = reserveInputSplit()) != null) {
+        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
+            loadInputSplit(inputSplitPath,
+                graphState));
+        context.progress();
+        ++inputSplitsProcessed;
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException("call: KeeperException", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status before converting to unchecked.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("call: InterruptedException", e);
+    } catch (IOException e) {
+      throw new IllegalStateException("call: IOException", e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("call: ClassNotFoundException", e);
+    } catch (InstantiationException e) {
+      throw new IllegalStateException("call: InstantiationException", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException("call: IllegalAccessException", e);
+    }
+
+    if (LOG.isInfoEnabled()) {
+      float seconds = Times.getNanosSince(TIME, startNanos) /
+          Time.NS_PER_SECOND_AS_FLOAT;
+      float verticesPerSecond = vertexEdgeCount.getVertexCount() / seconds;
+      float edgesPerSecond = vertexEdgeCount.getEdgeCount() / seconds;
+      LOG.info("call: Loaded " + inputSplitsProcessed + " " +
+          "input splits in " + seconds + " secs, " + vertexEdgeCount +
+          " " + verticesPerSecond + " vertices/sec, " +
+          edgesPerSecond + " edges/sec");
+    }
+    try {
+      workerClientRequestProcessor.flush();
+    } catch (IOException e) {
+      throw new IllegalStateException("call: Flushing failed.", e);
+    }
+    return vertexEdgeCount;
+  }
+
+  /**
+   * Try to reserve an InputSplit for loading.  While InputSplits exists that
+   * are not finished, wait until they are.
+   *
+   * NOTE: iterations on the InputSplit list only halt for each worker when it
+   * has scanned the entire list once and found every split marked RESERVED.
+   * When a worker fails, its Ephemeral RESERVED znodes will disappear,
+   * allowing other iterating workers to claim its previously read splits.
+   * Only when the last worker left iterating on the list fails can a danger
+   * of data loss occur. Since worker failure in INPUT_SUPERSTEP currently
+   * causes job failure, this is OK. As the failure model evolves, this
+   * behavior might need to change.
+   *
+   * @return reserved InputSplit or null if no unfinished InputSplits exist
+   * @throws org.apache.zookeeper.KeeperException
+   * @throws InterruptedException
+   */
+  private String reserveInputSplit()
+    throws KeeperException, InterruptedException {
+    String reservedInputSplitPath = null;
+    Stat reservedStat;
+    while (true) {
+      int reservedInputSplits = 0;
+      for (String nextSplitToClaim : splitOrganizer) {
+        context.progress();
+        String tmpInputSplitReservedPath = nextSplitToClaim +
+            inputSplitReservedNode;
+        reservedStat =
+            zooKeeperExt.exists(tmpInputSplitReservedPath, true);
+        if (reservedStat == null) {
+          try {
+            // Attempt to reserve this InputSplit.  The znode is EPHEMERAL
+            // so the reservation disappears if this worker dies, letting
+            // other workers re-claim the split (see method javadoc).
+            zooKeeperExt.createExt(tmpInputSplitReservedPath,
+                null,
+                ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL,
+                false);
+            reservedInputSplitPath = nextSplitToClaim;
+            if (LOG.isInfoEnabled()) {
+              float percentFinished =
+                  reservedInputSplits * 100.0f /
+                      splitOrganizer.getPathListSize();
+              LOG.info("reserveInputSplit: Reserved input " +
+                  "split path " + reservedInputSplitPath +
+                  ", overall roughly " +
+                  percentFinished +
+                  "% input splits reserved");
+            }
+            return reservedInputSplitPath;
+          } catch (KeeperException.NodeExistsException e) {
+            // Another worker won the race for this split; keep scanning.
+            LOG.info("reserveInputSplit: Couldn't reserve " +
+                "(already reserved) inputSplit" +
+                " at " + tmpInputSplitReservedPath);
+          } catch (KeeperException e) {
+            throw new IllegalStateException(
+                "reserveInputSplit: KeeperException on reserve", e);
+          } catch (InterruptedException e) {
+            // Restore the interrupt status before converting to unchecked.
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(
+                "reserveInputSplit: InterruptedException " +
+                    "on reserve", e);
+          }
+        } else {
+          ++reservedInputSplits;
+        }
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("reserveInputSplit: reservedPath = " +
+            reservedInputSplitPath + ", " + reservedInputSplits +
+            " of " + splitOrganizer.getPathListSize() +
+            " InputSplits are finished.");
+      }
+      if (reservedInputSplits == splitOrganizer.getPathListSize()) {
+        return null;
+      }
+      // Wait for either a reservation to go away or a notification that
+      // an InputSplit has finished.
+      context.progress();
+      inputSplitEvents.getStateChanged().waitMsecs(
+          60 * 1000);
+      inputSplitEvents.getStateChanged().reset();
+    }
+  }
+
+  /**
+   * Mark an input split path as completed by this worker.  This notifies
+   * the master and the other workers that this input split has not only
+   * been reserved, but also marked processed.
+   *
+   * @param inputSplitPath Path to the input split.
+   */
+  private void markInputSplitPathFinished(String inputSplitPath) {
+    String inputSplitFinishedPath =
+        inputSplitPath + inputSplitFinishedNode;
+    try {
+      // PERSISTENT (unlike the reservation) so the finished marker
+      // survives this worker's death.
+      zooKeeperExt.createExt(inputSplitFinishedPath,
+          null,
+          ZooDefs.Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("markInputSplitPathFinished: " + inputSplitFinishedPath +
+          " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "markInputSplitPathFinished: KeeperException on " +
+              inputSplitFinishedPath, e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status before converting to unchecked.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "markInputSplitPathFinished: InterruptedException on " +
+              inputSplitFinishedPath, e);
+    }
+  }
+
+  /**
+   * Extract vertices from input split, saving them into a mini cache of
+   * partitions.  Periodically flush the cache of vertices when a limit is
+   * reached in readInputSplit.
+   * Mark the input split finished when done.
+   *
+   * @param inputSplitPath ZK location of input split
+   * @param graphState Current graph state
+   * @return Mapping of vertex indices and statistics, or null if no data read
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws InstantiationException
+   * @throws IllegalAccessException
+   */
+  private VertexEdgeCount loadInputSplit(
+      String inputSplitPath,
+      GraphState<I, V, E, M> graphState)
+    throws IOException, ClassNotFoundException, InterruptedException,
+      InstantiationException, IllegalAccessException {
+    InputSplit inputSplit = getInputSplit(inputSplitPath);
+    VertexEdgeCount vertexEdgeCount =
+        readInputSplit(inputSplit, graphState);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadInputSplit: Finished loading " +
+          inputSplitPath + " " + vertexEdgeCount);
+    }
+    markInputSplitPathFinished(inputSplitPath);
+    return vertexEdgeCount;
+  }
+
+  /**
+   * Talk to ZooKeeper to convert the input split path to the actual
+   * InputSplit.
+   *
+   * @param inputSplitPath Location in ZK of input split
+   * @return instance of InputSplit
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  protected InputSplit getInputSplit(String inputSplitPath)
+    throws IOException, ClassNotFoundException {
+    byte[] splitList;
+    try {
+      splitList = zooKeeperExt.getData(inputSplitPath, false, null);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "getInputSplit: KeeperException on " + inputSplitPath, e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status before converting to unchecked.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "getInputSplit: InterruptedException on " + inputSplitPath, e);
+    }
+    context.progress();
+
+    // The znode data is: location string (unused), input split class name,
+    // then the Writable-serialized split itself.
+    DataInputStream inputStream =
+        new DataInputStream(new ByteArrayInputStream(splitList));
+    Text.readString(inputStream); // location data unused here, skip
+    String inputSplitClass = Text.readString(inputStream);
+    InputSplit inputSplit = (InputSplit)
+        ReflectionUtils.newInstance(
+            configuration.getClassByName(inputSplitClass),
+            configuration);
+    ((Writable) inputSplit).readFields(inputStream);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getInputSplit: Reserved " + inputSplitPath +
+          " from ZooKeeper and got input split '" +
+          inputSplit.toString() + "'");
+    }
+    return inputSplit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java
new file mode 100644
index 0000000..040d348
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/InputSplitsCallableFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Factory class for creating {@link InputSplitsCallable}s.
+ * Presumably one callable is created per input-loading thread, since each
+ * {@link InputSplitsCallable} owns its request processor — confirm at the
+ * call site.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public interface InputSplitsCallableFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Return a newly-created {@link InputSplitsCallable}.
+   *
+   * @return A new {@link InputSplitsCallable}
+   */
+  InputSplitsCallable<I, V, E, M> newCallable();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
new file mode 100644
index 0000000..374d1d2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import com.google.common.collect.Iterables;
+import org.apache.giraph.utils.UnmodifiableIntArrayIterator;
+import org.apache.hadoop.io.IntWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Simple implementation of {@link Vertex} using an int as id, value and
+ * message.  Edges are immutable and unweighted. This class aims to be as
+ * memory efficient as possible.
+ */
+public abstract class IntIntNullIntVertex extends
+    SimpleVertex<IntWritable, IntWritable, IntWritable> {
+  /** Int array of neighbor vertex ids */
+  private int[] neighbors;
+
+  @Override
+  public void setNeighbors(Iterable<IntWritable> neighbors) {
+    this.neighbors =
+        new int[(neighbors != null) ? Iterables.size(neighbors) : 0];
+    int n = 0;
+    if (neighbors != null) {
+      for (IntWritable neighbor : neighbors) {
+        this.neighbors[n++] = neighbor.get();
+      }
+    }
+  }
+
+  @Override
+  public Iterable<IntWritable> getNeighbors() {
+    return new Iterable<IntWritable>() {
+      @Override
+      public Iterator<IntWritable> iterator() {
+        return new UnmodifiableIntArrayIterator(neighbors);
+      }
+    };
+  }
+
+  @Override
+  public boolean hasEdge(IntWritable targetVertexId) {
+    for (int neighbor : neighbors) {
+      if (neighbor == targetVertexId.get()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int getNumEdges() {
+    return neighbors.length;
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeInt(getId().get());
+    out.writeInt(getValue().get());
+    out.writeInt(neighbors.length);
+    for (int n = 0; n < neighbors.length; n++) {
+      out.writeInt(neighbors[n]);
+    }
+    out.writeBoolean(isHalted());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int id = in.readInt();
+    int value = in.readInt();
+    initialize(new IntWritable(id), new IntWritable(value));
+    int numEdges = in.readInt();
+    neighbors = new int[numEdges];
+    for (int n = 0; n < numEdges; n++) {
+      neighbors[n] = in.readInt();
+    }
+    readHaltBoolean(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java
new file mode 100644
index 0000000..65c3aa6
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/IntNullNullNullVertex.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A vertex with no value, edges, or messages. Just an ID, nothing more.
+ */
+public abstract class IntNullNullNullVertex extends Vertex<IntWritable,
+    NullWritable, NullWritable, NullWritable> {
+  @Override
+  public void setEdges(Iterable<Edge<IntWritable, NullWritable>> edges) { }
+
+  @Override
+  public Iterable<Edge<IntWritable, NullWritable>> getEdges() {
+    // Edgeless by design: always the empty list.
+    return ImmutableList.of();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // Serialized form: the id followed by the halted flag.
+    getId().write(out);
+    out.writeBoolean(isHalted());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // Mirror of write(): id first, then the halted flag.
+    initialize(new IntWritable(in.readInt()), NullWritable.get());
+    if (in.readBoolean()) {
+      voteToHalt();
+    } else {
+      wakeUp();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java b/giraph-core/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java
new file mode 100644
index 0000000..61b6a5b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import com.google.common.collect.Iterables;
+import org.apache.giraph.utils.UnmodifiableLongFloatEdgeArrayIterable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Compact vertex representation with primitive arrays.
+ */
+public abstract class LongDoubleFloatDoubleEdgeListVertex extends
+    Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+  /** long represented vertex id */
+  private long id;
+  /** double represented vertex value */
+  private double value;
+  /** long array of neighbor vertex IDs */
+  private long[] neighbors;
+  /** float array of edge weights */
+  private float[] edgeWeights;
+
+  @Override
+  public void initialize(LongWritable vertexId, DoubleWritable vertexValue) {
+    // NOTE(review): edge arrays are left null here; callers must invoke
+    // setEdges() (or readFields()) before any edge accessor — confirm.
+    id = vertexId.get();
+    value = vertexValue.get();
+  }
+
+  @Override
+  public void initialize(LongWritable vertexId, DoubleWritable vertexValue,
+                         Iterable<Edge<LongWritable, FloatWritable>> edges) {
+    id = vertexId.get();
+    value = vertexValue.get();
+    setEdges(edges);
+  }
+
+  @Override
+  public void setEdges(Iterable<Edge<LongWritable, FloatWritable>> edges) {
+    // Compute the size once: Iterables.size() walks the whole iterable,
+    // so calling it per-array would traverse the edges twice.
+    int numEdges = (edges != null) ? Iterables.size(edges) : 0;
+    neighbors = new long[numEdges];
+    edgeWeights = new float[numEdges];
+    int n = 0;
+    if (edges != null) {
+      for (Edge<LongWritable, FloatWritable> edge : edges) {
+        neighbors[n] = edge.getTargetVertexId().get();
+        edgeWeights[n] = edge.getValue().get();
+        n++;
+      }
+    }
+  }
+
+  @Override
+  public LongWritable getId() {
+    return new LongWritable(id);
+  }
+
+  @Override
+  public DoubleWritable getValue() {
+    return new DoubleWritable(value);
+  }
+
+  @Override
+  public void setValue(DoubleWritable vertexValue) {
+    value = vertexValue.get();
+  }
+
+  @Override
+  public Iterable<Edge<LongWritable, FloatWritable>> getEdges() {
+    return new UnmodifiableLongFloatEdgeArrayIterable(neighbors, edgeWeights);
+  }
+
+  @Override
+  public FloatWritable getEdgeValue(LongWritable targetVertexId) {
+    // Linear scan; returns null when no edge to targetVertexId exists.
+    int idx = 0;
+    for (long neighbor : neighbors) {
+      if (neighbor == targetVertexId.get()) {
+        return new FloatWritable(edgeWeights[idx]);
+      }
+      idx++;
+    }
+    return null;
+  }
+
+  @Override
+  public boolean hasEdge(LongWritable targetVertexId) {
+    for (long neighbor : neighbors) {
+      if (neighbor == targetVertexId.get()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int getNumEdges() {
+    return neighbors.length;
+  }
+
+  @Override
+  public void sendMessageToAllEdges(final DoubleWritable message) {
+    for (long neighbor : neighbors) {
+      sendMessage(new LongWritable(neighbor), message);
+    }
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    // Format: id, value, edge count, all neighbor ids, all edge weights.
+    // NOTE(review): the halted flag is NOT serialized here, unlike
+    // IntIntNullIntVertex — confirm this is intentional.
+    out.writeLong(id);
+    out.writeDouble(value);
+    out.writeInt(neighbors.length);
+    for (int n = 0; n < neighbors.length; n++) {
+      out.writeLong(neighbors[n]);
+    }
+    for (int n = 0; n < edgeWeights.length; n++) {
+      out.writeFloat(edgeWeights[n]);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // Mirror of write(): id, value, edge count, neighbor ids, weights.
+    id = in.readLong();
+    value = in.readDouble();
+    int numEdges = in.readInt();
+    neighbors = new long[numEdges];
+    for (int n = 0; n < numEdges; n++) {
+      neighbors[n] = in.readLong();
+    }
+    edgeWeights = new float[numEdges];
+    for (int n = 0; n < numEdges; n++) {
+      edgeWeights[n] = in.readFloat();
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    // Identity is the vertex id only, matching equals().
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (int) (id ^ (id >>> 32));
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof LongDoubleFloatDoubleEdgeListVertex)) {
+      return false;
+    }
+    LongDoubleFloatDoubleEdgeListVertex other =
+        (LongDoubleFloatDoubleEdgeListVertex) obj;
+    return id == other.id;
+  }
+}


Mime
View raw message