giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pava...@apache.org
Subject [3/3] git commit: updated refs/heads/trunk to 4a133f5
Date Sun, 08 Jun 2014 18:28:51 GMT
GIRAPH-908: support for partitioned input in giraph (pavanka)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4a133f57
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4a133f57
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4a133f57

Branch: refs/heads/trunk
Commit: 4a133f5766c09362917e0416af503c0a00b24e87
Parents: 535a333
Author: Pavan Kumar <pavanka@fb.com>
Authored: Sun Jun 8 10:36:03 2014 -0700
Committer: Pavan Kumar <pavanka@fb.com>
Committed: Sun Jun 8 10:36:03 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../java/org/apache/giraph/bsp/BspService.java  | 208 +++++++++++++-----
 .../giraph/bsp/CentralizedServiceMaster.java    |  10 +
 .../org/apache/giraph/conf/GiraphClasses.java   |  24 ++-
 .../org/apache/giraph/conf/GiraphConstants.java |  32 +++
 .../ImmutableClassesGiraphConfiguration.java    | 165 +++++++++++++++
 .../apache/giraph/io/MappingInputFormat.java    |  64 ++++++
 .../org/apache/giraph/io/MappingReader.java     | 124 +++++++++++
 .../io/internal/WrappedMappingInputFormat.java  |  99 +++++++++
 .../io/internal/WrappedMappingReader.java       | 105 ++++++++++
 .../io/iterables/MappingReaderWrapper.java      |  95 +++++++++
 .../giraph/mapping/AbstractLongByteOps.java     |  60 ++++++
 .../mapping/DefaultEmbeddedLongByteOps.java     |  73 +++++++
 .../giraph/mapping/DefaultLongByteOps.java      |  57 +++++
 .../giraph/mapping/LongByteMappingStore.java    | 143 +++++++++++++
 .../org/apache/giraph/mapping/MappingEntry.java |  62 ++++++
 .../org/apache/giraph/mapping/MappingStore.java |  70 +++++++
 .../apache/giraph/mapping/MappingStoreOps.java  |  72 +++++++
 .../org/apache/giraph/mapping/package-info.java |  23 ++
 .../translate/LongByteTranslateEdge.java        | 123 +++++++++++
 .../giraph/mapping/translate/TranslateEdge.java |  57 +++++
 .../giraph/mapping/translate/package-info.java  |  22 ++
 .../apache/giraph/master/BspServiceMaster.java  |  22 +-
 .../org/apache/giraph/master/MasterThread.java  |   5 +-
 .../partition/GraphPartitionerFactory.java      |  10 +-
 .../partition/HashPartitionerFactory.java       |  24 +--
 .../partition/HashRangePartitionerFactory.java  |  24 +--
 .../LongMappingStorePartitionerFactory.java     |  61 ++++++
 .../SimpleIntRangePartitionerFactory.java       |  18 +-
 .../SimpleLongRangePartitionerFactory.java      |  18 +-
 .../partition/SimplePartitionerFactory.java     |  37 ++--
 .../partition/SimpleWorkerPartitioner.java      |  34 ++-
 .../apache/giraph/worker/BspServiceWorker.java  | 138 +++++++++++-
 .../giraph/worker/EdgeInputSplitsCallable.java  |  23 ++
 .../giraph/worker/FullInputSplitCallable.java   | 210 +++++++++++++++++++
 .../org/apache/giraph/worker/LocalData.java     |  93 ++++++++
 .../worker/MappingInputSplitsCallable.java      | 109 ++++++++++
 .../MappingInputSplitsCallableFactory.java      |  96 +++++++++
 .../worker/VertexInputSplitsCallable.java       |  59 ++++++
 .../SimpleRangePartitionFactoryTest.java        |   2 +
 .../apache/giraph/hive/HiveGiraphRunner.java    |  73 +++++++
 .../giraph/hive/common/GiraphHiveConstants.java |   4 +
 .../apache/giraph/hive/common/HiveUtils.java    |  28 +++
 .../input/mapping/AbstractHiveToMapping.java    |  39 ++++
 .../input/mapping/HiveMappingInputFormat.java   | 116 ++++++++++
 .../hive/input/mapping/HiveMappingReader.java   | 100 +++++++++
 .../hive/input/mapping/HiveToMapping.java       |  44 ++++
 .../hive/input/mapping/SimpleHiveToMapping.java | 105 ++++++++++
 .../mapping/examples/LongByteHiveToMapping.java |  56 +++++
 .../examples/LongInt2ByteHiveToMapping.java     |  81 +++++++
 .../input/mapping/examples/package-info.java    |  22 ++
 .../giraph/hive/input/mapping/package-info.java |  22 ++
 52 files changed, 3239 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index a0e94c1..37b94e2 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-908: support for partitioned input in giraph (pavanka)
+
   GIRAPH-907: refactor giraph code to support multiple implementations of vertexId data (pavanka)  
 
   GIRAPH-899: Remove hcatalog from hadoop_facebook profile (pavanka)

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index ec0ddbb..2e35373 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -76,6 +76,25 @@ public abstract class BspService<I extends WritableComparable,
   public static final String BASE_DIR = "/_hadoopBsp";
   /** Master job state znode above base dir */
   public static final String MASTER_JOB_STATE_NODE = "/_masterJobState";
+
+  /** Mapping input split directory above base dir */
+  public static final String MAPPING_INPUT_SPLIT_DIR = "/_mappingInputSplitDir";
+  /** Mapping input split done directory above base dir */
+  public static final String MAPPING_INPUT_SPLIT_DONE_DIR =
+      "/_mappingInputSplitDoneDir";
+  /** Denotes a reserved mapping input split */
+  public static final String MAPPING_INPUT_SPLIT_RESERVED_NODE =
+      "/_mappingInputSplitReserved";
+  /** Denotes a finished mapping input split */
+  public static final String MAPPING_INPUT_SPLIT_FINISHED_NODE =
+      "/_mappingInputSplitFinished";
+  /** Denotes that all the mapping input splits are ready for consumption */
+  public static final String MAPPING_INPUT_SPLITS_ALL_READY_NODE =
+      "/_mappingInputSplitsAllReady";
+  /** Denotes that all the mapping input splits are done. */
+  public static final String MAPPING_INPUT_SPLITS_ALL_DONE_NODE =
+      "/_mappingInputSplitsAllDone";
+
   /** Vertex input split directory about base dir */
   public static final String VERTEX_INPUT_SPLIT_DIR = "/_vertexInputSplitDir";
   /** Vertex input split done directory about base dir */
@@ -93,6 +112,7 @@ public abstract class BspService<I extends WritableComparable,
   /** Denotes that all the vertex input splits are done. */
   public static final String VERTEX_INPUT_SPLITS_ALL_DONE_NODE =
       "/_vertexInputSplitsAllDone";
+
   /** Edge input split directory about base dir */
   public static final String EDGE_INPUT_SPLIT_DIR = "/_edgeInputSplitDir";
   /** Edge input split done directory about base dir */
@@ -188,10 +208,14 @@ public abstract class BspService<I extends WritableComparable,
   protected final String basePath;
   /** Path to the job state determined by the master (informative only) */
   protected final String masterJobStatePath;
