incubator-giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1245205 [12/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/example...
Date Thu, 16 Feb 2012 22:12:36 GMT
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Thu Feb 16 22:12:31 2012
@@ -48,598 +48,614 @@ import java.util.List;
  * This mapper that will execute the BSP graph tasks.  Since this mapper will
  * not be passing data by key-value pairs through the MR framework, the
  * types are irrelevant.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class GraphMapper<I extends WritableComparable, V extends Writable,
-        E extends Writable, M extends Writable> extends
-        Mapper<Object, Object, Object, Object> {
-    /** Class logger */
-    private static final Logger LOG = Logger.getLogger(GraphMapper.class);
-    /** Coordination service worker */
-    CentralizedServiceWorker<I, V, E, M> serviceWorker;
-    /** Coordination service master thread */
-    Thread masterThread = null;
-    /** The map should be run exactly once, or else there is a problem. */
-    boolean mapAlreadyRun = false;
-    /** Manages the ZooKeeper servers if necessary (dynamic startup) */
-    private ZooKeeperManager zkManager;
-    /** Configuration */
-    private Configuration conf;
-    /** Already complete? */
-    private boolean done = false;
-    /** What kind of functions is this mapper doing? */
-    private MapFunctions mapFunctions = MapFunctions.UNKNOWN;
-    /**
-     * Graph state for all vertices that is used for the duration of
-     * this mapper.
-     */
-    private GraphState<I,V,E,M> graphState = new GraphState<I, V, E, M>();
-
-    /** What kinds of functions to run on this mapper */
-    public enum MapFunctions {
-        UNKNOWN,
-        MASTER_ONLY,
-        MASTER_ZOOKEEPER_ONLY,
-        WORKER_ONLY,
-        ALL,
-        ALL_EXCEPT_ZOOKEEPER
-    }
-
-    /**
-     * Get the map function enum
-     */
-    public MapFunctions getMapFunctions() {
-        return mapFunctions;
-    }
-
-    /**
-     * Get the aggregator usage, a subset of the functionality
-     *
-     * @return Aggregator usage interface
-     */
-    public final AggregatorUsage getAggregatorUsage() {
-        return serviceWorker;
-    }
-
-    public final WorkerContext getWorkerContext() {
-    	return serviceWorker.getWorkerContext();
-    }
-
-    public final GraphState<I,V,E,M> getGraphState() {
-      return graphState;
-    }
-
-    /**
-     * Default handler for uncaught exceptions.
-     */
-    class OverrideExceptionHandler
-            implements Thread.UncaughtExceptionHandler {
-        public void uncaughtException(Thread t, Throwable e) {
-            LOG.fatal(
-                "uncaughtException: OverrideExceptionHandler on thread " +
-                t.getName() + ", msg = " +  e.getMessage() +
-                ", exiting...", e);
-            System.exit(1);
-        }
-    }
-
-    /**
-     * Copied from JobConf to get the location of this jar.  Workaround for
-     * things like Oozie map-reduce jobs.
-     *
-     * @param my_class Class to search the class loader path for to locate
-     *        the relevant jar file
-     * @return Location of the jar file containing my_class
-     */
-    private static String findContainingJar(Class<?> my_class) {
-        ClassLoader loader = my_class.getClassLoader();
-        String class_file =
-            my_class.getName().replaceAll("\\.", "/") + ".class";
-        try {
-            for(Enumeration<?> itr = loader.getResources(class_file);
-            itr.hasMoreElements();) {
-                URL url = (URL) itr.nextElement();
-                if ("jar".equals(url.getProtocol())) {
-                    String toReturn = url.getPath();
-                    if (toReturn.startsWith("file:")) {
-                        toReturn = toReturn.substring("file:".length());
-                    }
-                    toReturn = URLDecoder.decode(toReturn, "UTF-8");
-                    return toReturn.replaceAll("!.*$", "");
-                }
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return null;
-    }
-
-    /**
-     * Make sure that all registered classes have matching types.  This
-     * is a little tricky due to type erasure, cannot simply get them from
-     * the class type arguments.  Also, set the vertex index, vertex value,
-     * edge value and message value classes.
-     *
-     * @param conf Configuration to get the various classes
-     */
-    public void determineClassTypes(Configuration conf) {
-        Class<? extends BasicVertex<I, V, E, M>> vertexClass =
-            BspUtils.<I, V, E, M>getVertexClass(conf);
-        List<Class<?>> classList = ReflectionUtils.<BasicVertex>getTypeArguments(
-            BasicVertex.class, vertexClass);
-        Type vertexIndexType = classList.get(0);
-        Type vertexValueType = classList.get(1);
-        Type edgeValueType = classList.get(2);
-        Type messageValueType = classList.get(3);
-
-        Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
-            BspUtils.<I, V, E, M>getVertexInputFormatClass(conf);
-        classList = ReflectionUtils.<VertexInputFormat>getTypeArguments(
-            VertexInputFormat.class, vertexInputFormatClass);
-        if (classList.get(0) == null) {
-            LOG.warn("Input format vertex index type is not known");
-        } else if (!vertexIndexType.equals(classList.get(0))) {
-            throw new IllegalArgumentException(
-                "checkClassTypes: Vertex index types don't match, " +
+    E extends Writable, M extends Writable> extends
+    Mapper<Object, Object, Object, Object> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(GraphMapper.class);
+  /** Coordination service worker */
+  private CentralizedServiceWorker<I, V, E, M> serviceWorker;
+  /** Coordination service master thread */
+  private Thread masterThread = null;
+  /** The map should be run exactly once, or else there is a problem. */
+  private boolean mapAlreadyRun = false;
+  /** Manages the ZooKeeper servers if necessary (dynamic startup) */
+  private ZooKeeperManager zkManager;
+  /** Configuration */
+  private Configuration conf;
+  /** Already complete? */
+  private boolean done = false;
+  /** What kind of functions is this mapper doing? */
+  private MapFunctions mapFunctions = MapFunctions.UNKNOWN;
+  /**
+   * Graph state for all vertices that is used for the duration of
+   * this mapper.
+   */
+  private GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>();
+
+  /** What kinds of functions to run on this mapper */
+  public enum MapFunctions {
+    /** Undecided yet */
+    UNKNOWN,
+    /** Only be the master */
+    MASTER_ONLY,
+    /** Only be the master and ZooKeeper */
+    MASTER_ZOOKEEPER_ONLY,
+    /** Only be the worker */
+    WORKER_ONLY,
+    /** Do master, worker, and ZooKeeper */
+    ALL,
+    /** Do master and worker */
+    ALL_EXCEPT_ZOOKEEPER
+  }
+
+  /**
+   * Get the map function enum.
+   *
+   * @return Map functions of this mapper.
+   */
+  public MapFunctions getMapFunctions() {
+    return mapFunctions;
+  }
+
+  /**
+   * Get the aggregator usage, a subset of the functionality
+   *
+   * @return Aggregator usage interface
+   */
+  public final AggregatorUsage getAggregatorUsage() {
+    return serviceWorker;
+  }
+
+  public final WorkerContext getWorkerContext() {
+    return serviceWorker.getWorkerContext();
+  }
+
+  public final GraphState<I, V, E, M> getGraphState() {
+    return graphState;
+  }
+
+  /**
+   * Default handler for uncaught exceptions.
+   */
+  class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+      LOG.fatal(
+          "uncaughtException: OverrideExceptionHandler on thread " +
+              t.getName() + ", msg = " +  e.getMessage() +
+              ", exiting...", e);
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Copied from JobConf to get the location of this jar.  Workaround for
+   * things like Oozie map-reduce jobs.
+   *
+   * @param myClass Class to search the class loader path for to locate
+   *        the relevant jar file
+   * @return Location of the jar file containing myClass
+   */
+  private static String findContainingJar(Class<?> myClass) {
+    ClassLoader loader = myClass.getClassLoader();
+    String classFile =
+        myClass.getName().replaceAll("\\.", "/") + ".class";
+    try {
+      for (Enumeration<?> itr = loader.getResources(classFile);
+          itr.hasMoreElements();) {
+        URL url = (URL) itr.nextElement();
+        if ("jar".equals(url.getProtocol())) {
+          String toReturn = url.getPath();
+          if (toReturn.startsWith("file:")) {
+            toReturn = toReturn.substring("file:".length());
+          }
+          toReturn = URLDecoder.decode(toReturn, "UTF-8");
+          return toReturn.replaceAll("!.*$", "");
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return null;
+  }
+
+  /**
+   * Make sure that all registered classes have matching types.  This
+   * is a little tricky due to type erasure, cannot simply get them from
+   * the class type arguments.  Also, set the vertex index, vertex value,
+   * edge value and message value classes.
+   *
+   * @param conf Configuration to get the various classes
+   */
+  public void determineClassTypes(Configuration conf) {
+    Class<? extends BasicVertex<I, V, E, M>> vertexClass =
+        BspUtils.<I, V, E, M>getVertexClass(conf);
+    List<Class<?>> classList = ReflectionUtils.<BasicVertex>getTypeArguments(
+        BasicVertex.class, vertexClass);
+    Type vertexIndexType = classList.get(0);
+    Type vertexValueType = classList.get(1);
+    Type edgeValueType = classList.get(2);
+    Type messageValueType = classList.get(3);
+
+    Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
+        BspUtils.<I, V, E, M>getVertexInputFormatClass(conf);
+    classList = ReflectionUtils.<VertexInputFormat>getTypeArguments(
+        VertexInputFormat.class, vertexInputFormatClass);
+    if (classList.get(0) == null) {
+      LOG.warn("Input format vertex index type is not known");
+    } else if (!vertexIndexType.equals(classList.get(0))) {
+      throw new IllegalArgumentException(
+          "checkClassTypes: Vertex index types don't match, " +
+              "vertex - " + vertexIndexType +
+              ", vertex input format - " + classList.get(0));
+    }
+    if (classList.get(1) == null) {
+      LOG.warn("Input format vertex value type is not known");
+    } else if (!vertexValueType.equals(classList.get(1))) {
+      throw new IllegalArgumentException(
+          "checkClassTypes: Vertex value types don't match, " +
+              "vertex - " + vertexValueType +
+              ", vertex input format - " + classList.get(1));
+    }
+    if (classList.get(2) == null) {
+      LOG.warn("Input format edge value type is not known");
+    } else if (!edgeValueType.equals(classList.get(2))) {
+      throw new IllegalArgumentException(
+          "checkClassTypes: Edge value types don't match, " +
+              "vertex - " + edgeValueType +
+              ", vertex input format - " + classList.get(2));
+    }
+    // If has vertex combiner class, check
+    Class<? extends VertexCombiner<I, M>> vertexCombinerClass =
+        BspUtils.<I, M>getVertexCombinerClass(conf);
+    if (vertexCombinerClass != null) {
+      classList = ReflectionUtils.<VertexCombiner>getTypeArguments(
+          VertexCombiner.class, vertexCombinerClass);
+      if (!vertexIndexType.equals(classList.get(0))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Vertex index types don't match, " +
                 "vertex - " + vertexIndexType +
-                ", vertex input format - " + classList.get(0));
-        }
-        if (classList.get(1) == null) {
-            LOG.warn("Input format vertex value type is not known");
-        } else if (!vertexValueType.equals(classList.get(1))) {
-            throw new IllegalArgumentException(
-                "checkClassTypes: Vertex value types don't match, " +
+                ", vertex combiner - " + classList.get(0));
+      }
+      if (!messageValueType.equals(classList.get(1))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Message value types don't match, " +
                 "vertex - " + vertexValueType +
-                ", vertex input format - " + classList.get(1));
-        }
-        if (classList.get(2) == null) {
-            LOG.warn("Input format edge value type is not known");
-        } else if (!edgeValueType.equals(classList.get(2))) {
-            throw new IllegalArgumentException(
-                "checkClassTypes: Edge value types don't match, " +
-                "vertex - " + edgeValueType +
-                ", vertex input format - " + classList.get(2));
-        }
-        // If has vertex combiner class, check
-        Class<? extends VertexCombiner<I, M>> vertexCombinerClass =
-            BspUtils.<I, M>getVertexCombinerClass(conf);
-        if (vertexCombinerClass != null) {
-            classList = ReflectionUtils.<VertexCombiner>getTypeArguments(
-                VertexCombiner.class, vertexCombinerClass);
-            if (!vertexIndexType.equals(classList.get(0))) {
-                throw new IllegalArgumentException(
-                    "checkClassTypes: Vertex index types don't match, " +
-                    "vertex - " + vertexIndexType +
-                    ", vertex combiner - " + classList.get(0));
-            }
-            if (!messageValueType.equals(classList.get(1))) {
-                throw new IllegalArgumentException(
-                    "checkClassTypes: Message value types don't match, " +
-                    "vertex - " + vertexValueType +
-                    ", vertex combiner - " + classList.get(1));
-            }
-        }
-        // If has vertex output format class, check
-        Class<? extends VertexOutputFormat<I, V, E>>
-            vertexOutputFormatClass =
-                BspUtils.<I, V, E>getVertexOutputFormatClass(conf);
-        if (vertexOutputFormatClass != null) {
-            classList =
-                ReflectionUtils.<VertexOutputFormat>getTypeArguments(
-                    VertexOutputFormat.class, vertexOutputFormatClass);
-            if (classList.get(0) == null) {
-                LOG.warn("Output format vertex index type is not known");
-            } else if (!vertexIndexType.equals(classList.get(0))) {
-                throw new IllegalArgumentException(
-                    "checkClassTypes: Vertex index types don't match, " +
-                    "vertex - " + vertexIndexType +
-                    ", vertex output format - " + classList.get(0));
-            }
-            if (classList.get(1) == null) {
-                LOG.warn("Output format vertex value type is not known");
-            } else if (!vertexValueType.equals(classList.get(1))) {
-                throw new IllegalArgumentException(
-                    "checkClassTypes: Vertex value types don't match, " +
-                    "vertex - " + vertexValueType +
-                    ", vertex output format - " + classList.get(1));
-            } if (classList.get(2) == null) {
-                LOG.warn("Output format edge value type is not known");
-            } else if (!edgeValueType.equals(classList.get(2))) {
-                throw new IllegalArgumentException(
-                    "checkClassTypes: Edge value types don't match, " +
-                    "vertex - " + vertexIndexType +
-                    ", vertex output format - " + classList.get(2));
-            }
-        }
-        // Vertex resolver might never select the types
-        Class<? extends VertexResolver<I, V, E, M>>
-            vertexResolverClass =
-                BspUtils.<I, V, E, M>getVertexResolverClass(conf);
-        classList = ReflectionUtils.<VertexResolver>getTypeArguments(
-            VertexResolver.class, vertexResolverClass);
-        if (classList.get(0) != null &&
-                !vertexIndexType.equals(classList.get(0))) {
-            throw new IllegalArgumentException(
-                "checkClassTypes: Vertex index types don't match, " +
+                ", vertex combiner - " + classList.get(1));
+      }
+    }
+    // If has vertex output format class, check
+    Class<? extends VertexOutputFormat<I, V, E>>
+    vertexOutputFormatClass =
+      BspUtils.<I, V, E>getVertexOutputFormatClass(conf);
+    if (vertexOutputFormatClass != null) {
+      classList =
+          ReflectionUtils.<VertexOutputFormat>getTypeArguments(
+              VertexOutputFormat.class, vertexOutputFormatClass);
+      if (classList.get(0) == null) {
+        LOG.warn("Output format vertex index type is not known");
+      } else if (!vertexIndexType.equals(classList.get(0))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Vertex index types don't match, " +
                 "vertex - " + vertexIndexType +
-                ", vertex resolver - " + classList.get(0));
-        }
-        if (classList.get(1) != null &&
-                !vertexValueType.equals(classList.get(1))) {
-            throw new IllegalArgumentException(
-                "checkClassTypes: Vertex value types don't match, " +
+                ", vertex output format - " + classList.get(0));
+      }
+      if (classList.get(1) == null) {
+        LOG.warn("Output format vertex value type is not known");
+      } else if (!vertexValueType.equals(classList.get(1))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Vertex value types don't match, " +
                 "vertex - " + vertexValueType +
-                ", vertex resolver - " + classList.get(1));
-        }
-        if (classList.get(2) != null &&
-                !edgeValueType.equals(classList.get(2))) {
-            throw new IllegalArgumentException(
-                "checkClassTypes: Edge value types don't match, " +
-                "vertex - " + edgeValueType +
-                ", vertex resolver - " + classList.get(2));
-        }
-        if (classList.get(3) != null &&
-                !messageValueType.equals(classList.get(3))) {
-            throw new IllegalArgumentException(
-                "checkClassTypes: Message value types don't match, " +
-                "vertex - " + edgeValueType +
-                ", vertex resolver - " + classList.get(3));
-        }
-        conf.setClass(GiraphJob.VERTEX_INDEX_CLASS,
-                      (Class<?>) vertexIndexType,
-                      WritableComparable.class);
-        conf.setClass(GiraphJob.VERTEX_VALUE_CLASS,
-                      (Class<?>) vertexValueType,
-                      Writable.class);
-        conf.setClass(GiraphJob.EDGE_VALUE_CLASS,
-                      (Class<?>) edgeValueType,
-                      Writable.class);
-        conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS,
-                      (Class<?>) messageValueType,
-                      Writable.class);
-    }
-
-    /**
-     * Figure out what functions this mapper should do.  Basic logic is as
-     * follows:
-     * 1) If not split master, everyone does the everything and/or running
-     *    ZooKeeper.
-     * 2) If split master/worker, masters also run ZooKeeper (if it's not
-     *    given to us).
-     *
-     * @param conf Configuration to use
-     * @return Functions that this mapper should do.
-     */
-    private static MapFunctions determineMapFunctions(
-            Configuration conf,
-            ZooKeeperManager zkManager) {
-        boolean splitMasterWorker =
-            conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER,
-                            GiraphJob.SPLIT_MASTER_WORKER_DEFAULT);
-        int taskPartition = conf.getInt("mapred.task.partition", -1);
-        boolean zkAlreadyProvided =
-            conf.get(GiraphJob.ZOOKEEPER_LIST) != null;
-        MapFunctions functions = MapFunctions.UNKNOWN;
-        // What functions should this mapper do?
-        if (!splitMasterWorker) {
-            if ((zkManager != null) && zkManager.runsZooKeeper()) {
-                functions = MapFunctions.ALL;
-            } else {
-                functions = MapFunctions.ALL_EXCEPT_ZOOKEEPER;
-            }
+                ", vertex output format - " + classList.get(1));
+      }
+      if (classList.get(2) == null) {
+        LOG.warn("Output format edge value type is not known");
+      } else if (!edgeValueType.equals(classList.get(2))) {
+        throw new IllegalArgumentException(
+            "checkClassTypes: Edge value types don't match, " +
+                "vertex - " + vertexIndexType +
+                ", vertex output format - " + classList.get(2));
+      }
+    }
+    // Vertex resolver might never select the types
+    Class<? extends VertexResolver<I, V, E, M>>
+    vertexResolverClass =
+      BspUtils.<I, V, E, M>getVertexResolverClass(conf);
+    classList = ReflectionUtils.<VertexResolver>getTypeArguments(
+        VertexResolver.class, vertexResolverClass);
+    if (classList.get(0) != null &&
+        !vertexIndexType.equals(classList.get(0))) {
+      throw new IllegalArgumentException(
+          "checkClassTypes: Vertex index types don't match, " +
+              "vertex - " + vertexIndexType +
+              ", vertex resolver - " + classList.get(0));
+    }
+    if (classList.get(1) != null &&
+        !vertexValueType.equals(classList.get(1))) {
+      throw new IllegalArgumentException(
+          "checkClassTypes: Vertex value types don't match, " +
+              "vertex - " + vertexValueType +
+              ", vertex resolver - " + classList.get(1));
+    }
+    if (classList.get(2) != null &&
+        !edgeValueType.equals(classList.get(2))) {
+      throw new IllegalArgumentException(
+          "checkClassTypes: Edge value types don't match, " +
+              "vertex - " + edgeValueType +
+              ", vertex resolver - " + classList.get(2));
+    }
+    if (classList.get(3) != null &&
+        !messageValueType.equals(classList.get(3))) {
+      throw new IllegalArgumentException(
+          "checkClassTypes: Message value types don't match, " +
+              "vertex - " + edgeValueType +
+              ", vertex resolver - " + classList.get(3));
+    }
+    conf.setClass(GiraphJob.VERTEX_INDEX_CLASS,
+        (Class<?>) vertexIndexType,
+        WritableComparable.class);
+    conf.setClass(GiraphJob.VERTEX_VALUE_CLASS,
+        (Class<?>) vertexValueType,
+        Writable.class);
+    conf.setClass(GiraphJob.EDGE_VALUE_CLASS,
+        (Class<?>) edgeValueType,
+        Writable.class);
+    conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS,
+        (Class<?>) messageValueType,
+        Writable.class);
+  }
+
+  /**
+   * Figure out what functions this mapper should do.  Basic logic is as
+   * follows:
+   * 1) If not split master, everyone does the everything and/or running
+   *    ZooKeeper.
+   * 2) If split master/worker, masters also run ZooKeeper (if it's not
+   *    given to us).
+   *
+   * @param conf Configuration to use
+   * @param zkManager ZooKeeper manager to help determine whether to run
+   *        ZooKeeper
+   * @return Functions that this mapper should do.
+   */
+  private static MapFunctions determineMapFunctions(
+      Configuration conf,
+      ZooKeeperManager zkManager) {
+    boolean splitMasterWorker =
+        conf.getBoolean(GiraphJob.SPLIT_MASTER_WORKER,
+            GiraphJob.SPLIT_MASTER_WORKER_DEFAULT);
+    int taskPartition = conf.getInt("mapred.task.partition", -1);
+    boolean zkAlreadyProvided =
+        conf.get(GiraphJob.ZOOKEEPER_LIST) != null;
+    MapFunctions functions = MapFunctions.UNKNOWN;
+    // What functions should this mapper do?
+    if (!splitMasterWorker) {
+      if ((zkManager != null) && zkManager.runsZooKeeper()) {
+        functions = MapFunctions.ALL;
+      } else {
+        functions = MapFunctions.ALL_EXCEPT_ZOOKEEPER;
+      }
+    } else {
+      if (zkAlreadyProvided) {
+        int masterCount =
+            conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
+                GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
+        if (taskPartition < masterCount) {
+          functions = MapFunctions.MASTER_ONLY;
         } else {
-            if (zkAlreadyProvided) {
-                int masterCount =
-                    conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
-                                GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT);
-                if (taskPartition < masterCount) {
-                    functions = MapFunctions.MASTER_ONLY;
-                } else {
-                    functions = MapFunctions.WORKER_ONLY;
-                }
-            } else {
-                if ((zkManager != null) && zkManager.runsZooKeeper()) {
-                    functions = MapFunctions.MASTER_ZOOKEEPER_ONLY;
-                } else {
-                    functions = MapFunctions.WORKER_ONLY;
-                }
-            }
+          functions = MapFunctions.WORKER_ONLY;
         }
-        return functions;
-    }
-
-    @Override
-    public void setup(Context context)
-            throws IOException, InterruptedException {
-        context.setStatus("setup: Beginning mapper setup.");
-        graphState.setContext(context);
-        // Setting the default handler for uncaught exceptions.
-        Thread.setDefaultUncaughtExceptionHandler(
-            new OverrideExceptionHandler());
-        conf = context.getConfiguration();
-        // Hadoop security needs this property to be set
-        if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
-            conf.set("mapreduce.job.credentials.binary",
-                    System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
-        }
-        // Ensure the user classes have matching types and figure them out
-        determineClassTypes(conf);
-
-        // Do some initial setup (possibly starting up a Zookeeper service)
-        context.setStatus("setup: Initializing Zookeeper services.");
-        if (!conf.getBoolean(GiraphJob.LOCAL_TEST_MODE,
-                GiraphJob.LOCAL_TEST_MODE_DEFAULT)) {
-            Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
-            String zkClasspath = null;
-            if(fileClassPaths == null) {
-                if(LOG.isInfoEnabled()) {
-                    LOG.info("Distributed cache is empty. Assuming fatjar.");
-                }
-                String jarFile = context.getJar();
-                if (jarFile == null) {
-                   jarFile = findContainingJar(getClass());
-                }
-                zkClasspath = jarFile.replaceFirst("file:", "");
-            } else {
-                StringBuilder sb = new StringBuilder();
-                sb.append(fileClassPaths[0]);
-
-                for (int i = 1; i < fileClassPaths.length; i++) {
-                    sb.append(":");
-                    sb.append(fileClassPaths[i]);
-                }
-                zkClasspath = sb.toString();
-            }
-
-            if (LOG.isInfoEnabled()) {
-                LOG.info("setup: classpath @ " + zkClasspath);
-            }
-            conf.set(GiraphJob.ZOOKEEPER_JAR, zkClasspath);
-        }
-        String serverPortList =
-            conf.get(GiraphJob.ZOOKEEPER_LIST, "");
-        if (serverPortList == "") {
-            zkManager = new ZooKeeperManager(context);
-            context.setStatus("setup: Setting up Zookeeper manager.");
-            zkManager.setup();
-            if (zkManager.computationDone()) {
-                done = true;
-                return;
-            }
-            zkManager.onlineZooKeeperServers();
-            serverPortList = zkManager.getZooKeeperServerPortString();
-        }
-        context.setStatus("setup: Connected to Zookeeper service " +
-                          serverPortList);
-        this.mapFunctions = determineMapFunctions(conf, zkManager);
-
-        // Sometimes it takes a while to get multiple ZooKeeper servers up
-        if (conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
-                    GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT) > 1) {
-            Thread.sleep(GiraphJob.DEFAULT_ZOOKEEPER_INIT_LIMIT *
-                         GiraphJob.DEFAULT_ZOOKEEPER_TICK_TIME);
-        }
-        int sessionMsecTimeout =
-            conf.getInt(GiraphJob.ZOOKEEPER_SESSION_TIMEOUT,
-                          GiraphJob.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
-        try {
-            if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
-                    (mapFunctions == MapFunctions.MASTER_ONLY) ||
-                    (mapFunctions == MapFunctions.ALL) ||
-                    (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("setup: Starting up BspServiceMaster " +
-                             "(master thread)...");
-                }
-                masterThread =
-                    new MasterThread<I, V, E, M>(
-                        new BspServiceMaster<I, V, E, M>(serverPortList,
-                                                         sessionMsecTimeout,
-                                                         context,
-                                                         this),
-                                                         context);
-                masterThread.start();
-            }
-            if ((mapFunctions == MapFunctions.WORKER_ONLY) ||
-                    (mapFunctions == MapFunctions.ALL) ||
-                    (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("setup: Starting up BspServiceWorker...");
-                }
-                serviceWorker = new BspServiceWorker<I, V, E, M>(
-                    serverPortList,
-                    sessionMsecTimeout,
-                    context,
-                    this,
-                    graphState);
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("setup: Registering health of this worker...");
-                }
-                serviceWorker.setup();
-            }
-        } catch (Exception e) {
-            LOG.error("setup: Caught exception just before end of setup", e);
-            if (zkManager != null ) {
-                zkManager.offlineZooKeeperServers(
-                ZooKeeperManager.State.FAILED);
-            }
-            throw new RuntimeException(
-                "setup: Offlining servers due to exception...", e);
+      } else {
+        if ((zkManager != null) && zkManager.runsZooKeeper()) {
+          functions = MapFunctions.MASTER_ZOOKEEPER_ONLY;
+        } else {
+          functions = MapFunctions.WORKER_ONLY;
         }
-        context.setStatus(getMapFunctions().toString() + " starting...");
+      }
     }
+    return functions;
+  }
 
