giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [9/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:30 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
new file mode 100644
index 0000000..d791e58
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.giraph.graph.Combiner;
+import org.apache.giraph.graph.DefaultMasterCompute;
+import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.DefaultWorkerContext;
+import org.apache.giraph.graph.EdgeInputFormat;
+import org.apache.giraph.graph.MasterCompute;
+import org.apache.giraph.graph.TextAggregatorWriter;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.WorkerContext;
+import org.apache.giraph.graph.partition.GraphPartitionerFactory;
+import org.apache.giraph.graph.partition.HashPartitionerFactory;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.SimplePartition;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.List;
+
+/**
+ * Holder for classes used by Giraph.
+ *
+ * @param <I> Vertex ID class
+ * @param <V> Vertex Value class
+ * @param <E> Edge class
+ * @param <M> Message class
+ */
+public class GiraphClasses<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    implements GiraphConstants {
+  /** Vertex class - cached for fast access */
+  protected Class<? extends Vertex<I, V, E, M>> vertexClass;
+  /** Vertex id class - cached for fast access */
+  protected Class<I> vertexIdClass;
+  /** Vertex value class - cached for fast access */
+  protected Class<V> vertexValueClass;
+  /** Edge value class - cached for fast access */
+  protected Class<E> edgeValueClass;
+  /** Message value class - cached for fast access */
+  protected Class<M> messageValueClass;
+
+  /** Graph partitioner factory class - cached for fast access */
+  protected Class<? extends GraphPartitionerFactory<I, V, E, M>>
+  graphPartitionerFactoryClass;
+
+  /** Vertex input format class - cached for fast access */
+  protected Class<? extends VertexInputFormat<I, V, E, M>>
+  vertexInputFormatClass;
+  /** Vertex output format class - cached for fast access */
+  protected Class<? extends VertexOutputFormat<I, V, E>>
+  vertexOutputFormatClass;
+  /** Edge input format class - cached for fast access */
+  protected Class<? extends EdgeInputFormat<I, E>>
+  edgeInputFormatClass;
+
+  /** Aggregator writer class - cached for fast access */
+  protected Class<? extends AggregatorWriter> aggregatorWriterClass;
+  /** Combiner class - cached for fast access */
+  protected Class<? extends Combiner<I, M>> combinerClass;
+
+  /** Vertex resolver class - cached for fast access */
+  protected Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass;
+  /** Worker context class - cached for fast access */
+  protected Class<? extends WorkerContext> workerContextClass;
+  /** Master compute class - cached for fast access */
+  protected Class<? extends MasterCompute> masterComputeClass;
+
+  /** Partition class - cached for fast accesss */
+  protected Class<? extends Partition<I, V, E, M>> partitionClass;
+
+  /**
+   * Empty constructor. Initialize with classes all null.
+   */
+  public GiraphClasses() { }
+
+  /**
+   * Contructor that reads classes from a Configuration object.
+   *
+   * @param conf Configuration object to read from.
+   */
+  public GiraphClasses(Configuration conf) {
+    readFromConf(conf);
+  }
+
+  /**
+   * Read classes from Configuration.
+   *
+   * @param conf Configuration to read from.
+   */
+  public void readFromConf(Configuration conf) {
+    // set pre-validated generic parameter types into Configuration
+    vertexClass = (Class<? extends Vertex<I, V, E, M>>)
+        conf.getClass(VERTEX_CLASS, null, Vertex.class);
+    List<Class<?>> classList =
+        org.apache.giraph.utils.ReflectionUtils.<Vertex>getTypeArguments(
+            Vertex.class, vertexClass);
+    vertexIdClass = (Class<I>) classList.get(0);
+    vertexValueClass = (Class<V>) classList.get(1);
+    edgeValueClass = (Class<E>) classList.get(2);
+    messageValueClass = (Class<M>) classList.get(3);
+
+    graphPartitionerFactoryClass =
+        (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
+        conf.getClass(GRAPH_PARTITIONER_FACTORY_CLASS,
+            HashPartitionerFactory.class,
+            GraphPartitionerFactory.class);
+
+    vertexInputFormatClass = (Class<? extends VertexInputFormat<I, V, E, M>>)
+        conf.getClass(VERTEX_INPUT_FORMAT_CLASS,
+        null, VertexInputFormat.class);
+    vertexOutputFormatClass = (Class<? extends VertexOutputFormat<I, V, E>>)
+        conf.getClass(VERTEX_OUTPUT_FORMAT_CLASS,
+        null, VertexOutputFormat.class);
+    edgeInputFormatClass = (Class<? extends EdgeInputFormat<I, E>>)
+        conf.getClass(EDGE_INPUT_FORMAT_CLASS,
+        null, EdgeInputFormat.class);
+
+    aggregatorWriterClass = conf.getClass(AGGREGATOR_WRITER_CLASS,
+        TextAggregatorWriter.class, AggregatorWriter.class);
+    combinerClass = (Class<? extends Combiner<I, M>>)
+        conf.getClass(VERTEX_COMBINER_CLASS, null, Combiner.class);
+    vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
+        conf.getClass(VERTEX_RESOLVER_CLASS,
+        DefaultVertexResolver.class, VertexResolver.class);
+    workerContextClass = conf.getClass(WORKER_CONTEXT_CLASS,
+        DefaultWorkerContext.class, WorkerContext.class);
+    masterComputeClass =  conf.getClass(MASTER_COMPUTE_CLASS,
+        DefaultMasterCompute.class, MasterCompute.class);
+
+    partitionClass = (Class<? extends Partition<I, V, E, M>>)
+        conf.getClass(PARTITION_CLASS, SimplePartition.class);
+  }
+
+  /**
+   * Get Vertex class
+   *
+   * @return Vertex class.
+   */
+  public Class<? extends Vertex<I, V, E, M>> getVertexClass() {
+    return vertexClass;
+  }
+
+  /**
+   * Get Vertex ID class
+   *
+   * @return Vertex ID class
+   */
+  public Class<I> getVertexIdClass() {
+    return vertexIdClass;
+  }
+
+  /**
+   * Get Vertex Value class
+   *
+   * @return Vertex Value class
+   */
+  public Class<V> getVertexValueClass() {
+    return vertexValueClass;
+  }
+
+  /**
+   * Get Edge Value class
+   *
+   * @return Edge Value class
+   */
+  public Class<E> getEdgeValueClass() {
+    return edgeValueClass;
+  }
+
+  /**
+   * Get Message Value class
+   *
+   * @return Message Value class
+   */
+  public Class<M> getMessageValueClass() {
+    return messageValueClass;
+  }
+
+  /**
+   * Check if we GraphPartitionerFactory is set
+   *
+   * @return true if GraphPartitionerFactory is set
+   */
+  public boolean hasGraphPartitionerFactoryClass() {
+    return graphPartitionerFactoryClass != null;
+  }
+
+  /**
+   * Get the GraphPartitionerFactory
+   *
+   * @return GraphPartitionerFactory
+   */
+  public Class<? extends GraphPartitionerFactory<I, V, E, M>>
+  getGraphPartitionerFactoryClass() {
+    return graphPartitionerFactoryClass;
+  }
+
+  /**
+   * Check if VertexInputFormat class is set
+   *
+   * @return true if VertexInputFormat class is set
+   */
+  public boolean hasVertexInputFormat() {
+    return vertexInputFormatClass != null;
+  }
+
+  /**
+   * Get VertexInputFormat held
+   *
+   * @return VertexInputFormat
+   */
+  public Class<? extends VertexInputFormat<I, V, E, M>>
+  getVertexInputFormatClass() {
+    return vertexInputFormatClass;
+  }
+
+  /**
+   * Check if VertexOutputFormat is set
+   *
+   * @return true if VertexOutputFormat is set
+   */
+  public boolean hasVertexOutputFormat() {
+    return vertexOutputFormatClass != null;
+  }
+
+  /**
+   * Get VertexOutputFormat set
+   *
+   * @return VertexOutputFormat
+   */
+  public Class<? extends VertexOutputFormat<I, V, E>>
+  getVertexOutputFormatClass() {
+    return vertexOutputFormatClass;
+  }
+
+  /**
+   * Check if EdgeInputFormat is set
+   *
+   * @return true if EdgeInputFormat is set
+   */
+  public boolean hasEdgeInputFormat() {
+    return edgeInputFormatClass != null;
+  }
+
+  /**
+   * Get EdgeInputFormat used
+   *
+   * @return EdgeInputFormat
+   */
+  public Class<? extends EdgeInputFormat<I, E>> getEdgeInputFormatClass() {
+    return edgeInputFormatClass;
+  }
+
+  /**
+   * Check if AggregatorWriter is set
+   *
+   * @return true if AggregatorWriter is set
+   */
+  public boolean hasAggregatorWriterClass() {
+    return aggregatorWriterClass != null;
+  }
+
+  /**
+   * Get AggregatorWriter used
+   *
+   * @return AggregatorWriter
+   */
+  public Class<? extends AggregatorWriter> getAggregatorWriterClass() {
+    return aggregatorWriterClass;
+  }
+
+  /**
+   * Check if Combiner is set
+   *
+   * @return true if Combiner is set
+   */
+  public boolean hasCombinerClass() {
+    return combinerClass != null;
+  }
+
+  /**
+   * Get Combiner used
+   *
+   * @return Combiner
+   */
+  public Class<? extends Combiner<I, M>> getCombinerClass() {
+    return combinerClass;
+  }
+
+  /**
+   * Check if VertexResolver is set
+   *
+   * @return true if VertexResolver is set
+   */
+  public boolean hasVertexResolverClass() {
+    return vertexResolverClass != null;
+  }
+
+  /**
+   * Get VertexResolver used
+   *
+   * @return VertexResolver
+   */
+  public Class<? extends VertexResolver<I, V, E, M>> getVertexResolverClass() {
+    return vertexResolverClass;
+  }
+
+  /**
+   * Check if WorkerContext is set
+   *
+   * @return true if WorkerContext is set
+   */
+  public boolean hasWorkerContextClass() {
+    return workerContextClass != null;
+  }
+
+  /**
+   * Get WorkerContext used
+   *
+   * @return WorkerContext
+   */
+  public Class<? extends WorkerContext> getWorkerContextClass() {
+    return workerContextClass;
+  }
+
+  /**
+   * Check if MasterCompute is set
+   *
+   * @return true MasterCompute is set
+   */
+  public boolean hasMasterComputeClass() {
+    return masterComputeClass != null;
+  }
+
+  /**
+   * Get MasterCompute used
+   *
+   * @return MasterCompute
+   */
+  public Class<? extends MasterCompute> getMasterComputeClass() {
+    return masterComputeClass;
+  }
+
+  /**
+   * Check if Partition is set
+   *
+   * @return true if Partition is set
+   */
+  public boolean hasPartitionClass() {
+    return partitionClass != null;
+  }
+
+  /**
+   * Get Partition
+   *
+   * @return Partition
+   */
+  public Class<? extends Partition<I, V, E, M>> getPartitionClass() {
+    return partitionClass;
+  }
+
+  /**
+   * Set Vertex class held
+   *
+   * @param vertexClass Vertex class to set
+   * @return this
+   */
+  public GiraphClasses setVertexClass(
+      Class<? extends Vertex<I, V, E, M>> vertexClass) {
+    this.vertexClass = vertexClass;
+    return this;
+  }
+
+  /**
+   * Set Vertex ID class held
+   *
+   * @param vertexIdClass Vertex ID to set
+   * @return this
+   */
+  public GiraphClasses setVertexIdClass(Class<I> vertexIdClass) {
+    this.vertexIdClass = vertexIdClass;
+    return this;
+  }
+
+  /**
+   * Set Vertex Value class held
+   *
+   * @param vertexValueClass Vertex Value class to set
+   * @return this
+   */
+  public GiraphClasses setVertexValueClass(Class<V> vertexValueClass) {
+    this.vertexValueClass = vertexValueClass;
+    return this;
+  }
+
+  /**
+   * Set Edge Value class held
+   *
+   * @param edgeValueClass Edge Value class to set
+   * @return this
+   */
+  public GiraphClasses setEdgeValueClass(Class<E> edgeValueClass) {
+    this.edgeValueClass = edgeValueClass;
+    return this;
+  }
+
+  /**
+   * Set Message Value class held
+   *
+   * @param messageValueClass Message Value class to set
+   * @return this
+   */
+  public GiraphClasses setMessageValueClass(Class<M> messageValueClass) {
+    this.messageValueClass = messageValueClass;
+    return this;
+  }
+
+  /**
+   * Set GraphPartitionerFactory class held
+   *
+   * @param klass GraphPartitionerFactory to set
+   * @return this
+   */
+  public GiraphClasses setGraphPartitionerFactoryClass(
+      Class<? extends GraphPartitionerFactory<I, V, E, M>> klass) {
+    this.graphPartitionerFactoryClass = klass;
+    return this;
+  }
+
+  /**
+   * Set VertexInputFormat held
+   *
+   * @param vertexInputFormatClass VertexInputFormat to set
+   * @return this
+   */
+  public GiraphClasses setVertexInputFormatClass(
+      Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass) {
+    this.vertexInputFormatClass = vertexInputFormatClass;
+    return this;
+  }
+
+  /**
+   * Set VertexOutputFormat held
+   *
+   * @param vertexOutputFormatClass VertexOutputFormat to set
+   * @return this
+   */
+  public GiraphClasses setVertexOutputFormatClass(
+      Class<? extends VertexOutputFormat<I, V, E>> vertexOutputFormatClass) {
+    this.vertexOutputFormatClass = vertexOutputFormatClass;
+    return this;
+  }
+
+  /**
+   * Set EdgeInputFormat class held
+   *
+   * @param edgeInputFormatClass EdgeInputFormat to set
+   * @return this
+   */
+  public GiraphClasses setEdgeInputFormatClass(
+      Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass) {
+    this.edgeInputFormatClass = edgeInputFormatClass;
+    return this;
+  }
+
+  /**
+   * Set AggregatorWriter class used
+   *
+   * @param aggregatorWriterClass AggregatorWriter to set
+   * @return this
+   */
+  public GiraphClasses setAggregatorWriterClass(
+      Class<? extends AggregatorWriter> aggregatorWriterClass) {
+    this.aggregatorWriterClass = aggregatorWriterClass;
+    return this;
+  }
+
+  /**
+   * Set Combiner class used
+   *
+   * @param combinerClass Combiner class to set
+   * @return this
+   */
+  public GiraphClasses setCombinerClass(
+      Class<? extends Combiner<I, M>> combinerClass) {
+    this.combinerClass = combinerClass;
+    return this;
+  }
+
+  /**
+   * Set VertexResolver used
+   *
+   * @param vertexResolverClass VertexResolver to set
+   * @return this
+   */
+  public GiraphClasses setVertexResolverClass(
+      Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass) {
+    this.vertexResolverClass = vertexResolverClass;
+    return this;
+  }
+
+  /**
+   * Set WorkerContext used
+   *
+   * @param workerContextClass WorkerContext class to set
+   * @return this
+   */
+  public GiraphClasses setWorkerContextClass(
+      Class<? extends WorkerContext> workerContextClass) {
+    this.workerContextClass = workerContextClass;
+    return this;
+  }
+
+  /**
+   * Set MasterCompute class used
+   *
+   * @param masterComputeClass MasterCompute class to set
+   * @return this
+   */
+  public GiraphClasses setMasterComputeClass(
+      Class<? extends MasterCompute> masterComputeClass) {
+    this.masterComputeClass = masterComputeClass;
+    return this;
+  }
+
+  /**
+   * Set Partition class to use
+   *
+   * @param partitionClass Partition class to set
+   * @return this
+   */
+  public GiraphClasses setPartitionClass(
+      Class<? extends Partition<I, V, E, M>> partitionClass) {
+    this.partitionClass = partitionClass;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
new file mode 100644
index 0000000..3e14aad
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.conf;
+
+import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.giraph.graph.Combiner;
+import org.apache.giraph.graph.EdgeInputFormat;
+import org.apache.giraph.graph.MasterCompute;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.WorkerContext;
+import org.apache.giraph.graph.partition.GraphPartitionerFactory;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.master.MasterObserver;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Adds user methods specific to Giraph.  This will be put into an
+ * ImmutableClassesGiraphConfiguration that provides the configuration plus
+ * the immutable classes.
+ */
+public class GiraphConfiguration extends Configuration
+    implements GiraphConstants {
+  /**
+   * Constructor that creates the configuration
+   */
+  public GiraphConfiguration() { }
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration
+   */
+  public GiraphConfiguration(Configuration conf) {
+    super(conf);
+  }
+
+  /**
+   * Set the vertex class (required)
+   *
+   * @param vertexClass Runs vertex computation
+   */
+  public final void setVertexClass(
+      Class<? extends Vertex> vertexClass) {
+    setClass(VERTEX_CLASS, vertexClass, Vertex.class);
+  }
+
+  /**
+   * Set the vertex input format class (required)
+   *
+   * @param vertexInputFormatClass Determines how graph is input
+   */
+  public final void setVertexInputFormatClass(
+      Class<? extends VertexInputFormat> vertexInputFormatClass) {
+    setClass(VERTEX_INPUT_FORMAT_CLASS,
+        vertexInputFormatClass,
+        VertexInputFormat.class);
+  }
+
+  /**
+   * Set the edge input format class (required)
+   *
+   * @param edgeInputFormatClass Determines how graph is input
+   */
+  public final void setEdgeInputFormatClass(
+      Class<? extends EdgeInputFormat> edgeInputFormatClass) {
+    setClass(EDGE_INPUT_FORMAT_CLASS,
+        edgeInputFormatClass,
+        EdgeInputFormat.class);
+  }
+
+  /**
+   * Set the master class (optional)
+   *
+   * @param masterComputeClass Runs master computation
+   */
+  public final void setMasterComputeClass(
+      Class<? extends MasterCompute> masterComputeClass) {
+    setClass(MASTER_COMPUTE_CLASS, masterComputeClass,
+        MasterCompute.class);
+  }
+
+  /**
+   * Add a MasterObserver class (optional)
+   *
+   * @param masterObserverClass MasterObserver class to add.
+   */
+  public final void addMasterObserverClass(
+      Class<? extends MasterObserver> masterObserverClass) {
+    addToClasses(MASTER_OBSERVER_CLASSES, masterObserverClass,
+        MasterObserver.class);
+  }
+
+  /**
+   * Add a class to a property that is a list of classes. If the property does
+   * not exist it will be created.
+   *
+   * @param name String name of property.
+   * @param klass interface of the class being set.
+   * @param xface Class to add to the list.
+   */
+  public final void addToClasses(String name, Class<?> klass, Class<?> xface) {
+    if (!xface.isAssignableFrom(klass)) {
+      throw new RuntimeException(klass + " does not implement " +
+          xface.getName());
+    }
+    String value;
+    String klasses = get(name);
+    if (klasses == null) {
+      value = klass.getName();
+    } else {
+      value = klasses + "," + klass.getName();
+    }
+    set(name, value);
+  }
+
+  /**
+   * Set mapping from a key name to a list of classes.
+   *
+   * @param name String key name to use.
+   * @param xface interface of the classes being set.
+   * @param klasses Classes to set.
+   */
+  public final void setClasses(String name, Class<?> xface,
+                               Class<?> ... klasses) {
+    String[] klassNames = new String[klasses.length];
+    for (int i = 0; i < klasses.length; ++i) {
+      Class<?> klass = klasses[i];
+      if (!xface.isAssignableFrom(klass)) {
+        throw new RuntimeException(klass + " does not implement " +
+            xface.getName());
+      }
+      klassNames[i] = klasses[i].getName();
+    }
+    setStrings(name, klassNames);
+  }
+
+  /**
+   * Get classes from a property that all implement a given interface.
+   *
+   * @param name String name of property to fetch.
+   * @param xface interface classes must implement.
+   * @param defaultValue If not found, return this
+   * @param <T> Generic type of interface class
+   * @return array of Classes implementing interface specified.
+   */
+  public final <T> Class<? extends T>[] getClassesOfType(String name,
+      Class<T> xface, Class<? extends T> ... defaultValue) {
+    Class<?>[] klasses = getClasses(name, defaultValue);
+    for (Class<?> klass : klasses) {
+      if (!xface.isAssignableFrom(klass)) {
+        throw new RuntimeException(klass + " is not assignable from " +
+            xface.getName());
+      }
+    }
+    return (Class<? extends T>[]) klasses;
+  }
+
+  /**
+   * Set the vertex output format class (optional)
+   *
+   * @param vertexOutputFormatClass Determines how graph is output
+   */
+  public final void setVertexOutputFormatClass(
+      Class<? extends VertexOutputFormat> vertexOutputFormatClass) {
+    setClass(VERTEX_OUTPUT_FORMAT_CLASS,
+        vertexOutputFormatClass,
+        VertexOutputFormat.class);
+  }
+
+  /**
+   * Set the vertex combiner class (optional)
+   *
+   * @param vertexCombinerClass Determines how vertex messages are combined
+   */
+  public final void setVertexCombinerClass(
+      Class<? extends Combiner> vertexCombinerClass) {
+    setClass(VERTEX_COMBINER_CLASS,
+        vertexCombinerClass,
+        Combiner.class);
+  }
+
+  /**
+   * Set the graph partitioner class (optional)
+   *
+   * @param graphPartitionerFactoryClass Determines how the graph is partitioned
+   */
+  public final void setGraphPartitionerFactoryClass(
+      Class<?> graphPartitionerFactoryClass) {
+    setClass(GRAPH_PARTITIONER_FACTORY_CLASS,
+        graphPartitionerFactoryClass,
+        GraphPartitionerFactory.class);
+  }
+
+  /**
+   * Set the vertex resolver class (optional)
+   *
+   * @param vertexResolverClass Determines how vertex mutations are resolved
+   */
+  public final void setVertexResolverClass(
+      Class<? extends VertexResolver> vertexResolverClass) {
+    setClass(VERTEX_RESOLVER_CLASS, vertexResolverClass, VertexResolver.class);
+  }
+
+  /**
+   * Whether to create a vertex that doesn't exist when it receives messages.
+   * This only affects DefaultVertexResolver.
+   *
+   * @return true if we should create non existent vertices that get messages.
+   */
+  public final boolean getResolverCreateVertexOnMessages() {
+    return getBoolean(RESOLVER_CREATE_VERTEX_ON_MSGS, true);
+  }
+
+  /**
+   * Set whether to create non existent vertices when they receive messages.
+   *
+   * @param v true if we should create vertices when they get messages.
+   */
+  public final void setResolverCreateVertexOnMessages(boolean v) {
+    setBoolean(RESOLVER_CREATE_VERTEX_ON_MSGS, v);
+  }
+
+  /**
+   * Set the worker context class (optional)
+   *
+   * @param workerContextClass Determines what code is executed on a each
+   *        worker before and after each superstep and computation
+   */
+  public final void setWorkerContextClass(
+      Class<? extends WorkerContext> workerContextClass) {
+    setClass(WORKER_CONTEXT_CLASS,
+        workerContextClass,
+        WorkerContext.class);
+  }
+
+  /**
+   * Set the aggregator writer class (optional)
+   *
+   * @param aggregatorWriterClass Determines how the aggregators are
+   *        written to file at the end of the job
+   */
+  public final void setAggregatorWriterClass(
+      Class<? extends AggregatorWriter> aggregatorWriterClass) {
+    setClass(AGGREGATOR_WRITER_CLASS,
+        aggregatorWriterClass,
+        AggregatorWriter.class);
+  }
+
+  /**
+   * Set the partition class (optional)
+   *
+   * @param partitionClass Determines the why partitions are stored
+   */
+  public final void setPartitionClass(
+      Class<? extends Partition> partitionClass) {
+    setClass(PARTITION_CLASS,
+        partitionClass,
+        Partition.class);
+  }
+
+  /**
+   * Set worker configuration for determining what is required for
+   * a superstep.
+   *
+   * @param minWorkers Minimum workers to do a superstep
+   * @param maxWorkers Maximum workers to do a superstep
+   *        (max map tasks in job)
+   * @param minPercentResponded 0 - 100 % of the workers required to
+   *        have responded before continuing the superstep
+   */
+  public final void setWorkerConfiguration(int minWorkers,
+                                           int maxWorkers,
+                                           float minPercentResponded) {
+    setInt(MIN_WORKERS, minWorkers);
+    setInt(MAX_WORKERS, maxWorkers);
+    setFloat(MIN_PERCENT_RESPONDED, minPercentResponded);
+  }
+
+  public final int getMinWorkers() {
+    return getInt(MIN_WORKERS, -1);
+  }
+
+  public final int getMaxWorkers() {
+    return getInt(MAX_WORKERS, -1);
+  }
+
+  public final float getMinPercentResponded() {
+    return getFloat(MIN_PERCENT_RESPONDED, MIN_PERCENT_RESPONDED_DEFAULT);
+  }
+
+  /**
+   * Utilize an existing ZooKeeper service.  If this is not set, ZooKeeper
+   * will be dynamically started by Giraph for this job.
+   *
+   * @param serverList Comma separated list of servers and ports
+   *        (i.e. zk1:2221,zk2:2221)
+   */
+  public final void setZooKeeperConfiguration(String serverList) {
+    set(ZOOKEEPER_LIST, serverList);
+  }
+
+  public final boolean getSplitMasterWorker() {
+    return getBoolean(SPLIT_MASTER_WORKER, SPLIT_MASTER_WORKER_DEFAULT);
+  }
+
+  /**
+   * Get array of MasterObserver classes set in the configuration.
+   *
+   * @return array of MasterObserver classes.
+   */
+  public Class<? extends MasterObserver>[] getMasterObserverClasses() {
+    return getClassesOfType(MASTER_OBSERVER_CLASSES, MasterObserver.class);
+  }
+
+  /**
+   * Whether to track, print, and aggregate metrics.
+   *
+   * @return true if metrics are enabled, false otherwise (default)
+   */
+  public boolean metricsEnabled() {
+    return getBoolean(METRICS_ENABLE, false);
+  }
+
+  /**
+   * Get the task partition
+   *
+   * @return The task partition or -1 if not set
+   */
+  public int getTaskPartition() {
+    return getInt("mapred.task.partition", -1);
+  }
+
+  /**
+   * Get the ZooKeeper list.
+   *
+   * @return ZooKeeper list of strings, comma separated or null if none set.
+   */
+  public String getZookeeperList() {
+    return get(ZOOKEEPER_LIST);
+  }
+
+  public String getLocalLevel() {
+    return get(LOG_LEVEL, LOG_LEVEL_DEFAULT);
+  }
+
+  /**
+   * Use the log thread layout option?
+   *
+   * @return True if use the log thread layout option, false otherwise
+   */
+  public boolean useLogThreadLayout() {
+    return getBoolean(LOG_THREAD_LAYOUT, LOG_THREAD_LAYOUT_DEFAULT);
+  }
+
+  public boolean getLocalTestMode() {
+    return getBoolean(LOCAL_TEST_MODE, LOCAL_TEST_MODE_DEFAULT);
+  }
+
+  public int getZooKeeperServerCount() {
+    return getInt(ZOOKEEPER_SERVER_COUNT,
+        ZOOKEEPER_SERVER_COUNT_DEFAULT);
+  }
+
+  /**
+   * Set the ZooKeeper jar classpath
+   *
+   * @param classPath Classpath for the ZooKeeper jar
+   */
+  public void setZooKeeperJar(String classPath) {
+    set(ZOOKEEPER_JAR, classPath);
+  }
+
+  public int getZooKeeperSessionTimeout() {
+    return getInt(ZOOKEEPER_SESSION_TIMEOUT,
+        ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+  }
+
+  public int getZookeeperOpsMaxAttempts() {
+    return getInt(ZOOKEEPER_OPS_MAX_ATTEMPTS,
+        ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT);
+  }
+
+  public int getZookeeperOpsRetryWaitMsecs() {
+    return getInt(ZOOKEEPER_OPS_RETRY_WAIT_MSECS,
+        ZOOKEEPER_OPS_RETRY_WAIT_MSECS_DEFAULT);
+  }
+
+  public boolean getNettyServerUseExecutionHandler() {
+    return getBoolean(NETTY_SERVER_USE_EXECUTION_HANDLER,
+        NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT);
+  }
+
+  public int getNettyServerThreads() {
+    return getInt(NETTY_SERVER_THREADS, NETTY_SERVER_THREADS_DEFAULT);
+  }
+
+  public int getNettyServerExecutionThreads() {
+    return getInt(NETTY_SERVER_EXECUTION_THREADS,
+        NETTY_SERVER_EXECUTION_THREADS_DEFAULT);
+  }
+
+  /**
+   * Get the netty server execution concurrency.  This depends on whether the
+   * netty server execution handler exists.
+   *
+   * @return Server concurrency
+   */
+  public int getNettyServerExecutionConcurrency() {
+    if (getNettyServerUseExecutionHandler()) {
+      return getNettyServerExecutionThreads();
+    } else {
+      return getNettyServerThreads();
+    }
+  }
+
+  public int getZookeeperConnectionAttempts() {
+    return getInt(ZOOKEEPER_CONNECTION_ATTEMPTS,
+        ZOOKEEPER_CONNECTION_ATTEMPTS_DEFAULT);
+  }
+
+  public int getZooKeeperMinSessionTimeout() {
+    return getInt(ZOOKEEPER_MIN_SESSION_TIMEOUT,
+        DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT);
+  }
+
+  public int getZooKeeperMaxSessionTimeout() {
+    return getInt(ZOOKEEPER_MAX_SESSION_TIMEOUT,
+        DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT);
+  }
+
+  public String getZooKeeperForceSync() {
+    return get(ZOOKEEPER_FORCE_SYNC, DEFAULT_ZOOKEEPER_FORCE_SYNC);
+  }
+
+  public String getZooKeeperSkipAcl() {
+    return get(ZOOKEEPER_SKIP_ACL, DEFAULT_ZOOKEEPER_SKIP_ACL);
+  }
+
+  /**
+   * Get the number of map tasks in this job
+   *
+   * @return Number of map tasks in this job
+   */
+  public int getMapTasks() {
+    int mapTasks = getInt("mapred.map.tasks", -1);
+    if (mapTasks == -1) {
+      throw new IllegalStateException("getMapTasks: Failed to get the map " +
+          "tasks!");
+    }
+    return mapTasks;
+  }
+
+  /**
+   * Use authentication? (if supported)
+   *
+   * @return True if should authenticate, false otherwise
+   */
+  public boolean authenticate() {
+    return getBoolean(AUTHENTICATE, DEFAULT_AUTHENTICATE);
+  }
+
+  /**
+   * Set the number of compute threads
+   *
+   * @param numComputeThreads Number of compute threads to use
+   */
+  public void setNumComputeThreads(int numComputeThreads) {
+    setInt(NUM_COMPUTE_THREADS, numComputeThreads);
+  }
+
+  public int getNumComputeThreads() {
+    return getInt(NUM_COMPUTE_THREADS, NUM_COMPUTE_THREADS_DEFAULT);
+  }
+
+  /**
+   * Set the number of input split threads
+   *
+   * @param numInputSplitsThreads Number of input split threads to use
+   */
+  public void setNumInputSplitsThreads(int numInputSplitsThreads) {
+    setInt(NUM_INPUT_SPLITS_THREADS, numInputSplitsThreads);
+  }
+
+  public int getNumInputSplitsThreads() {
+    return getInt(NUM_INPUT_SPLITS_THREADS, NUM_INPUT_SPLITS_THREADS_DEFAULT);
+  }
+
+  public long getInputSplitMaxVertices() {
+    return getLong(INPUT_SPLIT_MAX_VERTICES, INPUT_SPLIT_MAX_VERTICES_DEFAULT);
+  }
+
+  public long getInputSplitMaxEdges() {
+    return getLong(INPUT_SPLIT_MAX_EDGES, INPUT_SPLIT_MAX_EDGES_DEFAULT);
+  }
+
+  /**
+   * Set whether to use unsafe serialization
+   *
+   * @param useUnsafeSerialization If true, use unsafe serialization
+   */
+  public void useUnsafeSerialization(boolean useUnsafeSerialization) {
+    setBoolean(USE_UNSAFE_SERIALIZATION, useUnsafeSerialization);
+  }
+
+  /**
+   * Use message size encoding?  This feature may help with complex message
+   * objects.
+   *
+   * @return Whether to use message size encoding
+   */
+  public boolean useMessageSizeEncoding() {
+    return getBoolean(
+        USE_MESSAGE_SIZE_ENCODING, USE_MESSAGE_SIZE_ENCODING_DEFAULT);
+  }
+
+  /**
+   * Set the checkpoint frequeuncy of how many supersteps to wait before
+   * checkpointing
+   *
+   * @param checkpointFrequency How often to checkpoint (0 means never)
+   */
+  public void setCheckpointFrequency(int checkpointFrequency) {
+    setInt(CHECKPOINT_FREQUENCY, checkpointFrequency);
+  }
+
+  /**
+   * Get the checkpoint frequeuncy of how many supersteps to wait
+   * before checkpointing
+   *
+   * @return Checkpoint frequency (0 means never)
+   */
+  public int getCheckpointFrequency() {
+    return getInt(CHECKPOINT_FREQUENCY, CHECKPOINT_FREQUENCY_DEFAULT);
+  }
+
+  /**
+   * Set the max task attempts
+   *
+   * @param maxTaskAttempts Max task attempts to use
+   */
+  public void setMaxTaskAttempts(int maxTaskAttempts) {
+    setInt(MAX_TASK_ATTEMPTS, maxTaskAttempts);
+  }
+
+  /**
+   * Get the max task attempts
+   *
+   * @return Max task attempts or -1, if not set
+   */
+  public int getMaxTaskAttempts() {
+    return getInt(MAX_TASK_ATTEMPTS, -1);
+  }
+
+  /**
+   * Get the number of milliseconds to wait for an event before continuing on
+   *
+   * @return Number of milliseconds to wait for an event before continuing on
+   */
+  public int getEventWaitMsecs() {
+    return getInt(EVENT_WAIT_MSECS, EVENT_WAIT_MSECS_DEFAULT);
+  }
+
+  /**
+   * Set the number of milliseconds to wait for an event before continuing on
+   *
+   * @param eventWaitMsecs Number of milliseconds to wait for an event before
+   *                       continuing on
+   */
+  public void setEventWaitMsecs(int eventWaitMsecs) {
+    setInt(EVENT_WAIT_MSECS, eventWaitMsecs);
+  }
+
+  /**
+   * Get the maximum milliseconds to wait before giving up trying to get the
+   * minimum number of workers before a superstep.
+   *
+   * @return Maximum milliseconds to wait before giving up trying to get the
+   *         minimum number of workers before a superstep
+   */
+  public int getMaxMasterSuperstepWaitMsecs() {
+    return getInt(MAX_MASTER_SUPERSTEP_WAIT_MSECS,
+        MAX_MASTER_SUPERSTEP_WAIT_MSECS_DEFAULT);
+  }
+
+  /**
+   * Set the maximum milliseconds to wait before giving up trying to get the
+   * minimum number of workers before a superstep.
+   *
+   * @param maxMasterSuperstepWaitMsecs Maximum milliseconds to wait before
+   *                                    giving up trying to get the minimum
+   *                                    number of workers before a superstep
+   */
+  public void setMaxMasterSuperstepWaitMsecs(int maxMasterSuperstepWaitMsecs) {
+    setInt(MAX_MASTER_SUPERSTEP_WAIT_MSECS, maxMasterSuperstepWaitMsecs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
new file mode 100644
index 0000000..11d4a41
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+/**
+ * Constants used all over Giraph for configuration.
+ */
+// CHECKSTYLE: stop InterfaceIsTypeCheck
+public interface GiraphConstants {
+  /** Vertex class - required */
+  String VERTEX_CLASS = "giraph.vertexClass";
+
+  /** Class for Master - optional */
+  String MASTER_COMPUTE_CLASS = "giraph.masterComputeClass";
+  /** Classes for Observer Master - optional */
+  String MASTER_OBSERVER_CLASSES = "giraph.master.observers";
+  /** Vertex combiner class - optional */
+  String VERTEX_COMBINER_CLASS = "giraph.combinerClass";
+  /** Vertex resolver class - optional */
+  String VERTEX_RESOLVER_CLASS = "giraph.vertexResolverClass";
+  /**
+   * Option of whether to create vertexes that were not existent before but
+   * received messages
+   */
+  String RESOLVER_CREATE_VERTEX_ON_MSGS =
+      "giraph.vertex.resolver.create.on.msgs";
+  /** Graph partitioner factory class - optional */
+  String GRAPH_PARTITIONER_FACTORY_CLASS =
+      "giraph.graphPartitionerFactoryClass";
+
+  // At least one of the input format classes is required.
+  /** VertexInputFormat class */
+  String VERTEX_INPUT_FORMAT_CLASS = "giraph.vertexInputFormatClass";
+  /** EdgeInputFormat class */
+  String EDGE_INPUT_FORMAT_CLASS = "giraph.edgeInputFormatClass";
+
+  /** VertexOutputFormat class */
+  String VERTEX_OUTPUT_FORMAT_CLASS = "giraph.vertexOutputFormatClass";
+
+  /** Vertex index class */
+  String VERTEX_ID_CLASS = "giraph.vertexIdClass";
+  /** Vertex value class */
+  String VERTEX_VALUE_CLASS = "giraph.vertexValueClass";
+  /** Edge value class */
+  String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
+  /** Message value class */
+  String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
+  /** Worker context class */
+  String WORKER_CONTEXT_CLASS = "giraph.workerContextClass";
+  /** AggregatorWriter class - optional */
+  String AGGREGATOR_WRITER_CLASS = "giraph.aggregatorWriterClass";
+
+  /** Partition class - optional */
+  String PARTITION_CLASS = "giraph.partitionClass";
+
+  /**
+   * Minimum number of simultaneous workers before this job can run (int)
+   */
+  String MIN_WORKERS = "giraph.minWorkers";
+  /**
+   * Maximum number of simultaneous worker tasks started by this job (int).
+   */
+  String MAX_WORKERS = "giraph.maxWorkers";
+
+  /**
+   * Separate the workers and the master tasks.  This is required
+   * to support dynamic recovery. (boolean)
+   */
+  String SPLIT_MASTER_WORKER = "giraph.SplitMasterWorker";
+  /**
+   * Default on whether to separate the workers and the master tasks.
+   * Needs to be "true" to support dynamic recovery.
+   */
+  boolean SPLIT_MASTER_WORKER_DEFAULT = true;
+
+  /** Indicates whether this job is run in an internal unit test */
+  String LOCAL_TEST_MODE = "giraph.localTestMode";
+
+  /** not in local test mode per default */
+  boolean LOCAL_TEST_MODE_DEFAULT = false;
+
+  /** Override the Hadoop log level and set the desired log level. */
+  String LOG_LEVEL = "giraph.logLevel";
+  /** Default log level is INFO (same as Hadoop) */
+  String LOG_LEVEL_DEFAULT = "info";
+
+  /** Use thread level debugging? */
+  String LOG_THREAD_LAYOUT = "giraph.logThreadLayout";
+  /** Default to not use thread-level debugging */
+  boolean LOG_THREAD_LAYOUT_DEFAULT = false;
+
+  /**
+   * Minimum percent of the maximum number of workers that have responded
+   * in order to continue progressing. (float)
+   */
+  String MIN_PERCENT_RESPONDED = "giraph.minPercentResponded";
+  /** Default 100% response rate for workers */
+  float MIN_PERCENT_RESPONDED_DEFAULT = 100.0f;
+
+  /** Enable the Metrics system **/
+  String METRICS_ENABLE = "giraph.metrics.enable";
+
+  /**
+   *  ZooKeeper comma-separated list (if not set,
+   *  will start up ZooKeeper locally)
+   */
+  String ZOOKEEPER_LIST = "giraph.zkList";
+
+  /** ZooKeeper session millisecond timeout */
+  String ZOOKEEPER_SESSION_TIMEOUT = "giraph.zkSessionMsecTimeout";
+  /** Default Zookeeper session millisecond timeout */
+  int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 60 * 1000;
+
+  /** Polling interval to check for the ZooKeeper server data */
+  String ZOOKEEPER_SERVERLIST_POLL_MSECS = "giraph.zkServerlistPollMsecs";
+  /** Default polling interval to check for the ZooKeeper server data */
+  int ZOOKEEPER_SERVERLIST_POLL_MSECS_DEFAULT = 3 * 1000;
+
+  /** Number of nodes (not tasks) to run Zookeeper on */
+  String ZOOKEEPER_SERVER_COUNT = "giraph.zkServerCount";
+  /** Default number of nodes to run Zookeeper on */
+  int ZOOKEEPER_SERVER_COUNT_DEFAULT = 1;
+
+  /** ZooKeeper port to use */
+  String ZOOKEEPER_SERVER_PORT = "giraph.zkServerPort";
+  /** Default ZooKeeper port to use */
+  int ZOOKEEPER_SERVER_PORT_DEFAULT = 22181;
+
+  /** Location of the ZooKeeper jar - Used internally, not meant for users */
+  String ZOOKEEPER_JAR = "giraph.zkJar";
+
+  /** Local ZooKeeper directory to use */
+  String ZOOKEEPER_DIR = "giraph.zkDir";
+
+  /** Max attempts for handling ZooKeeper connection loss */
+  String ZOOKEEPER_OPS_MAX_ATTEMPTS = "giraph.zkOpsMaxAttempts";
+  /** Default of 3 attempts for handling ZooKeeper connection loss */
+  int ZOOKEEPER_OPS_MAX_ATTEMPTS_DEFAULT = 3;
+
+  /**
+   * Msecs to wait before retrying a failed ZooKeeper op due to connection
+   * loss.
+   */
+  String ZOOKEEPER_OPS_RETRY_WAIT_MSECS = "giraph.zkOpsRetryWaitMsecs";
+  /**
+   * Default to wait 5 seconds before retrying a failed ZooKeeper op due to
+   * connection loss.
+   */
+  int ZOOKEEPER_OPS_RETRY_WAIT_MSECS_DEFAULT = 5 * 1000;
+
+  /** TCP backlog (defaults to number of workers) */
+  String TCP_BACKLOG = "giraph.tcpBacklog";
+  /**
+   * Default TCP backlog default if the number of workers is not specified
+   * (i.e unittests)
+   */
+  int TCP_BACKLOG_DEFAULT = 1;
+
+  /** How big to make the default buffer? */
+  String NETTY_REQUEST_ENCODER_BUFFER_SIZE =
+      "giraph.nettyRequestEncoderBufferSize";
+  /** Start with 32K */
+  int NETTY_REQUEST_ENCODER_BUFFER_SIZE_DEFAULT = 32 * 1024;
+
+  /** Netty client threads */
+  String NETTY_CLIENT_THREADS = "giraph.nettyClientThreads";
+  /** Default is 4 */
+  int NETTY_CLIENT_THREADS_DEFAULT = 4;
+
+  /** Netty server threads */
+  String NETTY_SERVER_THREADS = "giraph.nettyServerThreads";
+  /** Default is 16 */
+  int NETTY_SERVER_THREADS_DEFAULT = 16;
+
+  /** Use the execution handler in netty on the client? */
+  String NETTY_CLIENT_USE_EXECUTION_HANDLER =
+      "giraph.nettyClientUseExecutionHandler";
+  /** Use the execution handler in netty on the client - default true */
+  boolean NETTY_CLIENT_USE_EXECUTION_HANDLER_DEFAULT = true;
+
+  /** Netty client execution threads (execution handler) */
+  String NETTY_CLIENT_EXECUTION_THREADS =
+      "giraph.nettyClientExecutionThreads";
+  /** Default Netty client execution threads (execution handler) of 8 */
+  int NETTY_CLIENT_EXECUTION_THREADS_DEFAULT = 8;
+
+  /** Where to place the netty client execution handle? */
+  String NETTY_CLIENT_EXECUTION_AFTER_HANDLER =
+      "giraph.nettyClientExecutionAfterHandler";
+  /**
+   * Default is to use the netty client execution handle after the request
+   * encoder.
+   */
+  String NETTY_CLIENT_EXECUTION_AFTER_HANDLER_DEFAULT = "requestEncoder";
+
+  /** Use the execution handler in netty on the server? */
+  String NETTY_SERVER_USE_EXECUTION_HANDLER =
+      "giraph.nettyServerUseExecutionHandler";
+  /** Use the execution handler in netty on the server - default true */
+  boolean NETTY_SERVER_USE_EXECUTION_HANDLER_DEFAULT = true;
+
+  /** Netty server execution threads (execution handler) */
+  String NETTY_SERVER_EXECUTION_THREADS = "giraph.nettyServerExecutionThreads";
+  /** Default Netty server execution threads (execution handler) of 8 */
+  int NETTY_SERVER_EXECUTION_THREADS_DEFAULT = 8;
+
+  /** Where to place the netty server execution handle? */
+  String NETTY_SERVER_EXECUTION_AFTER_HANDLER =
+      "giraph.nettyServerExecutionAfterHandler";
+  /**
+   * Default is to use the netty server execution handle after the request
+   * frame decoder.
+   */
+  String NETTY_SERVER_EXECUTION_AFTER_HANDLER_DEFAULT = "requestFrameDecoder";
+
+  /** Netty simulate a first request closed */
+  String NETTY_SIMULATE_FIRST_REQUEST_CLOSED =
+      "giraph.nettySimulateFirstRequestClosed";
+  /** Default of not simulating failure for first request */
+  boolean NETTY_SIMULATE_FIRST_REQUEST_CLOSED_DEFAULT = false;
+
+  /** Netty simulate a first response failed */
+  String NETTY_SIMULATE_FIRST_RESPONSE_FAILED =
+      "giraph.nettySimulateFirstResponseFailed";
+  /** Default of not simulating failure for first reponse */
+  boolean NETTY_SIMULATE_FIRST_RESPONSE_FAILED_DEFAULT = false;
+
+  /** Max resolve address attempts */
+  String MAX_RESOLVE_ADDRESS_ATTEMPTS = "giraph.maxResolveAddressAttempts";
+  /** Default max resolve address attempts */
+  int MAX_RESOLVE_ADDRESS_ATTEMPTS_DEFAULT = 5;
+
+  /** Msecs to wait between waiting for all requests to finish */
+  String WAITING_REQUEST_MSECS = "giraph.waitingRequestMsecs";
+  /** Default msecs to wait between waiting for all requests to finish */
+  int WAITING_REQUEST_MSECS_DEFAULT = 15000;
+
+  /** Millseconds to wait for an event before continuing */
+  String EVENT_WAIT_MSECS = "giraph.eventWaitMsecs";
+  /**
+   * Default milliseconds to wait for an event before continuing (30 seconds)
+   */
+  int EVENT_WAIT_MSECS_DEFAULT = 30 * 1000;
+
+  /**
+   * Maximum milliseconds to wait before giving up trying to get the minimum
+   * number of workers before a superstep (int).
+   */
+  String MAX_MASTER_SUPERSTEP_WAIT_MSECS = "giraph.maxMasterSuperstepWaitMsecs";
+  /**
+   * Default maximum milliseconds to wait before giving up trying to get
+   * the minimum number of workers before a superstep (10 minutes).
+   */
+  int MAX_MASTER_SUPERSTEP_WAIT_MSECS_DEFAULT = 10 * 60 * 1000;
+
+  /** Milliseconds for a request to complete (or else resend) */
+  String MAX_REQUEST_MILLISECONDS = "giraph.maxRequestMilliseconds";
+  /** Maximum number of milliseconds for a request to complete (10 minutes) */
+  int MAX_REQUEST_MILLISECONDS_DEFAULT = 10 * 60 * 1000;
+
+  /** Netty max connection failures */
+  String NETTY_MAX_CONNECTION_FAILURES = "giraph.nettyMaxConnectionFailures";
+  /** Default Netty max connection failures */
+  int NETTY_MAX_CONNECTION_FAILURES_DEFAULT = 1000;
+
+  /** Initial port to start using for the IPC communication */
+  String IPC_INITIAL_PORT = "giraph.ipcInitialPort";
+  /** Default port to start using for the IPC communication */
+  int IPC_INITIAL_PORT_DEFAULT = 30000;
+
+  /** Maximum bind attempts for different IPC ports */
+  String MAX_IPC_PORT_BIND_ATTEMPTS = "giraph.maxIpcPortBindAttempts";
+  /** Default maximum bind attempts for different IPC ports */
+  int MAX_IPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
+  /**
+   * Fail first IPC port binding attempt, simulate binding failure
+   * on real grid testing
+   */
+  String FAIL_FIRST_IPC_PORT_BIND_ATTEMPT =
+      "giraph.failFirstIpcPortBindAttempt";
+  /** Default fail first IPC port binding attempt flag */
+  boolean FAIL_FIRST_IPC_PORT_BIND_ATTEMPT_DEFAULT = false;
+
+  /** Client send buffer size */
+  String CLIENT_SEND_BUFFER_SIZE = "giraph.clientSendBufferSize";
+  /** Default client send buffer size of 0.5 MB */
+  int DEFAULT_CLIENT_SEND_BUFFER_SIZE = 512 * 1024;
+
+  /** Client receive buffer size */
+  String CLIENT_RECEIVE_BUFFER_SIZE = "giraph.clientReceiveBufferSize";
+  /** Default client receive buffer size of 32 k */
+  int DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE = 32 * 1024;
+
+  /** Server send buffer size */
+  String SERVER_SEND_BUFFER_SIZE = "giraph.serverSendBufferSize";
+  /** Default server send buffer size of 32 k */
+  int DEFAULT_SERVER_SEND_BUFFER_SIZE = 32 * 1024;
+
+  /** Server receive buffer size */
+  String SERVER_RECEIVE_BUFFER_SIZE = "giraph.serverReceiveBufferSize";
+  /** Default server receive buffer size of 0.5 MB */
+  int DEFAULT_SERVER_RECEIVE_BUFFER_SIZE = 512 * 1024;
+
+  /** Maximum size of messages (in bytes) per peer before flush */
+  String MAX_MSG_REQUEST_SIZE = "giraph.msgRequestSize";
+  /** Default maximum size of messages per peer before flush of 0.5MB */
+  int MAX_MSG_REQUEST_SIZE_DEFAULT = 512 * 1024;
+
+  /** Maximum number of messages per peer before flush */
+  String MSG_SIZE = "giraph.msgSize";
+  /** Default maximum number of messages per peer before flush */
+  int MSG_SIZE_DEFAULT = 2000;
+
+  /** Maximum number of mutations per partition before flush */
+  String MAX_MUTATIONS_PER_REQUEST = "giraph.maxMutationsPerRequest";
+  /** Default maximum number of mutations per partition before flush */
+  int MAX_MUTATIONS_PER_REQUEST_DEFAULT = 100;
+
+  /**
+   * Use message size encoding (typically better for complex objects,
+   * not meant for primitive wrapped messages)
+   */
+  String USE_MESSAGE_SIZE_ENCODING = "giraph.useMessageSizeEncoding";
+  /**
+   * By default, do not use message size encoding as it is experimental.
+   */
+  boolean USE_MESSAGE_SIZE_ENCODING_DEFAULT = false;
+
+  /** Number of channels used per server */
+  String CHANNELS_PER_SERVER = "giraph.channelsPerServer";
+  /** Default number of channels used per server of 1 */
+  int DEFAULT_CHANNELS_PER_SERVER = 1;
+
+  /** Number of flush threads per peer */
+  String MSG_NUM_FLUSH_THREADS = "giraph.msgNumFlushThreads";
+
+  /** Number of threads for vertex computation */
+  String NUM_COMPUTE_THREADS = "giraph.numComputeThreads";
+  /** Default number of threads for vertex computation */
+  int NUM_COMPUTE_THREADS_DEFAULT = 1;
+
+  /** Number of threads for input splits loading */
+  String NUM_INPUT_SPLITS_THREADS = "giraph.numInputSplitsThreads";
+  /** Default number of threads for input splits loading */
+  int NUM_INPUT_SPLITS_THREADS_DEFAULT = 1;
+
+  /** Minimum stragglers of the superstep before printing them out */
+  String PARTITION_LONG_TAIL_MIN_PRINT = "giraph.partitionLongTailMinPrint";
+  /** Only print stragglers with one as a default */
+  int PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT = 1;
+
+  /** Use superstep counters? (boolean) */
+  String USE_SUPERSTEP_COUNTERS = "giraph.useSuperstepCounters";
+  /** Default is to use the superstep counters */
+  boolean USE_SUPERSTEP_COUNTERS_DEFAULT = true;
+
+  /**
+   * Set the multiplicative factor of how many partitions to create from
+   * a single InputSplit based on the number of total InputSplits.  For
+   * example, if there are 10 total InputSplits and this is set to 0.5, then
+   * you will get 0.5 * 10 = 5 partitions for every InputSplit (given that the
+   * minimum size is met).
+   */
+  String TOTAL_INPUT_SPLIT_MULTIPLIER = "giraph.totalInputSplitMultiplier";
+
+  /**
+   * Input split sample percent - Used only for sampling and testing, rather
+   * than an actual job.  The idea is that to test, you might only want a
+   * fraction of the actual input splits from your VertexInputFormat to
+   * load (values should be [0, 100]).
+   */
+  String INPUT_SPLIT_SAMPLE_PERCENT = "giraph.inputSplitSamplePercent";
+  /** Default is to use all the input splits */
+  float INPUT_SPLIT_SAMPLE_PERCENT_DEFAULT = 100f;
+
+  /**
+   * To limit outlier vertex input splits from producing too many vertices or
+   * to help with testing, the number of vertices loaded from an input split
+   * can be limited.  By default, everything is loaded.
+   */
+  String INPUT_SPLIT_MAX_VERTICES = "giraph.InputSplitMaxVertices";
+  /**
+   * Default is that all the vertices are to be loaded from the input split
+   */
+  long INPUT_SPLIT_MAX_VERTICES_DEFAULT = -1;
+
+  /**
+   * To limit outlier vertex input splits from producing too many vertices or
+   * to help with testing, the number of edges loaded from an input split
+   * can be limited.  By default, everything is loaded.
+   */
+  String INPUT_SPLIT_MAX_EDGES = "giraph.InputSplitMaxEdges";
+  /**
+   * Default is that all the edges are to be loaded from the input split
+   */
+  long INPUT_SPLIT_MAX_EDGES_DEFAULT = -1;
+
+  /** Java opts passed to ZooKeeper startup */
+  String ZOOKEEPER_JAVA_OPTS = "giraph.zkJavaOpts";
+  /** Default java opts passed to ZooKeeper startup */
+  String ZOOKEEPER_JAVA_OPTS_DEFAULT =
+      "-Xmx512m -XX:ParallelGCThreads=4 -XX:+UseConcMarkSweepGC " +
+          "-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxGCPauseMillis=100";
+
+  /**
+   *  How often to checkpoint (i.e. 0, means no checkpoint,
+   *  1 means every superstep, 2 is every two supersteps, etc.).
+   */
+  String CHECKPOINT_FREQUENCY = "giraph.checkpointFrequency";
+
+  /** Default checkpointing frequency of none. */
+  int CHECKPOINT_FREQUENCY_DEFAULT = 0;
+
+  /**
+   * Delete checkpoints after a successful job run?
+   */
+  String CLEANUP_CHECKPOINTS_AFTER_SUCCESS =
+      "giraph.cleanupCheckpointsAfterSuccess";
+  /** Default is to clean up the checkponts after a successful job */
+  boolean CLEANUP_CHECKPOINTS_AFTER_SUCCESS_DEFAULT = true;
+
+  /**
+   * An application can be restarted manually by selecting a superstep.  The
+   * corresponding checkpoint must exist for this to work.  The user should
+   * set a long value.  Default is start from scratch.
+   */
+  String RESTART_SUPERSTEP = "giraph.restartSuperstep";
+
+  /**
+   * Base ZNode for Giraph's state in the ZooKeeper cluster.  Must be a root
+   * znode on the cluster beginning with "/"
+   */
+  String BASE_ZNODE_KEY = "giraph.zkBaseZNode";
+
+  /**
+   * If ZOOKEEPER_LIST is not set, then use this directory to manage
+   * ZooKeeper
+   */
+  String ZOOKEEPER_MANAGER_DIRECTORY = "giraph.zkManagerDirectory";
+  /**
+   * Default ZooKeeper manager directory (where determining the servers in
+   * HDFS files will go).  directory path will also have job number
+   * for uniqueness.
+   */
+  String ZOOKEEPER_MANAGER_DIR_DEFAULT = "_bsp/_defaultZkManagerDir";
+
+  /** Number of ZooKeeper client connection attempts before giving up. */
+  String ZOOKEEPER_CONNECTION_ATTEMPTS = "giraph.zkConnectionAttempts";
+  /** Default of 10 ZooKeeper client connection attempts before giving up. */
+  int ZOOKEEPER_CONNECTION_ATTEMPTS_DEFAULT = 10;
+
+  /** This directory has/stores the available checkpoint files in HDFS. */
+  String CHECKPOINT_DIRECTORY = "giraph.checkpointDirectory";
+  /**
+   * Default checkpoint directory (where checkpoing files go in HDFS).  Final
+   * directory path will also have the job number for uniqueness
+   */
+  String CHECKPOINT_DIRECTORY_DEFAULT = "_bsp/_checkpoints/";
+
+  /** Directory in the local file system for out-of-core messages. */
+  String MESSAGES_DIRECTORY = "giraph.messagesDirectory";
+  /**
+   * Default messages directory. directory path will also have the
+   * job number for uniqueness
+   */
+  String MESSAGES_DIRECTORY_DEFAULT = "_bsp/_messages/";
+
+  /** Whether or not to use out-of-core messages */
+  String USE_OUT_OF_CORE_MESSAGES = "giraph.useOutOfCoreMessages";
+  /** Default choice about using out-of-core messaging */
+  boolean USE_OUT_OF_CORE_MESSAGES_DEFAULT = false;
+  /**
+   * If using out-of-core messaging, it tells how much messages do we keep
+   * in memory.
+   */
+  String MAX_MESSAGES_IN_MEMORY = "giraph.maxMessagesInMemory";
+  /** Default maximum number of messages in memory. */
+  int MAX_MESSAGES_IN_MEMORY_DEFAULT = 1000000;
+  /** Size of buffer when reading and writing messages out-of-core. */
+  String MESSAGES_BUFFER_SIZE = "giraph.messagesBufferSize";
+  /** Default size of buffer when reading and writing messages out-of-core. */
+  int MESSAGES_BUFFER_SIZE_DEFAULT = 8192;
+
+  /** Directory in the local filesystem for out-of-core partitions. */
+  String PARTITIONS_DIRECTORY = "giraph.partitionsDirectory";
+  /** Default directory for out-of-core partitions. */
+  String PARTITIONS_DIRECTORY_DEFAULT = "_bsp/_partitions";
+
+  /** Enable out-of-core graph. */
+  String USE_OUT_OF_CORE_GRAPH = "giraph.useOutOfCoreGraph";
+  /** Default is not to use out-of-core graph. */
+  boolean USE_OUT_OF_CORE_GRAPH_DEFAULT = false;
+
+  /** Maximum number of partitions to hold in memory for each worker. */
+  String MAX_PARTITIONS_IN_MEMORY = "giraph.maxPartitionsInMemory";
+  /** Default maximum number of in-memory partitions. */
+  int MAX_PARTITIONS_IN_MEMORY_DEFAULT = 10;
+
+  /** Keep the zookeeper output for debugging? Default is to remove it. */
+  String KEEP_ZOOKEEPER_DATA = "giraph.keepZooKeeperData";
+  /** Default is to remove ZooKeeper data. */
+  Boolean KEEP_ZOOKEEPER_DATA_DEFAULT = false;
+
+  /** Default ZooKeeper tick time. */
+  int DEFAULT_ZOOKEEPER_TICK_TIME = 6000;
+  /** Default ZooKeeper init limit (in ticks). */
+  int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
+  /** Default ZooKeeper sync limit (in ticks). */
+  int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
+  /** Default ZooKeeper snap count. */
+  int DEFAULT_ZOOKEEPER_SNAP_COUNT = 50000;
+  /** Default ZooKeeper maximum client connections. */
+  int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
+  /** ZooKeeper minimum session timeout */
+  String ZOOKEEPER_MIN_SESSION_TIMEOUT = "giraph.zKMinSessionTimeout";
+  /** Default ZooKeeper minimum session timeout of 10 minutes (in msecs). */
+  int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 600 * 1000;
+  /** ZooKeeper maximum session timeout */
+  String ZOOKEEPER_MAX_SESSION_TIMEOUT = "giraph.zkMaxSessionTimeout";
+  /** Default ZooKeeper maximum session timeout of 15 minutes (in msecs). */
+  int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 900 * 1000;
+  /** ZooKeeper force sync */
+  String ZOOKEEPER_FORCE_SYNC = "giraph.zKForceSync";
+  /** Default ZooKeeper force sync is off (for performance) */
+  String DEFAULT_ZOOKEEPER_FORCE_SYNC = "no";
+  /** ZooKeeper skip ACLs */
+  String ZOOKEEPER_SKIP_ACL = "giraph.ZkSkipAcl";
+  /** Default ZooKeeper skip ACLs true (for performance) */
+  String DEFAULT_ZOOKEEPER_SKIP_ACL = "yes";
+
+  /**
+   * Whether to use SASL with DIGEST and Hadoop Job Tokens to authenticate
+   * and authorize Netty BSP Clients to Servers.
+   */
+  String AUTHENTICATE = "giraph.authenticate";
+  /** Default is not to do authenticate and authorization with Netty. */
+  boolean DEFAULT_AUTHENTICATE = false;
+
+  /** Use unsafe serialization? */
+  String USE_UNSAFE_SERIALIZATION = "giraph.useUnsafeSerialization";
+  /**
+   * Use unsafe serialization default is true (use it if you can,
+   * its much faster)!
+   */
+  boolean USE_UNSAFE_SERIALIZATION_DEFAULT = true;
+
+  /**
+   * Maximum number of attempts a master/worker will retry before killing
+   * the job.  This directly maps to the number of map task attempts in
+   * Hadoop.
+   */
+  String MAX_TASK_ATTEMPTS = "mapred.map.max.attempts";
+}
+// CHECKSTYLE: resume InterfaceIsTypeCheck

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java
new file mode 100644
index 0000000..e4351a2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfigurable.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Can be instantiated with ImmutableClassesGiraphConfiguration
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public interface ImmutableClassesGiraphConfigurable<
+    I extends WritableComparable, V extends Writable, E extends Writable,
+    M extends Writable> {
+  /**
+   * Set the configuration to be used by this object.
+   *
+   * @param configuration Set configuration
+   */
+  void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M>
+                   configuration);
+
+  /**
+   * Return the configuration used by this object.
+   *
+   * @return Set configuration
+   */
+  ImmutableClassesGiraphConfiguration<I, V, E, M> getConf();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
new file mode 100644
index 0000000..00e4135
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.conf;
+
+import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.giraph.graph.Combiner;
+import org.apache.giraph.graph.EdgeInputFormat;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.MasterCompute;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexOutputFormat;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.WorkerContext;
+import org.apache.giraph.graph.partition.GraphPartitionerFactory;
+import org.apache.giraph.graph.partition.MasterGraphPartitioner;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.master.MasterObserver;
+import org.apache.giraph.utils.ExtendedByteArrayDataInput;
+import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The classes set here are immutable, the remaining configuration is mutable.
+ * Classes are immutable and final to provide the best performance for
+ * instantiation.  Everything is thread-safe.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> extends
+    GiraphConfiguration {
+  /** Master graph partitioner - cached for fast access */
+  protected final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
+
+  /** Holder for all the classes */
+  private final GiraphClasses classes;
+
+  /**
+   * Use unsafe serialization? Cached for fast access to instantiate the
+   * extended data input/output classes
+   */
+  private final boolean useUnsafeSerialization;
+
+  /**
+   * Constructor.  Takes the configuration and then gets the classes out of
+   * them for Giraph
+   *
+   * @param conf Configuration
+   */
+  public ImmutableClassesGiraphConfiguration(Configuration conf) {
+    super(conf);
+
+    classes = new GiraphClasses(conf);
+    masterGraphPartitioner = (MasterGraphPartitioner<I, V, E, M>)
+        createGraphPartitioner().createMasterGraphPartitioner();
+
+    useUnsafeSerialization = getBoolean(USE_UNSAFE_SERIALIZATION,
+        USE_UNSAFE_SERIALIZATION_DEFAULT);
+  }
+
+  /**
+   * Get the user's subclassed
+   * {@link org.apache.giraph.graph.partition.GraphPartitionerFactory}.
+   *
+   * @return User's graph partitioner
+   */
+  public Class<? extends GraphPartitionerFactory<I, V, E, M>>
+  getGraphPartitionerClass() {
+    return classes.getGraphPartitionerFactoryClass();
+  }
+
+  /**
+   * Create a user graph partitioner class
+   *
+   * @return Instantiated user graph partitioner class
+   */
+  public GraphPartitionerFactory<I, V, E, M> createGraphPartitioner() {
+    Class<? extends GraphPartitionerFactory<I, V, E, M>> klass =
+        classes.getGraphPartitionerFactoryClass();
+    return ReflectionUtils.newInstance(klass, this);
+  }
+
+  /**
+   * Create a user graph partitioner partition stats class
+   *
+   * @return Instantiated user graph partition stats class
+   */
+  public PartitionStats createGraphPartitionStats() {
+    return getMasterGraphPartitioner().createPartitionStats();
+  }
+
+  /**
+   * Get the cached MasterGraphPartitioner.
+   *
+   * @return MasterGraphPartitioner cached in this class.
+   */
+  public MasterGraphPartitioner<I, V, E, M> getMasterGraphPartitioner() {
+    return masterGraphPartitioner;
+  }
+
+  /**
+   * Does the job have a {@link VertexInputFormat}?
+   *
+   * @return True iff a {@link VertexInputFormat} has been specified.
+   */
+  public boolean hasVertexInputFormat() {
+    return classes.hasVertexInputFormat();
+  }
+
+  /**
+   * Get the user's subclassed
+   * {@link org.apache.giraph.graph.VertexInputFormat}.
+   *
+   * @return User's vertex input format class
+   */
+  public Class<? extends VertexInputFormat<I, V, E, M>>
+  getVertexInputFormatClass() {
+    return classes.getVertexInputFormatClass();
+  }
+
+  /**
+   * Create a user vertex input format class
+   *
+   * @return Instantiated user vertex input format class
+   */
+  public VertexInputFormat<I, V, E, M>
+  createVertexInputFormat() {
+    Class<? extends VertexInputFormat<I, V, E, M>> klass =
+        classes.getVertexInputFormatClass();
+    return ReflectionUtils.newInstance(klass, this);
+  }
+
+  /**
+   * Get the user's subclassed
+   * {@link org.apache.giraph.graph.VertexOutputFormat}.
+   *
+   * @return User's vertex output format class
+   */
+  public Class<? extends VertexOutputFormat<I, V, E>>
+  getVertexOutputFormatClass() {
+    return classes.getVertexOutputFormatClass();
+  }
+
+  /**
+   * Create a user vertex output format class
+   *
+   * @return Instantiated user vertex output format class
+   */
+  @SuppressWarnings("rawtypes")
+  public VertexOutputFormat<I, V, E> createVertexOutputFormat() {
+    Class<? extends VertexOutputFormat<I, V, E>> klass =
+        classes.getVertexOutputFormatClass();
+    return ReflectionUtils.newInstance(klass, this);
+  }
+
+  /**
+   * Does the job have an {@link EdgeInputFormat}?
+   *
+   * @return True iff an {@link EdgeInputFormat} has been specified.
+   */
+  public boolean hasEdgeInputFormat() {
+    return classes.hasEdgeInputFormat();
+  }
+
+  /**
+   * Get the user's subclassed
+   * {@link org.apache.giraph.graph.EdgeInputFormat}.
+   *
+   * @return User's edge input format class
+   */
+  public Class<? extends EdgeInputFormat<I, E>> getEdgeInputFormatClass() {
+    return classes.getEdgeInputFormatClass();
+  }
+
+  /**
+   * Create a user edge input format class
+   *
+   * @return Instantiated user edge input format class
+   */
+  public EdgeInputFormat<I, E> createEdgeInputFormat() {
+    Class<? extends EdgeInputFormat<I, E>> klass = getEdgeInputFormatClass();
+    return ReflectionUtils.newInstance(klass, this);
+  }
+
+  /**
+   * Get the user's subclassed {@link org.apache.giraph.graph.AggregatorWriter}.
+   *
+   * @return User's aggregator writer class
+   */
+  public Class<? extends AggregatorWriter> getAggregatorWriterClass() {
+    return classes.getAggregatorWriterClass();
+  }
+
+  /**
+   * Create a user aggregator output format class
+   *
+   * @return Instantiated user aggregator writer class
+   */
+  public AggregatorWriter createAggregatorWriter() {
+    return ReflectionUtils.newInstance(getAggregatorWriterClass(), this);
+  }
+
+  /**
+   * Create a user combiner class
+   *
+   * @return Instantiated user combiner class
+   */
+  @SuppressWarnings("rawtypes")
+  public Combiner<I, M> createCombiner() {
+    Class<? extends Combiner<I, M>> klass = classes.getCombinerClass();
+    return ReflectionUtils.newInstance(klass, this);
+  }
+
+  /**
+   * Check if user set a combiner
+   *
+   * @return True iff user set a combiner class
+   */
+  public boolean useCombiner() {
+    return classes.hasCombinerClass();
+  }
+
+  /**
+   * Get the user's subclassed VertexResolver.
+   *
+   * @return User's vertex resolver class
+   */
+  public Class<? extends VertexResolver<I, V, E, M>> getVertexResolverClass() {
+    return classes.getVertexResolverClass();
+  }
+
+  /**
+   * Create a user vertex revolver
+   *
+   * @param graphState State of the graph from the worker
+   * @return Instantiated user vertex resolver
+   */
+  @SuppressWarnings("rawtypes")
+  public VertexResolver<I, V, E, M> createVertexResolver(
+                       GraphState<I, V, E, M> graphState) {
+    VertexResolver<I, V, E, M> resolver =
+        ReflectionUtils.newInstance(getVertexResolverClass(), this);
+    resolver.setGraphState(graphState);
+    return resolver;
+  }
+
+  /**
+   * Get the user's subclassed WorkerContext.
+   *
+   * @return User's worker context class
+   */
+  public Class<? extends WorkerContext> getWorkerContextClass() {
+    return classes.getWorkerContextClass();
+  }
+
+  /**
+   * Create a user worker context
+   *
+   * @param graphState State of the graph from the worker
+   * @return Instantiated user worker context
+   */
+  @SuppressWarnings("rawtypes")
+  public WorkerContext createWorkerContext(GraphState<I, V, E, M> graphState) {
+    WorkerContext workerContext =
+        ReflectionUtils.newInstance(getWorkerContextClass(), this);
+    workerContext.setGraphState(graphState);
+    return workerContext;
+  }
+
+  /**
+   * Get the user's subclassed {@link org.apache.giraph.graph.MasterCompute}
+   *
+   * @return User's master class
+   */
+  public Class<? extends MasterCompute> getMasterComputeClass() {
+    return classes.getMasterComputeClass();
+  }
+
+  /**
+   * Create a user master
+   *
+   * @return Instantiated user master
+   */
+  public MasterCompute createMasterCompute() {
+    return ReflectionUtils.newInstance(getMasterComputeClass(), this);
+  }
+
+  /**
+   * Get the user's subclassed {@link org.apache.giraph.graph.Vertex}
+   *
+   * @return User's vertex class
+   */
+  public Class<? extends Vertex<I, V, E, M>> getVertexClass() {
+    return classes.getVertexClass();
+  }
+
+  /**
+   * Create a user vertex
+   *
+   * @return Instantiated user vertex
+   */
+  public Vertex<I, V, E, M> createVertex() {
+    return ReflectionUtils.newInstance(getVertexClass(), this);
+  }
+
+  /**
+   * Get the user's subclassed vertex index class.
+   *
+   * @return User's vertex index class
+   */
+  public Class<I> getVertexIdClass() {
+    return classes.getVertexIdClass();
+  }
+
+  /**
+   * Create a user vertex index
+   *
+   * @return Instantiated user vertex index
+   */
+  public I createVertexId() {
+    try {
+      return getVertexIdClass().newInstance();
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException(
+          "createVertexId: Failed to instantiate", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException(
+          "createVertexId: Illegally accessed", e);
+    }
+  }
+
+  /**
+   * Get the user's subclassed vertex value class.
+   *
+   * @return User's vertex value class
+   */
+  public Class<V> getVertexValueClass() {
+    return classes.getVertexValueClass();
+  }
+
+  /**
+   * Create a user vertex value
+   *
+   * @return Instantiated user vertex value
+   */
+  @SuppressWarnings("unchecked")
+  public V createVertexValue() {
+    Class<V> klass = getVertexValueClass();
+    if (klass == NullWritable.class) {
+      return (V) NullWritable.get();
+    } else {
+      try {
+        return klass.newInstance();
+      } catch (InstantiationException e) {
+        throw new IllegalArgumentException(
+            "createVertexValue: Failed to instantiate", e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalArgumentException(
+            "createVertexValue: Illegally accessed", e);
+      }
+    }
+  }
+
+  /**
+   * Create array of MasterObservers.
+   *
+   * @return Instantiated array of MasterObservers.
+   */
+  public MasterObserver[] createMasterObservers() {
+    Class<? extends MasterObserver>[] klasses = getMasterObserverClasses();
+    MasterObserver[] objects = new MasterObserver[klasses.length];
+    for (int i = 0; i < klasses.length; ++i) {
+      objects[i] = ReflectionUtils.newInstance(klasses[i], this);
+    }
+    return objects;
+  }
+
+  /**
+   * Get the user's subclassed edge value class.
+   *
+   * @return User's vertex edge value class
+   */
+  public Class<E> getEdgeValueClass() {
+    return classes.getEdgeValueClass();
+  }
+
+  /**
+   * Create a user edge value
+   *
+   * @return Instantiated user edge value
+   */
+  public E createEdgeValue() {
+    Class<E> klass = getEdgeValueClass();
+    if (klass == NullWritable.class) {
+      return (E) NullWritable.get();
+    } else {
+      try {
+        return klass.newInstance();
+      } catch (InstantiationException e) {
+        throw new IllegalArgumentException(
+            "createEdgeValue: Failed to instantiate", e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalArgumentException(
+            "createEdgeValue: Illegally accessed", e);
+      }
+    }
+  }
+
+  /**
+   * Get the user's subclassed vertex message value class.
+   *
+   * @return User's vertex message value class
+   */
+  @SuppressWarnings("unchecked")
+  public Class<M> getMessageValueClass() {
+    return classes.getMessageValueClass();
+  }
+
+  /**
+   * Create a user vertex message value
+   *
+   * @return Instantiated user vertex message value
+   */
+  public M createMessageValue() {
+    Class<M> klass = getMessageValueClass();
+    if (klass == NullWritable.class) {
+      return (M) NullWritable.get();
+    } else {
+      try {
+        return klass.newInstance();
+      } catch (InstantiationException e) {
+        throw new IllegalArgumentException(
+            "createMessageValue: Failed to instantiate", e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalArgumentException(
+            "createMessageValue: Illegally accessed", e);
+      }
+    }
+  }
+
+  /**
+   * Create a partition
+   *
+   * @param id Partition id
+   * @param progressable Progressable for reporting progress
+   * @return Instantiated partition
+   */
+  public Partition<I, V, E, M> createPartition(
+      int id, Progressable progressable) {
+    Class<? extends Partition<I, V, E, M>> klass = classes.getPartitionClass();
+    Partition<I, V, E, M> partition = ReflectionUtils.newInstance(klass, this);
+    partition.initialize(id, progressable);
+    return partition;
+  }
+
+  /**
+   * Use unsafe serialization?
+   *
+   * @return True if using unsafe serialization, false otherwise.
+   */
+  public boolean useUnsafeSerialization() {
+    return useUnsafeSerialization;
+  }
+
+  /**
+   * Create an extended data output (can be subclassed)
+   *
+   * @return ExtendedDataOutput object
+   */
+  public ExtendedDataOutput createExtendedDataOutput() {
+    if (useUnsafeSerialization) {
+      return new UnsafeByteArrayOutputStream();
+    } else {
+      return new ExtendedByteArrayDataOutput();
+    }
+  }
+
+  /**
+   * Create an extended data output (can be subclassed)
+   *
+   * @param expectedSize Expected size
+   * @return ExtendedDataOutput object
+   */
+  public ExtendedDataOutput createExtendedDataOutput(int expectedSize) {
+    if (useUnsafeSerialization) {
+      return new UnsafeByteArrayOutputStream(expectedSize);
+    } else {
+      return new ExtendedByteArrayDataOutput(expectedSize);
+    }
+  }
+
+  /**
+   * Create an extended data output (can be subclassed)
+   *
+   * @param buf Buffer to use for the output (reuse perhaps)
+   * @param pos How much of the buffer is already used
+   * @return ExtendedDataOutput object
+   */
+  public ExtendedDataOutput createExtendedDataOutput(byte[] buf,
+                                                     int pos) {
+    if (useUnsafeSerialization) {
+      return new UnsafeByteArrayOutputStream(buf, pos);
+    } else {
+      return new ExtendedByteArrayDataOutput(buf, pos);
+    }
+  }
+
+  /**
+   * Create an extended data input (can be subclassed)
+   *
+   * @param buf Buffer to use for the input
+   * @param off Where to start reading in the buffer
+   * @param length Maximum length of the buffer
+   * @return ExtendedDataInput object
+   */
+  public ExtendedDataInput createExtendedDataInput(
+      byte[] buf, int off, int length) {
+    if (useUnsafeSerialization) {
+      return new UnsafeByteArrayInputStream(buf, off, length);
+    } else {
+      return new ExtendedByteArrayDataInput(buf, off, length);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/conf/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/package-info.java b/giraph-core/src/main/java/org/apache/giraph/conf/package-info.java
new file mode 100644
index 0000000..3b6b4b1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of Giraph configuration related things.
+ */
+package org.apache.giraph.conf;

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/counters/GiraphHadoopCounter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/counters/GiraphHadoopCounter.java b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphHadoopCounter.java
new file mode 100644
index 0000000..948c9ab
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/counters/GiraphHadoopCounter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.counters;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.mapreduce.Counter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Wrapper around Hadoop Counter to make it easier to use.
+ */
+public class GiraphHadoopCounter {
+  /** Hadoop Counter we're wrapping. */
+  private Counter counter;
+
+  /**
+   * Create wrapping a Hadoop Counter.
+   *
+   * @param counter Hadoop Counter to wrap.
+   */
+  public GiraphHadoopCounter(Counter counter) {
+    this.counter = counter;
+  }
+
+  /**
+   * Get underlying Hadoop Counter we're wrapping.
+   *
+   * @return Hadoop Counter being wrapped.
+   */
+  public Counter getHadoopCounter() {
+    return counter;
+  }
+
+  @Override
+  public int hashCode() {
+    return counter.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object genericRight) {
+    if (genericRight == null) {
+      return false;
+    }
+    if (getClass() != genericRight.getClass()) {
+      return false;
+    }
+    GiraphHadoopCounter right = (GiraphHadoopCounter) genericRight;
+    return Objects.equal(counter, right.counter);
+  }
+
+  /**
+   * Set counter to value. Should be greater than current value.
+   *
+   * @param value long value to set to.
+   */
+  public void setValue(long value) {
+    increment(value - getValue());
+  }
+
+  /**
+   * Increment counter value by 1.
+   */
+  public void increment() {
+    increment(1);
+  }
+
+  /**
+   * Increment counter value.
+   *
+   * @param incr amount to increment by.
+   */
+  public void increment(long incr) {
+    counter.increment(incr);
+  }
+
+  /**
+   * Get counter value
+   *
+   * @return long value of counter
+   */
+  public long getValue() {
+    return counter.getValue();
+  }
+
+  /**
+   * Get counter display name.
+   *
+   * @return String Hadoop counter display name.
+   */
+  public String getDisplayName() {
+    return counter.getDisplayName();
+  }
+
+  /**
+   * Get counter name.
+   *
+   * @return String Hadoop counter name.
+   */
+  public String getName() {
+    return counter.getName();
+  }
+
+  /**
+   * Write to Hadoop output.
+   *
+   * @param out DataOutput to write to.
+   * @throws IOException if something goes wrong.
+   */
+  public void write(DataOutput out) throws IOException {
+    counter.write(out);
+  }
+
+  /**
+   * Read from Hadoop input.
+   *
+   * @param in DataInput to read from.
+   * @throws IOException if something goes wrong reading.
+   */
+  public void readFields(DataInput in) throws IOException {
+    counter.readFields(in);
+  }
+}


Mime
View raw message