+  /** ZooKeeper paths for mapping input splits. */
+  protected final InputSplitPaths mappingInputSplitsPaths;
   /** ZooKeeper paths for vertex input splits. */
   protected final InputSplitPaths vertexInputSplitsPaths;
   /** ZooKeeper paths for edge input splits. */
   protected final InputSplitPaths edgeInputSplitsPaths;
+  /** Mapping input splits events */
+  protected final InputSplitEvents mappingInputSplitsEvents;
   /** Vertex input split events. */
   protected final InputSplitEvents vertexInputSplitsEvents;
   /** Edge input split events. */
@@ -263,6 +287,7 @@ public abstract class BspService<I extends WritableComparable,
   public BspService(
       Mapper<?, ?, ?, ?>.Context context,
       GraphTaskManager<I, V, E> graphTaskManager) {
+    this.mappingInputSplitsEvents = new InputSplitEvents(context);
     this.vertexInputSplitsEvents = new InputSplitEvents(context);
     this.edgeInputSplitsEvents = new InputSplitEvents(context);
     this.connectedEvent = new PredicateLock(context);
@@ -313,6 +338,10 @@ public abstract class BspService<I extends WritableComparable,
     getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
         basePath);
     masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
+    mappingInputSplitsPaths = new InputSplitPaths(basePath,
+        MAPPING_INPUT_SPLIT_DIR, MAPPING_INPUT_SPLIT_DONE_DIR,
+        MAPPING_INPUT_SPLITS_ALL_READY_NODE,
+        MAPPING_INPUT_SPLITS_ALL_DONE_NODE);
     vertexInputSplitsPaths = new InputSplitPaths(basePath,
         VERTEX_INPUT_SPLIT_DIR, VERTEX_INPUT_SPLIT_DONE_DIR,
         VERTEX_INPUT_SPLITS_ALL_READY_NODE, VERTEX_INPUT_SPLITS_ALL_DONE_NODE);
@@ -676,8 +705,6 @@ public abstract class BspService<I extends WritableComparable,
    * watches to see if the master commanded job state changes.
    *
    * @return Last job state or null if none
-   * @throws InterruptedException
-   * @throws KeeperException
    */
   public final JSONObject getJobState() {
     try {
@@ -784,8 +811,6 @@ public abstract class BspService<I extends WritableComparable,
    * Get the latest superstep and cache it.
    *
    * @return the latest superstep
-   * @throws InterruptedException
-   * @throws KeeperException
    */
   public final long getSuperstep() {
     if (cachedSuperstep != UNSET_SUPERSTEP) {
@@ -959,7 +984,125 @@ public abstract class BspService<I extends WritableComparable,
       }
       workerHealthRegistrationChanged.signal();
       eventProcessed = true;
+    } else if (processMappingEvent(event) || processVertexEvent(event) ||
+        processEdgeEvent(event)) {
+      return;
+    } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
+        event.getType() == EventType.NodeCreated) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: partitionAssignmentsReadyChanged " +
+            "(partitions are assigned)");
+      }
+      addressesAndPartitionsReadyChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
+        event.getType() == EventType.NodeCreated) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: superstepFinished signaled");
+      }
+      superstepFinished.signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(applicationAttemptsPath) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: applicationAttemptChanged signaled");
+      }
+      applicationAttemptChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().contains(MASTER_ELECTION_DIR) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: masterElectionChildrenChanged signaled");
+      }
+      masterElectionChildrenChanged.signal();
+      eventProcessed = true;
+    } else if (event.getPath().equals(cleanedUpPath) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: cleanedUpChildrenChanged signaled");
+      }
+      cleanedUpChildrenChanged.signal();
+      eventProcessed = true;
+    }
+
+    if (!(processEvent(event)) && (!eventProcessed)) {
+      LOG.warn("process: Unknown and unprocessed event (path=" +
+          event.getPath() + ", type=" + event.getType() +
+          ", state=" + event.getState() + ")");
+    }
+  }
+
+  /**
+   * Process WatchedEvent for Mapping Inputsplits
+   *
+   * @param event watched event
+   * @return true if event processed
+   */
+  public final boolean processMappingEvent(WatchedEvent event) {
+    boolean eventProcessed = false;
+    if (event.getPath().equals(
+        mappingInputSplitsPaths.getAllReadyPath()) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: inputSplitsReadyChanged " +
+            "(input splits ready)");
+      }
+      mappingInputSplitsEvents.getAllReadyChanged().signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: mappingInputSplitsStateChanged " +
+            "(made a reservation)");
+      }
+      mappingInputSplitsEvents.getStateChanged().signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_RESERVED_NODE) &&
+        (event.getType() == EventType.NodeDeleted)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: mappingInputSplitsStateChanged " +
+            "(lost a reservation)");
+      }
+      mappingInputSplitsEvents.getStateChanged().signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_FINISHED_NODE) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: mappingInputSplitsStateChanged " +
+            "(finished inputsplit)");
+      }
+      mappingInputSplitsEvents.getStateChanged().signal();
+      eventProcessed = true;
+    } else if (event.getPath().endsWith(MAPPING_INPUT_SPLIT_DONE_DIR) &&
+        (event.getType() == EventType.NodeChildrenChanged)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("process: mappingInputSplitsDoneStateChanged " +
+            "(worker finished sending)");
+      }
+      mappingInputSplitsEvents.getDoneStateChanged().signal();
+      eventProcessed = true;
     } else if (event.getPath().equals(
+        mappingInputSplitsPaths.getAllDonePath()) &&
+        (event.getType() == EventType.NodeCreated)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("process: mappingInputSplitsAllDoneChanged " +
+            "(all entries sent from input splits)");
+      }
+      mappingInputSplitsEvents.getAllDoneChanged().signal();
+      eventProcessed = true;
+    }
+    return eventProcessed;
+  }
+
+  /**
+   * Process WatchedEvent for Vertex Inputsplits
+   *
+   * @param event watched event
+   * @return true if event processed
+   */
+  public final boolean processVertexEvent(WatchedEvent event) {
+    boolean eventProcessed = false;
+    if (event.getPath().equals(
         vertexInputSplitsPaths.getAllReadyPath()) &&
         (event.getType() == EventType.NodeCreated)) {
       if (LOG.isInfoEnabled()) {
@@ -1009,7 +1152,19 @@ public abstract class BspService<I extends WritableComparable,
       }
       vertexInputSplitsEvents.getAllDoneChanged().signal();
       eventProcessed = true;
-    } else if (event.getPath().equals(
+    }
+    return eventProcessed;
+  }
+
+  /**
+   * Process WatchedEvent for Edge Inputsplits
+   *
+   * @param event watched event
+   * @return true if event processed
+   */
+  public final boolean processEdgeEvent(WatchedEvent event) {
+    boolean eventProcessed = false;
+    if (event.getPath().equals(
         edgeInputSplitsPaths.getAllReadyPath()) &&
         (event.getType() == EventType.NodeCreated)) {
       if (LOG.isInfoEnabled()) {
@@ -1059,48 +1214,7 @@ public abstract class BspService<I extends WritableComparable,
       }
       edgeInputSplitsEvents.getAllDoneChanged().signal();
       eventProcessed = true;
-    } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
-        event.getType() == EventType.NodeCreated) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: partitionAssignmentsReadyChanged " +
-            "(partitions are assigned)");
-      }
-      addressesAndPartitionsReadyChanged.signal();
-      eventProcessed = true;
-    } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
-        event.getType() == EventType.NodeCreated) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: superstepFinished signaled");
-      }
-      superstepFinished.signal();
-      eventProcessed = true;
-    } else if (event.getPath().endsWith(applicationAttemptsPath) &&
-        event.getType() == EventType.NodeChildrenChanged) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: applicationAttemptChanged signaled");
-      }
-      applicationAttemptChanged.signal();
-      eventProcessed = true;
-    } else if (event.getPath().contains(MASTER_ELECTION_DIR) &&
-        event.getType() == EventType.NodeChildrenChanged) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: masterElectionChildrenChanged signaled");
-      }
-      masterElectionChildrenChanged.signal();
-      eventProcessed = true;
-    } else if (event.getPath().equals(cleanedUpPath) &&
-        event.getType() == EventType.NodeChildrenChanged) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: cleanedUpChildrenChanged signaled");
-      }
-      cleanedUpChildrenChanged.signal();
-      eventProcessed = true;
-    }
-
-    if (!(processEvent(event)) && (!eventProcessed)) {
-      LOG.warn("process: Unknown and unprocessed event (path=" +
-          event.getPath() + ", type=" + event.getType() +
-          ", state=" + event.getState() + ")");
     }