-    @Override
-    public void map(Object key, Object value, Context context)
-        throws IOException, InterruptedException {
-        // map() only does computation
-        // 1) Run checkpoint per frequency policy.
-        // 2) For every vertex on this mapper, run the compute() function
-        // 3) Wait until all messaging is done.
-        // 4) Check if all vertices are done.  If not goto 2).
-        // 5) Dump output.
-        if (done == true) {
-            return;
-        }
-        if ((serviceWorker != null) && (graphState.getNumVertices() == 0)) {
-            return;
-        }
-
-        if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
-                (mapFunctions == MapFunctions.MASTER_ONLY)) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("map: No need to do anything when not a worker");
-            }
-            return;
-        }
-
-        if (mapAlreadyRun) {
-            throw new RuntimeException("In BSP, map should have only been" +
-                                       " run exactly once, (already run)");
-        }
-        mapAlreadyRun = true;
-
-        graphState.setSuperstep(serviceWorker.getSuperstep()).
-            setContext(context).setGraphMapper(this);
-
-        try {
-            serviceWorker.getWorkerContext().preApplication();
-        } catch (InstantiationException e) {
-            LOG.fatal("map: preApplication failed in instantiation", e);
-            throw new RuntimeException(
-                "map: preApplication failed in instantiation", e);
-        } catch (IllegalAccessException e) {
-            LOG.fatal("map: preApplication failed in access", e);
-            throw new RuntimeException(
-                "map: preApplication failed in access",e );
-        }
-        context.progress();
-
-        List<PartitionStats> partitionStatsList =
-            new ArrayList<PartitionStats>();
-        do {
-            long superstep = serviceWorker.getSuperstep();
-
-            graphState.setSuperstep(superstep);
-
-            Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
-                serviceWorker.startSuperstep();
-            if (zkManager != null && zkManager.runsZooKeeper()) {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("map: Chosen to run ZooKeeper...");
-                }
-                context.setStatus("map: Running Zookeeper Server");
-            }
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("map: " + MemoryUtils.getRuntimeMemoryStats());
-            }
-            context.progress();
-
-            serviceWorker.exchangeVertexPartitions(
-                masterAssignedPartitionOwners);
-            context.progress();
-
-            // Might need to restart from another superstep
-            // (manually or automatic), or store a checkpoint
-            if (serviceWorker.getRestartedSuperstep() == superstep) {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("map: Loading from checkpoint " + superstep);
-                }
-                serviceWorker.loadCheckpoint(
-                    serviceWorker.getRestartedSuperstep());
-            } else if (serviceWorker.checkpointFrequencyMet(superstep)) {
-                serviceWorker.storeCheckpoint();
-            }
-
-            serviceWorker.getWorkerContext().setGraphState(graphState);
-            serviceWorker.getWorkerContext().preSuperstep();
-            context.progress();
-
-            partitionStatsList.clear();
-            for (Partition<I, V, E, M> partition :
-                    serviceWorker.getPartitionMap().values()) {
-                PartitionStats partitionStats =
-                    new PartitionStats(partition.getPartitionId(), 0, 0, 0);
-                for (BasicVertex<I, V, E, M> basicVertex :
-                        partition.getVertices()) {
-                    // Make sure every vertex has the current
-                    // graphState before computing
-                    basicVertex.setGraphState(graphState);
-                    if (basicVertex.isHalted()
-                            && !Iterables.isEmpty(basicVertex.getMessages())) {
-                        basicVertex.halt = false;
-                    }
-                    if (!basicVertex.isHalted()) {
-                        Iterator<M> vertexMsgIt =
-                            basicVertex.getMessages().iterator();
-                        context.progress();
-                        basicVertex.compute(vertexMsgIt);
-                        basicVertex.releaseResources();
-                    }
-                    if (basicVertex.isHalted()) {
-                        partitionStats.incrFinishedVertexCount();
-                    }
-                    partitionStats.incrVertexCount();
-                    partitionStats.addEdgeCount(basicVertex.getNumOutEdges());
-                }
-                partitionStatsList.add(partitionStats);
-            }
-        } while (!serviceWorker.finishSuperstep(partitionStatsList));
+  @Override
+  public void setup(Context context)
+    throws IOException, InterruptedException {
+    context.setStatus("setup: Beginning mapper setup.");
+    graphState.setContext(context);
+    // Setting the default handler for uncaught exceptions.
+    Thread.setDefaultUncaughtExceptionHandler(
+        new OverrideExceptionHandler());
+    conf = context.getConfiguration();
+    // Hadoop security needs this property to be set
+    if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
+      conf.set("mapreduce.job.credentials.binary",
+          System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+    }
+    // Ensure the user classes have matching types and figure them out
+    determineClassTypes(conf);
+
+    // Do some initial setup (possibly starting up a Zookeeper service)
+    context.setStatus("setup: Initializing Zookeeper services.");
+    if (!conf.getBoolean(GiraphJob.LOCAL_TEST_MODE,
+        GiraphJob.LOCAL_TEST_MODE_DEFAULT)) {
+      Path[] fileClassPaths = DistributedCache.getLocalCacheArchives(conf);
+      String zkClasspath = null;
+      if (fileClassPaths == null) {
         if (LOG.isInfoEnabled()) {
-            LOG.info("map: BSP application done " +
-                     "(global vertices marked done)");
+          LOG.info("Distributed cache is empty. Assuming fatjar.");
         }
-
-        serviceWorker.getWorkerContext().postApplication();
-        context.progress();
-    }
-
-    @Override
-    public void cleanup(Context context)
-            throws IOException, InterruptedException {
+        String jarFile = context.getJar();
+        if (jarFile == null) {
+          jarFile = findContainingJar(getClass());
+        }
+        zkClasspath = jarFile.replaceFirst("file:", "");
+      } else {
+        StringBuilder sb = new StringBuilder();
+        sb.append(fileClassPaths[0]);
+
+        for (int i = 1; i < fileClassPaths.length; i++) {
+          sb.append(":");
+          sb.append(fileClassPaths[i]);
+        }
+        zkClasspath = sb.toString();
+      }
+
+      if (LOG.isInfoEnabled()) {
+        LOG.info("setup: classpath @ " + zkClasspath);
+      }
+      conf.set(GiraphJob.ZOOKEEPER_JAR, zkClasspath);
+    }
+    String serverPortList =
+        conf.get(GiraphJob.ZOOKEEPER_LIST, "");
+    if (serverPortList.isEmpty()) {
+      zkManager = new ZooKeeperManager(context);
+      context.setStatus("setup: Setting up Zookeeper manager.");
+      zkManager.setup();
+      if (zkManager.computationDone()) {
+        done = true;
+        return;
+      }
+      zkManager.onlineZooKeeperServers();
+      serverPortList = zkManager.getZooKeeperServerPortString();
+    }
+    context.setStatus("setup: Connected to Zookeeper service " +
+        serverPortList);
+    this.mapFunctions = determineMapFunctions(conf, zkManager);
+
+    // Sometimes it takes a while to get multiple ZooKeeper servers up
+    if (conf.getInt(GiraphJob.ZOOKEEPER_SERVER_COUNT,
+        GiraphJob.ZOOKEEPER_SERVER_COUNT_DEFAULT) > 1) {
+      Thread.sleep(GiraphJob.DEFAULT_ZOOKEEPER_INIT_LIMIT *
+          GiraphJob.DEFAULT_ZOOKEEPER_TICK_TIME);
+    }
+    int sessionMsecTimeout =
+        conf.getInt(GiraphJob.ZOOKEEPER_SESSION_TIMEOUT,
+            GiraphJob.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+    try {
+      if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
+          (mapFunctions == MapFunctions.MASTER_ONLY) ||
+          (mapFunctions == MapFunctions.ALL) ||
+          (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) {
         if (LOG.isInfoEnabled()) {
-            LOG.info("cleanup: Starting for " + getMapFunctions());
+          LOG.info("setup: Starting up BspServiceMaster " +
+              "(master thread)...");
         }
-        if (done) {
-            return;
+        masterThread =
+            new MasterThread<I, V, E, M>(
+                new BspServiceMaster<I, V, E, M>(serverPortList,
+                    sessionMsecTimeout,
+                    context,
+                    this),
+                    context);
+        masterThread.start();
+      }
+      if ((mapFunctions == MapFunctions.WORKER_ONLY) ||
+          (mapFunctions == MapFunctions.ALL) ||
+          (mapFunctions == MapFunctions.ALL_EXCEPT_ZOOKEEPER)) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("setup: Starting up BspServiceWorker...");
         }
-
-        if (serviceWorker != null) {
-            serviceWorker.cleanup();
+        serviceWorker = new BspServiceWorker<I, V, E, M>(
+            serverPortList,
+            sessionMsecTimeout,
+            context,
+            this,
+            graphState);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("setup: Registering health of this worker...");
         }
-        try {
-            if (masterThread != null) {
-                masterThread.join();
-            }
-        } catch (InterruptedException e) {
-            // cleanup phase -- just log the error
-            LOG.error("cleanup: Master thread couldn't join");
-        }
-        if (zkManager != null) {
-            zkManager.offlineZooKeeperServers(
-                ZooKeeperManager.State.FINISHED);
+        serviceWorker.setup();
+      }
+    } catch (IOException e) {
+      LOG.error("setup: Caught exception just before end of setup", e);
+      if (zkManager != null) {
+        zkManager.offlineZooKeeperServers(
+            ZooKeeperManager.State.FAILED);
+      }
+      throw new RuntimeException(
+          "setup: Offlining servers due to exception...", e);
+    }
+    context.setStatus(getMapFunctions().toString() + " starting...");
+  }
+
+  @Override
+  public void map(Object key, Object value, Context context)
+    throws IOException, InterruptedException {
+    // map() only does computation
+    // 1) Run checkpoint per frequency policy.
+    // 2) For every vertex on this mapper, run the compute() function
+    // 3) Wait until all messaging is done.
+    // 4) Check if all vertices are done.  If not goto 2).
+    // 5) Dump output.
+    if (done) {
+      return;
+    }
+    if ((serviceWorker != null) && (graphState.getNumVertices() == 0)) {
+      return;
+    }
+
+    if ((mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) ||
+        (mapFunctions == MapFunctions.MASTER_ONLY)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("map: No need to do anything when not a worker");
+      }
+      return;
+    }
+
+    if (mapAlreadyRun) {
+      throw new RuntimeException("In BSP, map should have only been" +
+          " run exactly once, (already run)");
+    }
+    mapAlreadyRun = true;
+
+    graphState.setSuperstep(serviceWorker.getSuperstep()).
+      setContext(context).setGraphMapper(this);
+
+    try {
+      serviceWorker.getWorkerContext().preApplication();
+    } catch (InstantiationException e) {
+      LOG.fatal("map: preApplication failed in instantiation", e);
+      throw new RuntimeException(
+          "map: preApplication failed in instantiation", e);
+    } catch (IllegalAccessException e) {
+      LOG.fatal("map: preApplication failed in access", e);
+      throw new RuntimeException(
+          "map: preApplication failed in access", e);
+    }
+    context.progress();
+
+    List<PartitionStats> partitionStatsList =
+        new ArrayList<PartitionStats>();
+    do {
+      long superstep = serviceWorker.getSuperstep();
+
+      graphState.setSuperstep(superstep);
+
+      Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
+          serviceWorker.startSuperstep();
+      if (zkManager != null && zkManager.runsZooKeeper()) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("map: Chosen to run ZooKeeper...");
         }
-    }
+        context.setStatus("map: Running Zookeeper Server");
+      }
 
-    @Override
-    public void run(Context context) throws IOException, InterruptedException {
-        // Notify the master quicker if there is worker failure rather than
-        // waiting for ZooKeeper to timeout and delete the ephemeral znodes
-        try {
-            setup(context);
-            while (context.nextKeyValue()) {
-                map(context.getCurrentKey(),
-                    context.getCurrentValue(),
-                    context);
-            }
-        cleanup(context);
-        } catch (Exception e) {
-            if (mapFunctions == MapFunctions.WORKER_ONLY) {
-                serviceWorker.failureCleanup();
-            }
-            throw new IllegalStateException(
-                "run: Caught an unrecoverable exception " + e.getMessage(), e);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("map: " + MemoryUtils.getRuntimeMemoryStats());
+      }
+      context.progress();
+
+      serviceWorker.exchangeVertexPartitions(
+          masterAssignedPartitionOwners);
+      context.progress();
+
+      // Might need to restart from another superstep
+      // (manually or automatic), or store a checkpoint
+      if (serviceWorker.getRestartedSuperstep() == superstep) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("map: Loading from checkpoint " + superstep);
         }
+        serviceWorker.loadCheckpoint(
+            serviceWorker.getRestartedSuperstep());
+      } else if (serviceWorker.checkpointFrequencyMet(superstep)) {
+        serviceWorker.storeCheckpoint();
+      }
+
+      serviceWorker.getWorkerContext().setGraphState(graphState);
+      serviceWorker.getWorkerContext().preSuperstep();
+      context.progress();
+
+      partitionStatsList.clear();
+      for (Partition<I, V, E, M> partition :
+        serviceWorker.getPartitionMap().values()) {
+        PartitionStats partitionStats =
+            new PartitionStats(partition.getPartitionId(), 0, 0, 0);
+        for (BasicVertex<I, V, E, M> basicVertex :
+          partition.getVertices()) {
+          // Make sure every vertex has the current
+          // graphState before computing
+          basicVertex.setGraphState(graphState);
+          if (basicVertex.isHalted() &
+              !Iterables.isEmpty(basicVertex.getMessages())) {
+            basicVertex.halt = false;
+          }
+          if (!basicVertex.isHalted()) {
+            Iterator<M> vertexMsgIt =
+                basicVertex.getMessages().iterator();
+            context.progress();
+            basicVertex.compute(vertexMsgIt);
+            basicVertex.releaseResources();
+          }
+          if (basicVertex.isHalted()) {
+            partitionStats.incrFinishedVertexCount();
+          }
+          partitionStats.incrVertexCount();
+          partitionStats.addEdgeCount(basicVertex.getNumOutEdges());
+        }
+        partitionStatsList.add(partitionStats);
+      }
+    } while (!serviceWorker.finishSuperstep(partitionStatsList));
+    if (LOG.isInfoEnabled()) {
+      LOG.info("map: BSP application done " +
+          "(global vertices marked done)");
+    }
+
+    serviceWorker.getWorkerContext().postApplication();
+    context.progress();
+  }
+
+  @Override
+  public void cleanup(Context context)
+    throws IOException, InterruptedException {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("cleanup: Starting for " + getMapFunctions());
+    }
+    if (done) {
+      return;
+    }
+
+    if (serviceWorker != null) {
+      serviceWorker.cleanup();
+    }
+    try {
+      if (masterThread != null) {
+        masterThread.join();
+      }
+    } catch (InterruptedException e) {
+      // cleanup phase -- just log the error
+      LOG.error("cleanup: Master thread couldn't join");
+    }
+    if (zkManager != null) {
+      zkManager.offlineZooKeeperServers(
+          ZooKeeperManager.State.FINISHED);
+    }
+  }
+
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    // Notify the master quicker if there is worker failure rather than
+    // waiting for ZooKeeper to timeout and delete the ephemeral znodes
+    try {
+      setup(context);
+      while (context.nextKeyValue()) {
+        map(context.getCurrentKey(),
+            context.getCurrentValue(),
+            context);
+      }
+      cleanup(context);
+    } catch (IOException e) {
+      if (mapFunctions == MapFunctions.WORKER_ONLY) {
+        serviceWorker.failureCleanup();
+      }
+      throw new IllegalStateException(
+          "run: Caught an unrecoverable exception " + e.getMessage(), e);
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java Thu Feb 16 22:12:31 2012
@@ -22,84 +22,120 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-/*
+/**
  * Global state of the graph.  Should be treated as a singleton (but is kept
  * as a regular bean to facilitate ease of unit testing)
  *
- * @param <I> vertex id
- * @param <V> vertex data
- * @param <E> edge data
- * @param <M> message data
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class GraphState<I extends WritableComparable, V extends Writable,
-        E extends Writable, M extends Writable> {
-    /** Graph-wide superstep */
-    private long superstep = 0;
-    /** Graph-wide number of vertices */
-    private long numVertices = -1;
-    /** Graph-wide number of edges */
-    private long numEdges = -1;
-    /** Graph-wide map context */
-    private Mapper.Context context;
-    /** Graph-wide BSP Mapper for this Vertex */
-    private GraphMapper<I, V, E, M> graphMapper;
-    /** Graph-wide worker communications */
-    private WorkerCommunications<I, V, E, M> workerCommunications;
-
-    public long getSuperstep() {
-        return superstep;
-    }
-
-    public GraphState<I, V, E, M> setSuperstep(long superstep) {
-        this.superstep = superstep;
-        return this;
-    }
-
-    public long getNumVertices() {
-        return numVertices;
-    }
-
-    public GraphState<I, V, E, M> setNumVertices(long numVertices) {
-        this.numVertices = numVertices;
-        return this;
-    }
-
-    public long getNumEdges() {
-        return numEdges;
-    }
-
-    public GraphState<I, V, E, M> setNumEdges(long numEdges) {
-        this.numEdges = numEdges;
-        return this;
-    }
-
-    public Mapper.Context getContext() {
-        return context;
-    }
-
-    public GraphState<I, V, E ,M> setContext(Mapper.Context context) {
-        this.context = context;
-        return this;
-    }
-
-    public GraphMapper<I, V, E, M> getGraphMapper() {
-        return graphMapper;
-    }
-
-    public GraphState<I, V, E, M> setGraphMapper(
-            GraphMapper<I, V, E, M> graphMapper) {
-        this.graphMapper = graphMapper;
-        return this;
-    }
-
-    public GraphState<I, V, E, M> setWorkerCommunications(
-            WorkerCommunications<I, V, E, M> workerCommunications) {
-        this.workerCommunications = workerCommunications;
-        return this;
-    }
-
-    public WorkerCommunications<I, V, E, M> getWorkerCommunications() {
-        return workerCommunications;
-    }
+E extends Writable, M extends Writable> {
+  /** Graph-wide superstep */
+  private long superstep = 0;
+  /** Graph-wide number of vertices */
+  private long numVertices = -1;
+  /** Graph-wide number of edges */
+  private long numEdges = -1;
+  /** Graph-wide map context */
+  private Mapper.Context context;
+  /** Graph-wide BSP Mapper for this Vertex */
+  private GraphMapper<I, V, E, M> graphMapper;
+  /** Graph-wide worker communications */
+  private WorkerCommunications<I, V, E, M> workerCommunications;
+
+  public long getSuperstep() {
+    return superstep;
+  }
+
+  /**
+   * Set the current superstep.
+   *
+   * @param superstep Current superstep to use.
+   * @return Returns this object.
+   */
+  public GraphState<I, V, E, M> setSuperstep(long superstep) {
+    this.superstep = superstep;
+    return this;
+  }
+
+  public long getNumVertices() {
+    return numVertices;
+  }
+
+  /**
+   * Set the current number of vertices.
+   *
+   * @param numVertices Current number of vertices.
+   * @return Returns this object.
+   */
+  public GraphState<I, V, E, M> setNumVertices(long numVertices) {
+    this.numVertices = numVertices;
+    return this;
+  }
+
+  public long getNumEdges() {
+    return numEdges;
+  }
+
+  /**
+   * Set the current number of edges.
+   *
+   * @param numEdges Current number of edges.
+   * @return Returns this object.
+   */
+  public GraphState<I, V, E, M> setNumEdges(long numEdges) {
+    this.numEdges = numEdges;
+    return this;
+  }
+
+  public Mapper.Context getContext() {
+    return context;
+  }
+
+  /**
+   * Set the current context.
+   *
+   * @param context Current context.
+   * @return Returns this object.
+   */
+  public GraphState<I, V, E, M> setContext(Mapper.Context context) {
+    this.context = context;
+    return this;
+  }
+
+  public GraphMapper<I, V, E, M> getGraphMapper() {
+    return graphMapper;
+  }
+
+  /**
+   * Set the current graph mapper.
+   *
+   * @param graphMapper Current graph mapper.
+   * @return Returns this object.
+   */
+  public GraphState<I, V, E, M> setGraphMapper(
+      GraphMapper<I, V, E, M> graphMapper) {
+    this.graphMapper = graphMapper;
+    return this;
+  }
+
+  /**
+   * Set the current worker communications.
+   *
+   * @param workerCommunications Current worker communications.
+   * @return Returns this object.
+   */
+  public GraphState<I, V, E, M> setWorkerCommunications(
+      WorkerCommunications<I, V, E, M> workerCommunications) {
+    this.workerCommunications = workerCommunications;
+    return this;
+  }
+
+  public WorkerCommunications<I, V, E, M> getWorkerCommunications() {
+    return workerCommunications;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java Thu Feb 16 22:12:31 2012
@@ -48,196 +48,196 @@ import java.util.Map;
  */
 @SuppressWarnings("rawtypes")
 public abstract class HashMapVertex<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable>
-        extends MutableVertex<I, V, E, M> {
-    /** Class logger */
-    private static final Logger LOG = Logger.getLogger(HashMapVertex.class);
-    /** Vertex id */
-    private I vertexId = null;
-    /** Vertex value */
-    private V vertexValue = null;
-    /** Map of destination vertices and their edge values */
-    protected final Map<I, Edge<I, E>> destEdgeMap =
-        new HashMap<I, Edge<I, E>>();
-    /** List of incoming messages from the previous superstep */
-    private final List<M> msgList = Lists.newArrayList();
-
-    @Override
-    public void initialize(
-            I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages) {
-        if (vertexId != null) {
-            setVertexId(vertexId);
-        }
-        if (vertexValue != null) {
-            setVertexValue(vertexValue);
-        }
-        if (edges != null && !edges.isEmpty()) {
-            for (Map.Entry<I, E> entry : edges.entrySet()) {
-                destEdgeMap.put(
-                    entry.getKey(),
-                    new Edge<I, E>(entry.getKey(), entry.getValue()));
-            }
-        }
-        if (messages != null) {
-            Iterables.<M>addAll(msgList, messages);
-        }
-    }
-
-    @Override
-    public final boolean addEdge(I targetVertexId, E edgeValue) {
-        if (destEdgeMap.put(
-                targetVertexId,
-                new Edge<I, E>(targetVertexId, edgeValue)) != null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("addEdge: Vertex=" + vertexId +
-                          ": already added an edge value for dest vertex id " +
-                          targetVertexId);
-            }
-            return false;
-        } else {
-            return true;
-        }
-    }
-
-    @Override
-    public long getSuperstep() {
-        return getGraphState().getSuperstep();
-    }
-
-    @Override
-    public final void setVertexId(I vertexId) {
-        this.vertexId = vertexId;
-    }
-
-    @Override
-    public final I getVertexId() {
-        return vertexId;
-    }
-
-    @Override
-    public final V getVertexValue() {
-        return vertexValue;
-    }
-
-    @Override
-    public final void setVertexValue(V vertexValue) {
-        this.vertexValue = vertexValue;
-    }
-
-    @Override
-    public E getEdgeValue(I targetVertexId) {
-        Edge<I, E> edge = destEdgeMap.get(targetVertexId);
-        return edge != null ? edge.getEdgeValue() : null;
-    }
-
-    @Override
-    public boolean hasEdge(I targetVertexId) {
-        return destEdgeMap.containsKey(targetVertexId);
-    }
-
-    /**
-     * Get an iterator to the edges on this vertex.
-     *
-     * @return A <em>sorted</em> iterator, as defined by the sort-order
-     *         of the vertex ids
-     */
-    @Override
-    public Iterator<I> iterator() {
-        return destEdgeMap.keySet().iterator();
-    }
-
-    @Override
-    public int getNumOutEdges() {
-        return destEdgeMap.size();
-    }
-
-    @Override
-    public E removeEdge(I targetVertexId) {
-        Edge<I, E> edge = destEdgeMap.remove(targetVertexId);
-        if (edge != null) {
-            return edge.getEdgeValue();
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public final void sendMsgToAllEdges(M msg) {
-        if (msg == null) {
-            throw new IllegalArgumentException(
-                "sendMsgToAllEdges: Cannot send null message to all edges");
-        }
-        for (Edge<I, E> edge : destEdgeMap.values()) {
-            sendMsg(edge.getDestVertexId(), msg);
-        }
-    }
-
-    @Override
-    final public void readFields(DataInput in) throws IOException {
-        vertexId = BspUtils.<I>createVertexIndex(getConf());
-        vertexId.readFields(in);
-        boolean hasVertexValue = in.readBoolean();
-        if (hasVertexValue) {
-            vertexValue = BspUtils.<V>createVertexValue(getConf());
-            vertexValue.readFields(in);
-        }
-        long edgeMapSize = in.readLong();
-        for (long i = 0; i < edgeMapSize; ++i) {
-            Edge<I, E> edge = new Edge<I, E>();
-            edge.setConf(getConf());
-            edge.readFields(in);
-            addEdge(edge.getDestVertexId(), edge.getEdgeValue());
-        }
-        long msgListSize = in.readLong();
-        for (long i = 0; i < msgListSize; ++i) {
-            M msg = BspUtils.<M>createMessageValue(getConf());
-            msg.readFields(in);
-            msgList.add(msg);
-        }
-        halt = in.readBoolean();
-    }
-
-    @Override
-    final public void write(DataOutput out) throws IOException {
-        vertexId.write(out);
-        out.writeBoolean(vertexValue != null);
-        if (vertexValue != null) {
-            vertexValue.write(out);
-        }
-        out.writeLong(destEdgeMap.size());
-        for (Edge<I, E> edge : destEdgeMap.values()) {
-            edge.write(out);
-        }
-        out.writeLong(msgList.size());
-        for (M msg : msgList) {
-            msg.write(out);
-        }
-        out.writeBoolean(halt);
-    }
-
-    @Override
-    void putMessages(Iterable<M> messages) {
-        msgList.clear();
-        for (M message : messages) {
-            msgList.add(message);
-        }
-    }
-
-    @Override
-    public Iterable<M> getMessages() {
-        return Iterables.unmodifiableIterable(msgList);
-    }
-
-    @Override
-    void releaseResources() {
-        // Hint to GC to free the messages
-        msgList.clear();
-    }
-
-    @Override
-    public String toString() {
-        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
-            ",#edges=" + destEdgeMap.size() + ")";
-    }
+    V extends Writable, E extends Writable, M extends Writable>
+    extends MutableVertex<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(HashMapVertex.class);
+  /** Map of destination vertices and their edge values */
+  protected final Map<I, Edge<I, E>> destEdgeMap =
+      new HashMap<I, Edge<I, E>>();
+  /** Vertex id */
+  private I vertexId = null;
+  /** Vertex value */
+  private V vertexValue = null;
+  /** List of incoming messages from the previous superstep */
+  private final List<M> msgList = Lists.newArrayList();
+
+  @Override
+  public void initialize(
+      I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages) {
+    if (vertexId != null) {
+      setVertexId(vertexId);
+    }
+    if (vertexValue != null) {
+      setVertexValue(vertexValue);
+    }
+    if (edges != null && !edges.isEmpty()) {
+      for (Map.Entry<I, E> entry : edges.entrySet()) {
+        destEdgeMap.put(
+            entry.getKey(),
+            new Edge<I, E>(entry.getKey(), entry.getValue()));
+      }
+    }
+    if (messages != null) {
+      Iterables.<M>addAll(msgList, messages);
+    }
+  }
+
+  @Override
+  public final boolean addEdge(I targetVertexId, E edgeValue) {
+    if (destEdgeMap.put(
+        targetVertexId,
+        new Edge<I, E>(targetVertexId, edgeValue)) != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("addEdge: Vertex=" + vertexId +
+            ": already added an edge value for dest vertex id " +
+            targetVertexId);
+      }
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  @Override
+  public long getSuperstep() {
+    return getGraphState().getSuperstep();
+  }
+
+  @Override
+  public final void setVertexId(I vertexId) {
+    this.vertexId = vertexId;
+  }
+
+  @Override
+  public final I getVertexId() {
+    return vertexId;
+  }
+
+  @Override
+  public final V getVertexValue() {
+    return vertexValue;
+  }
+
+  @Override
+  public final void setVertexValue(V vertexValue) {
+    this.vertexValue = vertexValue;
+  }
+
+  @Override
+  public E getEdgeValue(I targetVertexId) {
+    Edge<I, E> edge = destEdgeMap.get(targetVertexId);
+    return edge != null ? edge.getEdgeValue() : null;
+  }
+
+  @Override
+  public boolean hasEdge(I targetVertexId) {
+    return destEdgeMap.containsKey(targetVertexId);
+  }
+
+  /**
+   * Get an iterator to the edges on this vertex.
+   *
+   * @return A <em>sorted</em> iterator, as defined by the sort-order
+   *         of the vertex ids
+   */
+  @Override
+  public Iterator<I> iterator() {
+    return destEdgeMap.keySet().iterator();
+  }
+
+  @Override
+  public int getNumOutEdges() {
+    return destEdgeMap.size();
+  }
+
+  @Override
+  public E removeEdge(I targetVertexId) {
+    Edge<I, E> edge = destEdgeMap.remove(targetVertexId);
+    if (edge != null) {
+      return edge.getEdgeValue();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public final void sendMsgToAllEdges(M msg) {
+    if (msg == null) {
+      throw new IllegalArgumentException(
+          "sendMsgToAllEdges: Cannot send null message to all edges");
+    }
+    for (Edge<I, E> edge : destEdgeMap.values()) {
+      sendMsg(edge.getDestVertexId(), msg);
+    }
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    vertexId = BspUtils.<I>createVertexIndex(getConf());
+    vertexId.readFields(in);
+    boolean hasVertexValue = in.readBoolean();
+    if (hasVertexValue) {
+      vertexValue = BspUtils.<V>createVertexValue(getConf());
+      vertexValue.readFields(in);
+    }
+    long edgeMapSize = in.readLong();
+    for (long i = 0; i < edgeMapSize; ++i) {
+      Edge<I, E> edge = new Edge<I, E>();
+      edge.setConf(getConf());
+      edge.readFields(in);
+      addEdge(edge.getDestVertexId(), edge.getEdgeValue());
+    }
+    long msgListSize = in.readLong();
+    for (long i = 0; i < msgListSize; ++i) {
+      M msg = BspUtils.<M>createMessageValue(getConf());
+      msg.readFields(in);
+      msgList.add(msg);
+    }
+    halt = in.readBoolean();
+  }
+
+  @Override
+  public final void write(DataOutput out) throws IOException {
+    vertexId.write(out);
+    out.writeBoolean(vertexValue != null);
+    if (vertexValue != null) {
+      vertexValue.write(out);
+    }
+    out.writeLong(destEdgeMap.size());
+    for (Edge<I, E> edge : destEdgeMap.values()) {
+      edge.write(out);
+    }
+    out.writeLong(msgList.size());
+    for (M msg : msgList) {
+      msg.write(out);
+    }
+    out.writeBoolean(halt);
+  }
+
+  @Override
+  void putMessages(Iterable<M> messages) {
+    msgList.clear();
+    for (M message : messages) {
+      msgList.add(message);
+    }
+  }
+
+  @Override
+  public Iterable<M> getMessages() {
+    return Iterables.unmodifiableIterable(msgList);
+  }
+
+  @Override
+  void releaseResources() {
+    // Hint to GC to free the messages
+    msgList.clear();
+  }
+
+  @Override
+  public String toString() {
+    return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
+        ",#edges=" + destEdgeMap.size() + ")";
+  }
 }
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java Thu Feb 16 22:12:31 2012
@@ -30,135 +30,138 @@ import java.util.Iterator;
 import java.util.Map;
 
 /**
- * Simple implementation of {@link BasicVertex} using an int as id, value and message.
- * Edges are immutable and unweighted. This class aims to be as memory efficient as possible.
+ * Simple implementation of {@link BasicVertex} using an int as id, value and
+ * message.  Edges are immutable and unweighted. This class aims to be as
+ * memory efficient as possible.
  */
 public abstract class IntIntNullIntVertex extends
-        BasicVertex<IntWritable, IntWritable, NullWritable,IntWritable> {
-
-    private int id;
-    private int value;
-
-    private int[] neighbors;
-    private int[] messages;
-
-    @Override
-    public void initialize(IntWritable vertexId, IntWritable vertexValue,
-            Map<IntWritable, NullWritable> edges,
-            Iterable<IntWritable> messages) {
-        id = vertexId.get();
-        value = vertexValue.get();
-        this.neighbors = new int[edges.size()];
-        int n = 0;
-        for (IntWritable neighbor : edges.keySet()) {
-            this.neighbors[n++] = neighbor.get();
-        }
-        this.messages = new int[Iterables.size(messages)];
-        n = 0;
-        for (IntWritable message : messages) {
-            this.messages[n++] = message.get();
-        }
-    }
-
-    @Override
-    public IntWritable getVertexId() {
-        return new IntWritable(id);
-    }
-
-    @Override
-    public IntWritable getVertexValue() {
-        return new IntWritable(value);
-    }
-
-    @Override
-    public void setVertexValue(IntWritable vertexValue) {
-        value = vertexValue.get();
-    }
-
-    @Override
-    public Iterator<IntWritable> iterator() {
-        return new UnmodifiableIntArrayIterator(neighbors);
-    }
-
-    @Override
-    public NullWritable getEdgeValue(IntWritable targetVertexId) {
-        return NullWritable.get();
-    }
-
-    @Override
-    public boolean hasEdge(IntWritable targetVertexId) {
-        for (int neighbor : neighbors) {
-            if (neighbor == targetVertexId.get()) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    @Override
-    public int getNumOutEdges() {
-        return neighbors.length;
-    }
-
-    @Override
-    public void sendMsgToAllEdges(final IntWritable message) {
-        for (int neighbor : neighbors) {
-            sendMsg(new IntWritable(neighbor), message);
-        }
-    }
-
-    @Override
-    public Iterable<IntWritable> getMessages() {
-        return new Iterable<IntWritable>() {
-            @Override
-            public Iterator<IntWritable> iterator() {
-                return new UnmodifiableIntArrayIterator(messages);
-            }
-        };
-    }
-
-    @Override
-    public void putMessages(Iterable<IntWritable> newMessages) {
-        messages = new int[Iterables.size(newMessages)];
-        int n = 0;
-        for (IntWritable message : newMessages) {
-            messages[n++] = message.get();
-        }
-    }
-
-    @Override
-    void releaseResources() {
-        messages = new int[0];
-    }
-
-    @Override
-    public void write(final DataOutput out) throws IOException {
-        out.writeInt(id);
-        out.writeInt(value);
-        out.writeInt(neighbors.length);
-        for (int n = 0; n < neighbors.length; n++) {
-            out.writeInt(neighbors[n]);
-        }
-        out.writeInt(messages.length);
-        for (int n = 0; n < messages.length; n++) {
-            out.writeInt(messages[n]);
-        }
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        id = in.readInt();
-        value = in.readInt();
-        int numEdges = in.readInt();
-        neighbors = new int[numEdges];
-        for (int n = 0; n < numEdges; n++) {
-            neighbors[n] = in.readInt();
-        }
-        int numMessages = in.readInt();
-        messages = new int[numMessages];
-        for (int n = 0; n < numMessages; n++) {
-            messages[n] = in.readInt();
-        }
+    BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable> {
+  /** Int represented vertex id */
+  private int id;
+  /** Int represented vertex value */
+  private int value;
+  /** Int array of neighbor vertex ids */
+  private int[] neighbors;
+  /** Int array of messages */
+  private int[] messages;
+
+  @Override
+  public void initialize(IntWritable vertexId, IntWritable vertexValue,
+      Map<IntWritable, NullWritable> edges,
+      Iterable<IntWritable> messages) {
+    id = vertexId.get();
+    value = vertexValue.get();
+    this.neighbors = new int[edges.size()];
+    int n = 0;
+    for (IntWritable neighbor : edges.keySet()) {
+      this.neighbors[n++] = neighbor.get();
+    }
+    this.messages = new int[Iterables.size(messages)];
+    n = 0;
+    for (IntWritable message : messages) {
+      this.messages[n++] = message.get();
+    }
+  }
+
+  @Override
+  public IntWritable getVertexId() {
+    return new IntWritable(id);
+  }
+
+  @Override
+  public IntWritable getVertexValue() {
+    return new IntWritable(value);
+  }
+
+  @Override
+  public void setVertexValue(IntWritable vertexValue) {
+    value = vertexValue.get();
+  }
+
+  @Override
+  public Iterator<IntWritable> iterator() {
+    return new UnmodifiableIntArrayIterator(neighbors);
+  }
+
+  @Override
+  public NullWritable getEdgeValue(IntWritable targetVertexId) {
+    return NullWritable.get();
+  }
+
+  @Override
+  public boolean hasEdge(IntWritable targetVertexId) {
+    for (int neighbor : neighbors) {
+      if (neighbor == targetVertexId.get()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int getNumOutEdges() {
+    return neighbors.length;
+  }
+
+  @Override
+  public void sendMsgToAllEdges(final IntWritable message) {
+    for (int neighbor : neighbors) {
+      sendMsg(new IntWritable(neighbor), message);
+    }
+  }
+
+  @Override
+  public Iterable<IntWritable> getMessages() {
+    return new Iterable<IntWritable>() {
+      @Override
+      public Iterator<IntWritable> iterator() {
+        return new UnmodifiableIntArrayIterator(messages);
+      }
+    };
+  }
+
+  @Override
+  public void putMessages(Iterable<IntWritable> newMessages) {
+    messages = new int[Iterables.size(newMessages)];
+    int n = 0;
+    for (IntWritable message : newMessages) {
+      messages[n++] = message.get();
+    }
+  }
+
+  @Override
+  void releaseResources() {
+    messages = new int[0];
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeInt(id);
+    out.writeInt(value);
+    out.writeInt(neighbors.length);
+    for (int n = 0; n < neighbors.length; n++) {
+      out.writeInt(neighbors[n]);
+    }
+    out.writeInt(messages.length);
+    for (int n = 0; n < messages.length; n++) {
+      out.writeInt(messages[n]);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id = in.readInt();
+    value = in.readInt();
+    int numEdges = in.readInt();
+    neighbors = new int[numEdges];
+    for (int n = 0; n < numEdges; n++) {
+      neighbors[n] = in.readInt();
+    }
+    int numMessages = in.readInt();
+    messages = new int[numMessages];
+    for (int n = 0; n < numMessages; n++) {
+      messages[n] = in.readInt();
     }
+  }
 
 }



Mime
View raw message