+    return eventProcessed;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index bda967d..e5b7cf3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -71,6 +71,16 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
 
   /**
    * Create the {@link BspInputSplit} objects from the index range based on the
+   * user-defined MappingInputFormat.  The {@link BspInputSplit} objects will
+   * be processed by the workers later on during the INPUT_SUPERSTEP.
+   *
+   * @return Number of splits. Returns -1 on failure to create
+   *         valid input splits.
+   */
+  int createMappingInputSplits();
+
+  /**
+   * Create the {@link BspInputSplit} objects from the index range based on the
    * user-defined VertexInputFormat.  The {@link BspInputSplit} objects will
    * processed by the workers later on during the INPUT_SUPERSTEP.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 3337621..e7b18aa 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -31,6 +31,7 @@ import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -85,6 +86,9 @@ public class GiraphClasses<I extends WritableComparable,
   /** Vertex output format class - cached for fast access */
   protected Class<? extends VertexOutputFormat<I, V, E>>
   vertexOutputFormatClass;
+  /** Mapping input format - cached for fast access */
+  protected Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
+  mappingInputFormatClass;
   /** Edge input format class - cached for fast access */
   protected Class<? extends EdgeInputFormat<I, E>>
   edgeInputFormatClass;
@@ -113,8 +117,7 @@ public class GiraphClasses<I extends WritableComparable,
   /** Edge Input Filter class */
   protected Class<? extends EdgeInputFilter<I, E>> edgeInputFilterClass;
   /** Vertex Input Filter class */
-  protected Class<? extends VertexInputFilter<I, V, E>>
-  vertexInputFilterClass;
+  protected Class<? extends VertexInputFilter<I, V, E>> vertexInputFilterClass;
 
   /**
    * Empty constructor. Initialize with default classes or null.
@@ -181,6 +184,9 @@ public class GiraphClasses<I extends WritableComparable,
         EDGE_INPUT_FORMAT_CLASS.get(conf);
     edgeOutputFormatClass = (Class<? extends EdgeOutputFormat<I, V, E>>)
         EDGE_OUTPUT_FORMAT_CLASS.get(conf);
+    mappingInputFormatClass = (Class<? extends MappingInputFormat<I, V, E,
+        ? extends Writable>>)
+        MAPPING_INPUT_FORMAT_CLASS.get(conf);
 
     aggregatorWriterClass = AGGREGATOR_WRITER_CLASS.get(conf);
     messageCombinerClass =
@@ -335,6 +341,15 @@ public class GiraphClasses<I extends WritableComparable,
   }
 
   /**
+   * Check if MappingInputFormat is set
+   *
+   * @return true if MappingInputFormat is set
+   */
+  public boolean hasMappingInputFormat() {
+    return mappingInputFormatClass != null;
+  }
+
+  /**
    * Get VertexOutputFormat set
    *
    * @return VertexOutputFormat
@@ -344,6 +359,11 @@ public class GiraphClasses<I extends WritableComparable,
     return vertexOutputFormatClass;
   }
 
+  public Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
+  getMappingInputFormatClass() {
+    return mappingInputFormatClass;
+  }
+
   /**
    * Check if EdgeInputFormat is set
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 611a0dc..dd0c9ae 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -45,6 +45,7 @@ import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
@@ -56,6 +57,9 @@ import org.apache.giraph.job.DefaultJobObserver;
 import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.job.GiraphJobRetryChecker;
 import org.apache.giraph.job.HaltApplicationUtils;
+import org.apache.giraph.mapping.MappingStore;
+import org.apache.giraph.mapping.MappingStoreOps;
+import org.apache.giraph.mapping.translate.TranslateEdge;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
@@ -80,6 +84,30 @@ public interface GiraphConstants {
   /** 1KB in bytes */
   int ONE_KB = 1024;
 
+  /** Class to use for the mapping store */
+  ClassConfOption<? extends MappingStore> MAPPING_STORE_CLASS =
+      ClassConfOption.create("giraph.mappingStoreClass", null,
+          MappingStore.class, "MappingStore Class");
+
+  /** Class to use for performing read operations on mapping store */
+  ClassConfOption<? extends MappingStoreOps> MAPPING_STORE_OPS_CLASS =
+      ClassConfOption.create("giraph.mappingStoreOpsClass", null,
+          MappingStoreOps.class, "MappingStoreOps class");
+
+  /** Upper value of LongByteMappingStore */
+  IntConfOption LB_MAPPINGSTORE_UPPER =
+      new IntConfOption("giraph.lbMappingStoreUpper", -1,
+          "'upper' value used by lbmappingstore");
+  /** Lower value of LongByteMappingStore */
+  IntConfOption LB_MAPPINGSTORE_LOWER =
+      new IntConfOption("giraph.lbMappingStoreLower", -1,
+          "'lower' value used by lbMappingstore");
+  /** Class used to conduct expensive edge translation during vertex input */
+  ClassConfOption EDGE_TRANSLATION_CLASS =
+      ClassConfOption.create("giraph.edgeTranslationClass", null,
+          TranslateEdge.class, "Class used to conduct expensive edge " +
+              "translation during vertex input phase");
+
   /** Computation class - required */
   ClassConfOption<Computation> COMPUTATION_CLASS =
       ClassConfOption.create("giraph.computationClass", null,
@@ -230,6 +258,10 @@ public interface GiraphConstants {
   ClassConfOption<EdgeInputFormat> EDGE_INPUT_FORMAT_CLASS =
       ClassConfOption.create("giraph.edgeInputFormatClass", null,
           EdgeInputFormat.class, "EdgeInputFormat class");
+  /** MappingInputFormat class */
+  ClassConfOption<MappingInputFormat> MAPPING_INPUT_FORMAT_CLASS =
+      ClassConfOption.create("giraph.mappingInputFormatClass", null,
+          MappingInputFormat.class, "MappingInputFormat class");
 
   /** EdgeInputFilter class */
   ClassConfOption<EdgeInputFilter> EDGE_INPUT_FILTER_CLASS =

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index e9f50f9..3d7b3db 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.conf;
 
+import com.google.common.base.Preconditions;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.edge.Edge;
@@ -39,12 +40,14 @@ import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueCombiner;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.EdgeInputFilter;
 import org.apache.giraph.io.filters.VertexInputFilter;
 import org.apache.giraph.io.internal.WrappedEdgeInputFormat;
 import org.apache.giraph.io.internal.WrappedEdgeOutputFormat;
+import org.apache.giraph.io.internal.WrappedMappingInputFormat;
 import org.apache.giraph.io.internal.WrappedVertexInputFormat;
 import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
 import org.apache.giraph.io.superstep_output.MultiThreadedSuperstepOutput;
@@ -53,6 +56,9 @@ import org.apache.giraph.io.superstep_output.SuperstepOutput;
 import org.apache.giraph.io.superstep_output.SynchronizedSuperstepOutput;
 import org.apache.giraph.job.GiraphJobObserver;
 import org.apache.giraph.job.GiraphJobRetryChecker;
+import org.apache.giraph.mapping.MappingStore;
+import org.apache.giraph.mapping.MappingStoreOps;
+import org.apache.giraph.mapping.translate.TranslateEdge;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.master.SuperstepClasses;
@@ -65,6 +71,7 @@ import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.utils.io.BigDataInputOutput;
 import org.apache.giraph.utils.io.DataInputOutput;
 import org.apache.giraph.utils.io.ExtendedDataInputOutput;
@@ -91,6 +98,8 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     V extends Writable, E extends Writable> extends GiraphConfiguration {
   /** Holder for all the classes */
   private final GiraphClasses classes;
+  /** Mapping target class */
+  private Class<? extends Writable> mappingTargetClass = null;
   /** Value (IVEMM) Factories */
   private final ValueFactories<I, V, E> valueFactories;
   /** Language values (IVEMM) are implemented in */
@@ -148,6 +157,27 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get the class used for edge translation during vertex input
+   *
+   * @return edge translation class
+   */
+  public Class<? extends TranslateEdge> edgeTranslationClass() {
+    return EDGE_TRANSLATION_CLASS.get(this);
+  }
+
+  /**
+   * Instance of TranslateEdge that contains helper method for edge translation
+   *
+   * @return instance of TranslateEdge
+   */
+  public TranslateEdge<I, E> edgeTranslationInstance() {
+    if (edgeTranslationClass() != null) {
+      return ReflectionUtils.newInstance(edgeTranslationClass(), this);
+    }
+    return null;
+  }
+
+  /**
    * Get the vertex input filter class
    *
    * @return VertexInputFilter class
@@ -274,6 +304,25 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get MappingInputFormatClass
+   *
+   * @return MappingInputFormatClass
+   */
+  public Class<? extends MappingInputFormat<I, V, E, ? extends Writable>>
+  getMappingInputFormatClass() {
+    return classes.getMappingInputFormatClass();
+  }
+
+  /**
+   * Check if mappingInputFormat is set
+   *
+   * @return true if mappingInputFormat is set
+   */
+  public boolean hasMappingInputFormat() {
+    return classes.hasMappingInputFormat();
+  }
+
+  /**
    * Create a user vertex output format class.
    * Note: Giraph should only use WrappedVertexOutputFormat,
    * which makes sure that Configuration parameters are set properly.
@@ -287,6 +336,20 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create a user mapping input format class.
+   * Note: Giraph should only use WrappedMappingInputFormat,
+   * which makes sure that Configuration parameters are set properly.
+   *
+   * @return Instantiated user mapping input format class
+   */
+  private MappingInputFormat<I, V, E, ? extends Writable>
+  createMappingInputFormat() {
+    Class<? extends MappingInputFormat<I, V, E, ? extends Writable>> klass =
+        getMappingInputFormatClass();
+    return ReflectionUtils.newInstance(klass, this);
+  }
+
+  /**
    * Create a wrapper for user vertex output format,
    * which makes sure that Configuration parameters are set properly in all
    * methods related to this format.
@@ -300,6 +363,22 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
     return wrappedVertexOutputFormat;
   }
 
+  /**
+   * Create a wrapper for user mapping input format,
+   * which makes sure that Configuration parameters are set properly in all
+   * methods related to this format.
+   *
+   * @return Wrapper around user mapping input format
+   */
+  public WrappedMappingInputFormat<I, V, E, ? extends Writable>
+  createWrappedMappingInputFormat() {
+    WrappedMappingInputFormat<I, V, E, ? extends Writable>
+      wrappedMappingInputFormat =
+        new WrappedMappingInputFormat<>(createMappingInputFormat());
+    configureIfPossible(wrappedMappingInputFormat);
+    return wrappedMappingInputFormat;
+  }
+
   @Override
   public boolean hasEdgeOutputFormat() {
     return classes.hasEdgeOutputFormat();
@@ -756,6 +835,24 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Create edge based on #createEdge definition
+   *
+   * @param translateEdge instance of TranslateEdge
+   * @param edge edge to be translated
+   * @return translated edge
+   */
+  public Edge<I, E> createEdge(TranslateEdge<I, E>
+    translateEdge, Edge<I, E> edge) {
+    I translatedId = translateEdge.translateId(edge.getTargetVertexId());
+    if (isEdgeValueNullWritable()) {
+      return (Edge<I, E>) EdgeFactory.create(translatedId);
+    } else {
+      return EdgeFactory.create(translatedId,
+        translateEdge.cloneValue(edge.getValue()));
+    }
+  }
+
+  /**
    * Create a reusable edge.
    *
    * @return Instantiated reusable edge.
@@ -856,6 +953,74 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get MappingStore class to be used
+   *
+   * @return MappingStore class set by user
+   */
+  public Class<? extends MappingStore> getMappingStoreClass() {
+    return MAPPING_STORE_CLASS.get(this);
+  }
+
+  /**
+   * Create a {@link org.apache.giraph.mapping.MappingStore} instance
+   *
+   * @return MappingStore Instance
+   */
+  public MappingStore<I, ? extends Writable> createMappingStore() {
+    if (getMappingStoreClass() != null) {
+      return ReflectionUtils.newInstance(getMappingStoreClass(), this);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Get MappingStoreOps class to be used
+   *
+   * @return MappingStoreOps class set by user
+   */
+  public Class<? extends MappingStoreOps> getMappingStoreOpsClass() {
+    return MAPPING_STORE_OPS_CLASS.get(this);
+  }
+
+  /**
+   * Create a {@link org.apache.giraph.mapping.MappingStoreOps} instance
+   *
+   * @return MappingStoreOps Instance
+   */
+  public MappingStoreOps<I, ? extends Writable> createMappingStoreOps() {
+    if (getMappingStoreOpsClass() != null) {
+      return ReflectionUtils.newInstance(getMappingStoreOpsClass(), this);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Get mappingTarget class
+   *
+   * @return mappingTarget class
+   */
+  public Class<? extends Writable> getMappingTargetClass() {
+    if (mappingTargetClass == null) {
+      Class<?>[] classList = ReflectionUtils.getTypeArguments(
+        MappingStore.class, getMappingStoreClass());
+      Preconditions.checkArgument(classList.length == 2);
+      mappingTargetClass = (Class<? extends Writable>) classList[1];
+    }
+    return mappingTargetClass;
+  }
+
+  /**
+   * Create and return mappingTarget instance
+   *
+   * @return mappingTarget instance
+   */
+  public Writable createMappingTarget() {
+    return WritableUtils.createWritable(getMappingTargetClass());
+  }
+
+  /**
    * Create a user {@link org.apache.giraph.edge.OutEdges}
    *
    * @return Instantiated user OutEdges

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java
new file mode 100644
index 0000000..2666268
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/MappingInputFormat.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ *
+ * Use this to load data for a BSP application.  Note that the InputSplit must
+ * also implement Writable.
+ *
+ * It's guaranteed that whatever parameters are set in the configuration are
+ * also going to be available in all method arguments related to this input
+ * format (context in getSplits and createMappingReader; methods invoked on
+ * MappingReader). So if backing input format relies on some parameters from
+ * configuration, you can safely set them for example in
+ * {@link #setConf(org.apache.giraph.conf.ImmutableClassesGiraphConfiguration)}.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+@SuppressWarnings("unchecked")
+public abstract class MappingInputFormat<I extends WritableComparable,
+    V extends Writable, E extends Writable, B extends Writable>
+    extends GiraphInputFormat<I, V, E> {
+
+  /**
+   * Create a mapping reader for a given split. Guaranteed to have been
+   * configured with setConf() prior to use.  The framework will also call
+   * {@link MappingReader#initialize(InputSplit,
+   * org.apache.hadoop.mapreduce.TaskAttemptContext)} before
+   * the split is used.
+   *
+   * @param split the split to be read
+   * @param context the information about the task
+   * @return a new record reader
+   * @throws IOException
+   */
+  public abstract MappingReader<I, V, E, B> createMappingReader(
+      InputSplit split, TaskAttemptContext context) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
new file mode 100644
index 0000000..b7ce97c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Will read the mapping from an input split.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public abstract class MappingReader<I extends WritableComparable,
+    V extends Writable, E extends Writable, B extends Writable>
+    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+    implements WorkerAggregatorUsage {
+
+  /** Aggregator usage for mapping reader */
+  private WorkerAggregatorUsage workerAggregatorUsage;
+
+  /**
+   * Use the input split and context to set up reading the mapping entries.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param inputSplit Input split to be used for reading mapping entries.
+   * @param context Context from the task.
+   * @throws java.io.IOException
+   * @throws InterruptedException
+   */
+  public abstract void initialize(InputSplit inputSplit,
+    TaskAttemptContext context)
+    throws IOException, InterruptedException;
+
+
+  /**
+   * Set aggregator usage. It provides the functionality
+   * of aggregation operation in reading a mapping entry.
+   * It is invoked just after initialization.
+   * E.g.,
+   * mappingReader.initialize(inputSplit, context);
+   * mappingReader.setWorkerAggregatorUse(aggregatorUsage);
+   * This method is only for use by the infrastructure.
+   *
+   * @param agg aggregator usage for mapping reader
+   */
+  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+    workerAggregatorUsage = agg;
+  }
+
+  /**
+   * Move to the next mapping entry, if any.
+   * @return false iff there are no more mapping entries
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract boolean nextEntry() throws IOException,
+    InterruptedException;
+
+
+  /**
+   * Get the current entry.
+   *
+   * @return the current entry which has been read.
+   *         nextEntry() should be called first.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract MappingEntry<I, B> getCurrentEntry()
+    throws IOException, InterruptedException;
+
+
+  /**
+   * Close this {@link MappingReader} to future operations.
+   *
+   * @throws IOException
+   */
+  public abstract void close() throws IOException;
+
+  /**
+   * How much of the input has the {@link MappingReader} consumed i.e.
+   * has been processed by?
+   *
+   * @return Progress from <code>0.0</code> to <code>1.0</code>.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract float getProgress() throws IOException, InterruptedException;
+
+  @Override
+  public <A extends Writable> void aggregate(String name, A value) {
+    workerAggregatorUsage.aggregate(name, value);
+  }
+
+  @Override
+  public <A extends Writable> A getAggregatedValue(String name) {
+    return workerAggregatorUsage.getAggregatedValue(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java
new file mode 100644
index 0000000..72f8177
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingInputFormat.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.job.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * For internal use only.
+ *
+ * Wraps user set {@link org.apache.giraph.io.MappingInputFormat} to make
+ * sure proper configuration parameters are passed around, that user can set
+ * parameters in configuration and they will be available in other methods
+ * related to this format.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class WrappedMappingInputFormat<I extends WritableComparable,
+  V extends Writable, E extends Writable, B extends Writable>
+  extends MappingInputFormat<I, V, E, B> {
+  /** originalInputFormat to wrap over */
+  private MappingInputFormat<I, V, E, B> originalInputFormat;
+
+  /**
+   * Constructor
+   *
+   * @param mappingInputFormat original mappingInputFormat
+   */
+  public WrappedMappingInputFormat(
+      MappingInputFormat<I, V, E, B> mappingInputFormat) {
+    originalInputFormat = mappingInputFormat;
+  }
+
+  @Override
+  public void checkInputSpecs(Configuration conf) {
+    originalInputFormat.checkInputSpecs(conf);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
+    throws IOException, InterruptedException {
+    return originalInputFormat.getSplits(
+        HadoopUtils.makeJobContext(getConf(), context),
+        minSplitCountHint);
+  }
+
+  @Override
+  public MappingReader<I, V, E, B> createMappingReader(InputSplit split,
+    TaskAttemptContext context) throws IOException {
+    final MappingReader<I, V, E, B> mappingReader = originalInputFormat
+        .createMappingReader(split,
+            HadoopUtils.makeTaskAttemptContext(getConf(), context));
+    return new WrappedMappingReader<>(mappingReader, getConf());
+  }
+
+
+  @Override
+  public void writeInputSplit(InputSplit inputSplit,
+    DataOutput dataOutput) throws IOException {
+    originalInputFormat.writeInputSplit(inputSplit, dataOutput);
+  }
+
+  @Override
+  public InputSplit readInputSplit(
+    DataInput dataInput) throws IOException, ClassNotFoundException {
+    return originalInputFormat.readInputSplit(dataInput);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
new file mode 100644
index 0000000..7d1c4c9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.internal;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.job.HadoopUtils;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * For internal use only.
+ *
+ * Wraps {@link org.apache.giraph.io.MappingReader} to make sure proper
+ * configuration parameters are passed around, that parameters set in original
+ * configuration are available in methods of this reader
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class WrappedMappingReader<I extends WritableComparable,
+  V extends Writable, E extends Writable, B extends Writable>
+  extends MappingReader<I, V, E, B> {
+  /** User set baseMappingReader wrapped over */
+  private final MappingReader<I, V, E, B> baseMappingReader;
+
+  /**
+   * Constructor
+   *
+   * @param baseMappingReader User set baseMappingReader
+   * @param conf configuration
+   */
+  public WrappedMappingReader(MappingReader<I, V, E, B> baseMappingReader,
+    ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    this.baseMappingReader = baseMappingReader;
+    super.setConf(conf);
+    baseMappingReader.setConf(conf);
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    // We don't want to use external configuration
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+    TaskAttemptContext context) throws IOException, InterruptedException {
+    baseMappingReader.initialize(inputSplit,
+        HadoopUtils.makeTaskAttemptContext(getConf(), context));
+  }
+
+
+  @Override
+  public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) {
+    // Set aggregator usage for the wrapped mapping reader
+    baseMappingReader.setWorkerAggregatorUse(agg);
+  }
+
+  @Override
+  public boolean nextEntry() throws IOException, InterruptedException {
+    return baseMappingReader.nextEntry();
+  }
+
+  @Override
+  public MappingEntry<I, B> getCurrentEntry()
+    throws IOException, InterruptedException {
+    return baseMappingReader.getCurrentEntry();
+  }
+
+
+  @Override
+  public void close() throws IOException {
+    baseMappingReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return baseMappingReader.getProgress();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java b/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java
new file mode 100644
index 0000000..d8b9c31
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/iterables/MappingReaderWrapper.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.iterables;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Wraps {@link GiraphReader} for mapping into
+ * {@link org.apache.giraph.io.MappingReader}
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class MappingReaderWrapper<I extends WritableComparable,
+  V extends Writable, E extends  Writable, B extends Writable>
+  extends MappingReader<I, V, E, B> {
+  /** Wrapped mapping reader */
+  private GiraphReader<MappingEntry<I, B>> mappingReader;
+  /**
+   * {@link org.apache.giraph.io.MappingReader}-like wrapper of
+   * {@link #mappingReader}
+   */
+  private IteratorToReaderWrapper<MappingEntry<I, B>> iterator;
+
+  /**
+   * Constructor
+   *
+   * @param mappingReader user supplied mappingReader
+   */
+  public MappingReaderWrapper(GiraphReader<MappingEntry<I, B>> mappingReader) {
+    this.mappingReader = mappingReader;
+    iterator = new IteratorToReaderWrapper<>(mappingReader);
+  }
+
+  @Override
+  public void setConf(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    super.setConf(conf);
+    conf.configureIfPossible(mappingReader);
+  }
+
+  @Override
+  public boolean nextEntry() throws IOException, InterruptedException {
+    return iterator.nextObject();
+  }
+
+  @Override
+  public MappingEntry<I, B> getCurrentEntry()
+    throws IOException, InterruptedException {
+    return iterator.getCurrentObject();
+  }
+
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+    TaskAttemptContext context) throws IOException, InterruptedException {
+    mappingReader.initialize(inputSplit, context);
+  }
+
+  @Override
+  public void close() throws IOException {
+    mappingReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return mappingReader.getProgress();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java
new file mode 100644
index 0000000..2e2310b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/AbstractLongByteOps.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Implementation of basic methods in MappingStoreOps
+ */
+@SuppressWarnings("unchecked, rawtypes")
+public abstract class AbstractLongByteOps
+  implements MappingStoreOps<LongWritable, ByteWritable> {
+  /** Mapping store instance to operate on */
+  protected LongByteMappingStore mappingStore;
+
+  @Override
+  public void initialize(MappingStore<LongWritable,
+      ByteWritable> mappingStore) {
+    this.mappingStore = (LongByteMappingStore) mappingStore;
+  }
+
+  /**
+   * Compute partition given id, partitionCount, workerCount & target
+   * @param id vertex id
+   * @param partitionCount number of partitions
+   * @param workerCount number of workers
+   * @param target target worker
+   * @return partition number
+   */
+  protected int computePartition(LongWritable id, int partitionCount,
+    int workerCount, byte target) {
+    int numRows = partitionCount / workerCount;
+    numRows = (numRows * workerCount == partitionCount) ? numRows : numRows + 1;
+    if (target == -1) {
+      // default to hash based partitioning
+      return Math.abs(id.hashCode() % partitionCount);
+    } else {
+      int targetWorker = target & 0xFF;
+      // assume zero based indexing of partition & worker [also consecutive]
+      return numRows * targetWorker + Math.abs(id.hashCode() % numRows);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java
new file mode 100644
index 0000000..0b6f3e4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultEmbeddedLongByteOps.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * MappingStoreOps implementation used to embed target information into
+ * vertex id. Stores information in the higher order bits of the long id
+ */
+public class DefaultEmbeddedLongByteOps extends AbstractLongByteOps {
+  /** Bit mask for first 9 bits in a long */
+  private static final long MASK = ((long) 0x1FF) << 55;
+  /** Inverse of MASK */
+  private static final long IMASK = ~ MASK;
+
+  /**
+   * Default constructor (do not use)
+   */
+  public DefaultEmbeddedLongByteOps() {
+  }
+
+  @Override
+  public boolean hasEmbedding() {
+    return true;
+  }
+
+  @Override
+  public void embedTargetInfo(LongWritable id) {
+    if ((id.get() & MASK) != 0) {
+      throw new IllegalStateException("Expected first 9 bits of long " +
+          " to be empty");
+    }
+    byte target = mappingStore.getByteTarget(id);
+    // first bit = 0 & rest 8 bits set to target
+    // add 1 to distinguish between not set and assignment to worker-0
+    // (prefix bits = 0 can mean one of two things :
+    // no entry in the mapping, in which case target = -1, so -1 + 1 = 0
+    // vertex is created later during computation, so prefix bits are 0 anyway)
+    long maskValue = ((1L + target) & 0xFF) << 55;
+    id.set(id.get() | maskValue);
+  }
+
+  @Override
+  public void removeTargetInfo(LongWritable id) {
+    id.set(id.get() & IMASK);
+  }
+
+  @Override
+  public int getPartition(LongWritable id, int partitionCount,
+    int workerCount) {
+    // extract the 8 embedded target bits (bits 55-62 of the id)
+    // subtract 1 since added 1 during embedInfo (unset = -1)
+    byte target = (byte) (((id.get() >>> 55) & 0xFF) - 1);
+    return computePartition(id, partitionCount, workerCount, target);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java
new file mode 100644
index 0000000..57ece94
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/DefaultLongByteOps.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * MappingStoreOps implementation which reads partition information from map
+ */
+@SuppressWarnings("unchecked, rawtypes")
+public class DefaultLongByteOps extends AbstractLongByteOps {
+
+  /**
+   * Default constructor (do not use)
+   */
+  public DefaultLongByteOps() {
+  }
+
+  @Override
+  public boolean hasEmbedding() {
+    return false;
+  }
+
+  @Override
+  public void embedTargetInfo(LongWritable id) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void removeTargetInfo(LongWritable id) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getPartition(LongWritable id, int partitionCount,
+    int workerCount) {
+    byte target = mappingStore.getByteTarget(id);
+    return computePartition(id, partitionCount, workerCount, target);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java b/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java
new file mode 100644
index 0000000..996fed0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/LongByteMappingStore.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import com.google.common.collect.MapMaker;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * An implementation of {@code MappingStore<LongWritable, ByteWritable>}.
+ *
+ * Methods implemented here are thread safe by default because it is guaranteed
+ * that each entry is written to only once.
+ * It can represent up to a maximum of 254 workers;
+ * any byte passed is treated as unsigned.
+ * The byte value -1 is reserved to mean "no target assigned".
+ */
+@ThreadSafe
+public class LongByteMappingStore
+  extends DefaultImmutableClassesGiraphConfigurable<LongWritable, Writable,
+  Writable> implements MappingStore<LongWritable, ByteWritable> {
+  /** Logger instance */
+  private static final Logger LOG = Logger.getLogger(
+    LongByteMappingStore.class);
+
+  /** Counts number of entries added */
+  private final AtomicLong numEntries = new AtomicLong(0);
+
+  /** Id prefix to bytes array, filled concurrently by addEntry */
+  private ConcurrentMap<Long, byte[]> concurrentIdToBytes;
+  /** Primitive-keyed copy built by postFilling, used for lookups */
+  private Long2ObjectOpenHashMap<byte[]> idToBytes;
+  /** Size of each bytes array (distinct suffixes), must be a power of two */
+  private int lower;
+  /** Initial capacity requested for the prefix maps */
+  private int upper;
+  /** Bit mask for lowerOrder suffix bits */
+  private int lowerBitMask;
+  /** Number of lower order (suffix) bits, log2(lower) */
+  private int lowerOrder;
+
+  @Override
+  public void initialize() {
+    upper = GiraphConstants.LB_MAPPINGSTORE_UPPER.get(getConf());
+    lower = GiraphConstants.LB_MAPPINGSTORE_LOWER.get(getConf());
+    // x & (x - 1) is also 0 for 0 and Integer.MIN_VALUE, so check sign first
+    if (lower <= 0 || (lower & (lower - 1)) != 0) {
+      throw new IllegalStateException("lower not a power of two");
+    }
+
+    lowerBitMask = lower - 1;
+    lowerOrder = Integer.numberOfTrailingZeros(lower); // log_2_(lower)
+    concurrentIdToBytes = new MapMaker()
+        .initialCapacity(upper)
+        .concurrencyLevel(getConf().getNumInputSplitsThreads())
+        .makeMap();
+    idToBytes = new Long2ObjectOpenHashMap<>(upper);
+  }
+
+  /**
+   * Auxiliary method to be used by getTarget
+   *
+   * @param vertexId vertexId
+   * @return byte value of target, or -1 if no target is stored for vertexId
+   */
+  public byte getByteTarget(LongWritable vertexId) {
+    long key = vertexId.get() >>> lowerOrder;
+    byte[] bytes = idToBytes.get(key); // single lookup; null if prefix unseen
+    if (bytes == null) {
+      return -1;
+    }
+    return bytes[(int) (vertexId.get() & lowerBitMask)];
+  }
+
+  @Override
+  public void addEntry(LongWritable vertexId, ByteWritable target) {
+    long key = vertexId.get() >>> lowerOrder;
+    byte[] bytes = concurrentIdToBytes.get(key);
+    if (bytes == null) {
+      byte[] newBytes = new byte[lower];
+      Arrays.fill(newBytes, (byte) -1); // -1 marks "no target" slots
+      bytes = concurrentIdToBytes.putIfAbsent(key, newBytes);
+      if (bytes == null) { // no other thread inserted first, ours is in
+        bytes = newBytes;
+      }
+    }
+    bytes[(int) (vertexId.get() & lowerBitMask)] = target.get();
+    numEntries.getAndIncrement(); // increment count
+  }
+
+  @Override
+  public ByteWritable getTarget(LongWritable vertexId,
+    ByteWritable target) {
+    byte bval = getByteTarget(vertexId); // primitive, no boxing needed
+    if (bval == -1) { // worker not assigned by mapping
+      return null;
+    }
+    target.set(bval);
+    return target;
+  }
+
+  @Override
+  public void postFilling() {
+    // not thread-safe; addEntry must not be called during or after this
+    for (Long id : concurrentIdToBytes.keySet()) {
+      idToBytes.put(id, concurrentIdToBytes.get(id));
+    }
+    concurrentIdToBytes.clear();
+    concurrentIdToBytes = null;
+  }
+
+  @Override
+  public long getStats() {
+    return numEntries.longValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java
new file mode 100644
index 0000000..8c8efa1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingEntry.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A single (vertexId, mappingTarget) pair of a MappingStore
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+public class MappingEntry<I extends WritableComparable, B extends Writable> {
+  /** Id of the vertex being mapped */
+  private I vertexId;
+  /** Target the vertex is mapped to */
+  private B mappingTarget;
+
+  /**
+   * Create an entry holding the given id and target
+   *
+   * @param vertexId vertexId
+   * @param mappingTarget mappingTarget
+   */
+  public MappingEntry(I vertexId, B mappingTarget) {
+    this.vertexId = vertexId;
+    this.mappingTarget = mappingTarget;
+  }
+
+  public I getVertexId() {
+    return this.vertexId;
+  }
+
+  public void setVertexId(I vertexId) {
+    this.vertexId = vertexId;
+  }
+
+  public B getMappingTarget() {
+    return this.mappingTarget;
+  }
+
+  public void setMappingTarget(B mappingTarget) {
+    this.mappingTarget = mappingTarget;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java
new file mode 100644
index 0000000..5b872a4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStore.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * MappingStore used to store the vertexId - target map supplied by user.
+ * Implementations of this interface need to be thread safe
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+public interface MappingStore<I extends WritableComparable, B extends Writable>
+  extends ImmutableClassesGiraphConfigurable<I, Writable, Writable> {
+
+  /**
+   * Must be called before anything else can be done
+   * on this instance
+   */
+  void initialize();
+
+  /**
+   * Add an entry to the mapping store
+   *
+   * @param vertexId vertexId
+   * @param target target
+   */
+  void addEntry(I vertexId, B target);
+
+  /**
+   * Get target for given vertexId
+   *
+   * @param vertexId vertexId
+   * @param target instance to use for storing target information
+   * @return target instance (LongByteMappingStore returns null if unmapped)
+   */
+  B getTarget(I vertexId, B target);
+
+  /**
+   * Operations to perform after adding entries
+   * to the mapping store
+   */
+  void postFilling();
+
+  /**
+   * Get stats about the MappingStore
+   *
+   * @return number of entries added to the store
+   */
+  long getStats();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java
new file mode 100644
index 0000000..aba034e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/MappingStoreOps.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface of operations that can be done on mapping store
+ * once it is fully loaded
+ *
+ * @param <I> vertex id type
+ * @param <B> mapping target type
+ */
+public interface MappingStoreOps<I extends WritableComparable,
+  B extends Writable> {
+
+  /**
+   * Must be called before anything else can be done
+   * on this instance
+   * @param mappingStore mapping store instance to operate on
+   */
+  void initialize(MappingStore<I, B> mappingStore);
+
+  /**
+   * True if MappingStoreOps is based on embedding info
+   *
+   * @return true if worker info is embedded into vertex ids
+   */
+  boolean hasEmbedding();
+
+  /**
+   * Embed target information into vertexId
+   * (DefaultLongByteOps, which has no embedding, throws instead)
+   * @param id vertexId
+   */
+  void embedTargetInfo(I id);
+
+  /**
+   * Remove target information from vertexId
+   * (DefaultLongByteOps, which has no embedding, throws instead)
+   * @param id vertexId
+   */
+  void removeTargetInfo(I id);
+
+
+  /**
+   * Get partition id for a vertex id
+   *
+   * @param id vertexId
+   * @param partitionCount partitionCount
+   * @param workerCount workerCount
+   * @return partition of vertex id
+   */
+  int getPartition(I id, int partitionCount, int workerCount);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java b/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java
new file mode 100644
index 0000000..ae23475
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package contains definition and implementations of MappingStore and
+ * related concepts
+ */
+package org.apache.giraph.mapping;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java
new file mode 100644
index 0000000..102ef50
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/LongByteTranslateEdge.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping.translate;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.BspServiceWorker;
+import org.apache.giraph.worker.LocalData;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Basic implementation of Translate Edge
+ * where I = LongWritable and B = ByteWritable
+ *
+ * @param <E> edge value type
+ */
+@SuppressWarnings("unchecked")
+public class LongByteTranslateEdge<E extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable
+  implements TranslateEdge<LongWritable, E> {
+
+  /** Worker-local data whose mapping store ops translate edge target ids */
+  private LocalData<LongWritable,
+    ? extends Writable, E, ByteWritable> localData;
+
+  @Override
+  public void initialize(BspServiceWorker<LongWritable,
+    ? extends Writable, E> service) {
+    localData = (LocalData<LongWritable, ? extends Writable, E, ByteWritable>)
+      service.getLocalData();
+  }
+
+  @Override
+  public LongWritable translateId(LongWritable targetId) {
+    // Copy first: embedTargetInfo is applied to the copy, not the caller's id
+    LongWritable embedded = new LongWritable(targetId.get());
+    localData.getMappingStoreOps().embedTargetInfo(embedded);
+    return embedded;
+  }
+
+  @Override
+  public E cloneValue(E edgeValue) {
+    // Use this class directly only if the vertex input creates no edges;
+    // otherwise pick a subclass that overrides cloneValue
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Variant whose cloneValue handles NullWritable edge values
+   */
+  public static class NoEdgeValue
+    extends LongByteTranslateEdge<NullWritable> {
+    @Override
+    public NullWritable cloneValue(NullWritable edgeValue) {
+      return NullWritable.get();
+    }
+  }
+
+  /**
+   * Variant whose cloneValue handles IntWritable edge values
+   */
+  public static class IntEdgeValue
+    extends LongByteTranslateEdge<IntWritable> {
+    @Override
+    public IntWritable cloneValue(IntWritable edgeValue) {
+      return new IntWritable(edgeValue.get());
+    }
+  }
+
+  /**
+   * Variant whose cloneValue handles LongWritable edge values
+   */
+  public static class LongEdgeValue
+    extends LongByteTranslateEdge<LongWritable> {
+    @Override
+    public LongWritable cloneValue(LongWritable edgeValue) {
+      return new LongWritable(edgeValue.get());
+    }
+  }
+
+  /**
+   * Variant whose cloneValue handles FloatWritable edge values
+   */
+  public static class FloatEdgeValue
+    extends LongByteTranslateEdge<FloatWritable> {
+    @Override
+    public FloatWritable cloneValue(FloatWritable edgeValue) {
+      return new FloatWritable(edgeValue.get());
+    }
+  }
+
+  /**
+   * Variant whose cloneValue handles DoubleWritable edge values
+   */
+  public static class DoubleEdgeValue
+    extends LongByteTranslateEdge<DoubleWritable> {
+    @Override
+    public DoubleWritable cloneValue(DoubleWritable edgeValue) {
+      return new DoubleWritable(edgeValue.get());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java
new file mode 100644
index 0000000..85e8768
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/TranslateEdge.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.mapping.translate;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.BspServiceWorker;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Used to conduct expensive translation of edges
+ * during vertex input phase
+ *
+ * @param <I> vertexId type
+ * @param <E> edgeValue type
+ */
+public interface TranslateEdge<I extends WritableComparable, E extends Writable>
+  extends ImmutableClassesGiraphConfigurable {
+  /**
+   * Must be called before other methods can be used
+   *
+   * @param service bsp service worker
+   */
+  void initialize(BspServiceWorker<I, ? extends Writable, E> service);
+
+  /**
+   * Translate Id and return a new instance
+   *
+   * @param targetId edge target Id
+   * @return a new translated Id instance
+   */
+  I translateId(I targetId);
+
+  /**
+   * Clone edge value
+   *
+   * @param edgeValue edge value
+   * @return clone of edge value
+   */
+  E cloneValue(E edgeValue);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java
new file mode 100644
index 0000000..8536e83
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/mapping/translate/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Definitions and sample implementations of edge translation logic
+ */
+package org.apache.giraph.mapping.translate;

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 90dc9f3..e367b94 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -40,6 +40,7 @@ import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.graph.GraphTaskManager;
+import org.apache.giraph.io.MappingInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.partition.MasterGraphPartitioner;
 import org.apache.giraph.partition.PartitionOwner;
@@ -119,7 +120,7 @@ import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
  * @param <V> Vertex data
  * @param <E> Edge data
  */
-@SuppressWarnings("rawtypes")
+@SuppressWarnings("rawtypes, unchecked")
 public class BspServiceMaster<I extends WritableComparable,
     V extends Writable, E extends Writable>
     extends BspService<I, V, E>
@@ -217,7 +218,7 @@ public class BspServiceMaster<I extends WritableComparable,
     observers = conf.createMasterObservers();
 
     GiraphMetrics.get().addSuperstepResetObserver(this);
-    GiraphStats.init(context);
+    GiraphStats.init((Mapper.Context) context);
   }
 
   @Override
@@ -676,6 +677,17 @@ public class BspServiceMaster<I extends WritableComparable,
   }
 
   @Override
+  public int createMappingInputSplits() {
+    if (!getConfiguration().hasMappingInputFormat()) {
+      return 0; // short-circuit if there is no mapping input format
+    }
+    MappingInputFormat<I, V, E, ? extends Writable> mappingInputFormat =
+      getConfiguration().createWrappedMappingInputFormat();
+    return createInputSplits(mappingInputFormat, mappingInputSplitsPaths,
+      "Mapping");
+  }
+
+  @Override
   public int createVertexInputSplits() {
     // Short-circuit if there is no vertex input format
     if (!getConfiguration().hasVertexInputFormat()) {
@@ -1589,8 +1601,12 @@ public class BspServiceMaster<I extends WritableComparable,
 
     if (getSuperstep() == INPUT_SUPERSTEP) {
       // Initialize aggregators before coordinating
-      // vertex loading and edge loading
       initializeAggregatorInputSuperstep();
+      if (getConfiguration().hasMappingInputFormat()) {
+        coordinateInputSplits(mappingInputSplitsPaths, mappingInputSplitsEvents,
+            "Mapping");
+      }
+      // vertex loading and edge loading
       if (getConfiguration().hasVertexInputFormat()) {
         coordinateInputSplits(vertexInputSplitsPaths, vertexInputSplitsEvents,
             "Vertex");


Mime
View raw message