giraph-commits mailing list archives

From pava...@apache.org
Subject [2/3] GIRAPH-908: support for partitioned input in giraph (pavanka)
Date Sun, 08 Jun 2014 18:28:50 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
index 15dbe07..0635210 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterThread.java
@@ -107,7 +107,8 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
         // Attempt to create InputSplits if necessary. Bail out if that fails.
         if (bspServiceMaster.getRestartedSuperstep() !=
             BspService.UNSET_SUPERSTEP ||
-            (bspServiceMaster.createVertexInputSplits() != -1 &&
+            (bspServiceMaster.createMappingInputSplits() != -1 &&
+                bspServiceMaster.createVertexInputSplits() != -1 &&
                 bspServiceMaster.createEdgeInputSplits() != -1)) {
           long setupMillis = System.currentTimeMillis() - initializeMillis;
           GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
@@ -123,7 +124,7 @@ public class MasterThread<I extends WritableComparable, V extends Writable,
             superstepState = bspServiceMaster.coordinateSuperstep();
             long superstepMillis = System.currentTimeMillis() -
                 startSuperstepMillis;
-            superstepSecsMap.put(Long.valueOf(cachedSuperstep),
+            superstepSecsMap.put(cachedSuperstep,
                 superstepMillis / 1000.0d);
             if (LOG.isInfoEnabled()) {
               LOG.info("masterThread: Coordination of superstep " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
index 4200d79..c5e2f3e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.partition;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -32,7 +33,14 @@ import org.apache.hadoop.io.WritableComparable;
 @SuppressWarnings("rawtypes")
 public interface GraphPartitionerFactory<I extends WritableComparable,
     V extends Writable, E extends Writable> extends
-    ImmutableClassesGiraphConfigurable {
+    ImmutableClassesGiraphConfigurable<I, V, E> {
+
+  /**
+   * Use some local data present in the worker
+   *
+   * @param localData localData present in the worker
+   */
+  void initialize(LocalData<I, V, E, ? extends Writable> localData);
   /**
    * Create the {@link MasterGraphPartitioner} used by the master.
    * Instantiated once by the master and reused.

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
index 7cc5651..221e50d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
@@ -18,7 +18,8 @@
 
 package org.apache.giraph.partition;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -32,10 +33,13 @@ import org.apache.hadoop.io.WritableComparable;
  */
 @SuppressWarnings("rawtypes")
 public class HashPartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    implements GraphPartitionerFactory<I, V, E> {
-  /** Saved configuration */
-  private ImmutableClassesGiraphConfiguration conf;
+  V extends Writable, E extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+  implements GraphPartitionerFactory<I, V, E>  {
+
+  @Override
+  public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
+  }
 
   @Override
   public MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
@@ -46,14 +50,4 @@ public class HashPartitionerFactory<I extends WritableComparable,
   public WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
     return new HashWorkerPartitioner<I, V, E>();
   }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
index 1eeece7..5f7ee40 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
@@ -18,7 +18,8 @@
 
 package org.apache.giraph.partition;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -32,10 +33,13 @@ import org.apache.hadoop.io.WritableComparable;
  */
 @SuppressWarnings("rawtypes")
 public class HashRangePartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    implements GraphPartitionerFactory<I, V, E> {
-  /** Saved configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E> conf;
+  V extends Writable, E extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+  implements GraphPartitionerFactory<I, V, E> {
+
+  @Override
+  public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
+  }
 
   @Override
   public MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
@@ -46,14 +50,4 @@ public class HashRangePartitionerFactory<I extends WritableComparable,
   public WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
     return new HashRangeWorkerPartitioner<I, V, E>();
   }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
new file mode 100644
index 0000000..e129050
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.worker.LocalData;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+/**
+ * Factory for long-byte mapping based partitioners.
+ *
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ */
+@SuppressWarnings("unchecked")
+public class LongMappingStorePartitionerFactory<V extends Writable,
+    E extends Writable> extends SimplePartitionerFactory<LongWritable, V, E> {
+  /** Logger Instance */
+  private static final Logger LOG = Logger.getLogger(
+      LongMappingStorePartitionerFactory.class);
+  /** Local Data that supplies the mapping store */
+  protected LocalData<LongWritable, V, E, ? extends Writable> localData = null;
+
+  @Override
+  public void initialize(LocalData<LongWritable, V, E,
+    ? extends Writable> localData) {
+    this.localData = localData;
+    LOG.info("Initializing LongMappingStorePartitionerFactory with localData");
+  }
+
+  @Override
+  protected int getPartition(LongWritable id, int partitionCount,
+    int workerCount) {
+    return localData.getMappingStoreOps().getPartition(id,
+        partitionCount, workerCount);
+  }
+
+  @Override
+  protected int getWorker(int partition, int partitionCount, int workerCount) {
+    int numRows = partitionCount / workerCount;
+    numRows = (numRows * workerCount == partitionCount) ? numRows : numRows + 1;
+    return partition / numRows;
+  }
+}
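
For intuition, the getWorker() arithmetic above assigns contiguous blocks of partitions to workers using a ceiling division. The following standalone sketch (plain Java, not part of this commit) replays that arithmetic with hypothetical counts of 10 partitions and 3 workers:

    public class WorkerAssignmentDemo {
      /** Same ceiling-division assignment as getWorker() above. */
      static int getWorker(int partition, int partitionCount, int workerCount) {
        int numRows = partitionCount / workerCount;
        numRows = (numRows * workerCount == partitionCount) ? numRows : numRows + 1;
        return partition / numRows;
      }

      public static void main(String[] args) {
        // With 10 partitions and 3 workers, numRows becomes 4:
        // partitions 0-3 -> worker 0, 4-7 -> worker 1, 8-9 -> worker 2.
        for (int partition = 0; partition < 10; partition++) {
          System.out.println("partition " + partition + " -> worker " +
              getWorker(partition, 10, 3));
        }
      }
    }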

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
index 8ab692f..5dd580b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
@@ -31,14 +31,26 @@ import org.apache.hadoop.io.Writable;
  * @param <V> Vertex value type
  * @param <E> Edge value type
  */
-public class SimpleIntRangePartitionerFactory
-    <V extends Writable, E extends Writable>
-    extends SimplePartitionerFactory<IntWritable, V, E> {
+public class SimpleIntRangePartitionerFactory<V extends Writable,
+  E extends Writable> extends SimplePartitionerFactory<IntWritable, V, E> {
 
   /** Vertex key space size. */
   private int keySpaceSize;
 
   @Override
+  protected int getPartition(IntWritable id, int partitionCount,
+    int workerCount) {
+    return getPartition(id, partitionCount);
+  }
+
+  /**
+   * Calculates in which partition current vertex belongs to,
+   * from interval [0, partitionCount).
+   *
+   * @param id Vertex id
+   * @param partitionCount Number of partitions
+   * @return partition
+   */
   protected int getPartition(IntWritable id, int partitionCount) {
     return getPartitionInRange(id.get(), keySpaceSize, partitionCount);
   }
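
SimpleIntRangePartitionerFactory (and the long variant below) delegate to getPartitionInRange(), which is defined elsewhere in Giraph and not shown in this diff. As a rough illustration only, a proportional mapping from a key space of size keySpaceSize onto partitionCount partitions could look like the standalone sketch below (an assumption about the general technique, not the actual getPartitionInRange implementation):

    public class RangePartitionDemo {
      /** Proportionally map an id in [0, keySpaceSize) to a partition in [0, partitionCount). */
      static int toPartition(long id, long keySpaceSize, int partitionCount) {
        return (int) (id * partitionCount / keySpaceSize);
      }

      public static void main(String[] args) {
        // keySpaceSize = 1000, 4 partitions: ids 0-249 -> 0, 250-499 -> 1, ..., 750-999 -> 3.
        System.out.println(toPartition(0, 1000, 4));    // 0
        System.out.println(toPartition(499, 1000, 4));  // 1
        System.out.println(toPartition(999, 1000, 4));  // 3
      }
    }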

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
index 2989598..e637e16 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
@@ -31,14 +31,26 @@ import org.apache.hadoop.io.Writable;
  * @param <V> Vertex value type
  * @param <E> Edge value type
  */
-public class SimpleLongRangePartitionerFactory
-    <V extends Writable, E extends Writable>
-    extends SimplePartitionerFactory<LongWritable, V, E> {
+public class SimpleLongRangePartitionerFactory<V extends Writable,
+  E extends Writable> extends SimplePartitionerFactory<LongWritable, V, E> {
 
   /** Vertex key space size. */
   private long keySpaceSize;
 
   @Override
+  protected int getPartition(LongWritable id, int partitionCount,
+    int workerCount) {
+    return getPartition(id, partitionCount);
+  }
+
+  /**
+   * Calculates in which partition current vertex belongs to,
+   * from interval [0, partitionCount).
+   *
+   * @param id Vertex id
+   * @param partitionCount Number of partitions
+   * @return partition
+   */
   protected int getPartition(LongWritable id, int partitionCount) {
     return getPartitionInRange(id.get(), keySpaceSize, partitionCount);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
index 15b0756..1e29846 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
@@ -18,7 +18,8 @@
 
 package org.apache.giraph.partition;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -33,14 +34,17 @@ import org.apache.hadoop.io.WritableComparable;
  * @param <E> Edge value
  */
 public abstract class SimplePartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    implements GraphPartitionerFactory<I, V, E> {
-  /** Configuration. */
-  private ImmutableClassesGiraphConfiguration conf;
+  V extends Writable, E extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+  implements GraphPartitionerFactory<I, V, E> {
+
+  @Override
+  public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
+  }
 
   @Override
   public final MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
-    return new SimpleMasterPartitioner<I, V, E>(conf) {
+    return new SimpleMasterPartitioner<I, V, E>(getConf()) {
       @Override
       protected int getWorkerIndex(int partition, int partitionCount,
           int workerCount) {
@@ -54,31 +58,26 @@ public abstract class SimplePartitionerFactory<I extends WritableComparable,
   public final WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
     return new SimpleWorkerPartitioner<I, V, E>() {
       @Override
-      protected int getPartitionIndex(I id, int partitionCount) {
-        return SimplePartitionerFactory.this.getPartition(id, partitionCount);
+      protected int getPartitionIndex(I id, int partitionCount,
+        int workerCount) {
+        return SimplePartitionerFactory.this.getPartition(id,
+            partitionCount, workerCount);
       }
     };
   }
 
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public final ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
-
   /**
    * Calculates in which partition current vertex belongs to,
    * from interval [0, partitionCount).
    *
    * @param id Vertex id
    * @param partitionCount Number of partitions
+   * @param workerCount Number of workers
    * @return partition
    */
-  protected abstract int getPartition(I id, int partitionCount);
+  protected abstract int getPartition(I id, int partitionCount,
+    int workerCount);
+
   /**
    * Calculates worker that should be responsible for passed partition.
    *
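
With this change, concrete subclasses implement the three-argument getPartition() and may also override getWorker(). A hypothetical subclass, sketched only against the signatures visible in this diff (class and package names are invented for illustration):

    package org.apache.giraph.examples; // hypothetical package for this sketch

    import org.apache.giraph.partition.SimplePartitionerFactory;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Writable;

    /** Modulo-based factory; ignores workerCount when choosing a partition. */
    public class ModuloLongPartitionerFactory<V extends Writable,
        E extends Writable> extends SimplePartitionerFactory<LongWritable, V, E> {

      @Override
      protected int getPartition(LongWritable id, int partitionCount,
          int workerCount) {
        // Spread vertices over partitions by id; result is in [0, partitionCount).
        return (int) Math.abs(id.get() % partitionCount);
      }

      @Override
      protected int getWorker(int partition, int partitionCount, int workerCount) {
        // Round-robin partitions over workers.
        return partition % workerCount;
      }
    }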

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
index 600d7a3..3c0de44 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
@@ -19,13 +19,16 @@
 package org.apache.giraph.partition;
 
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import com.google.common.collect.Lists;
+import org.apache.log4j.Logger;
 
 /**
  * Abstracts and implements all WorkerGraphPartitioner logic on top of a single
@@ -38,8 +41,13 @@ import com.google.common.collect.Lists;
 public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
     V extends Writable, E extends Writable>
     implements WorkerGraphPartitioner<I, V, E> {
+  /** Logger instance */
+  private static final Logger LOG = Logger.getLogger(
+      SimpleWorkerPartitioner.class);
   /** List of {@link PartitionOwner}s for this worker. */
   private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
+  /** List of available workers */
+  private Set<WorkerInfo> availableWorkers = new HashSet<>();
 
   @Override
   public PartitionOwner createPartitionOwner() {
@@ -49,7 +57,8 @@ public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
   @Override
   public PartitionOwner getPartitionOwner(I vertexId) {
     return partitionOwnerList.get(
-        getPartitionIndex(vertexId, partitionOwnerList.size()));
+        getPartitionIndex(vertexId, partitionOwnerList.size(),
+            availableWorkers.size()));
   }
 
   @Override
@@ -64,8 +73,11 @@ public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
   public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
       Collection<? extends PartitionOwner> masterSetPartitionOwners,
       PartitionStore<I, V, E> partitionStore) {
-    return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
-        myWorkerInfo, masterSetPartitionOwners, partitionStore);
+    PartitionExchange exchange = PartitionBalancer.updatePartitionOwners(
+        partitionOwnerList, myWorkerInfo, masterSetPartitionOwners,
+        partitionStore);
+    extractAvailableWorkers();
+    return exchange;
   }
 
   @Override
@@ -74,12 +86,26 @@ public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
   }
 
   /**
+   * Update availableWorkers
+   */
+  public void extractAvailableWorkers() {
+    availableWorkers.clear();
+    for (PartitionOwner partitionOwner : partitionOwnerList) {
+      availableWorkers.add(partitionOwner.getWorkerInfo());
+    }
+    LOG.info("After updating partitionOwnerList " + availableWorkers.size() +
+        " workers are available");
+  }
+
+  /**
    * Calculates in which partition current vertex belongs to,
    * from interval [0, partitionCount).
    *
    * @param id Vertex id
    * @param partitionCount Number of partitions
+   * @param workerCount Number of active workers
    * @return partition
    */
-  protected abstract int getPartitionIndex(I id, int partitionCount);
+  protected abstract int getPartitionIndex(I id, int partitionCount,
+    int workerCount);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index aff7084..104932c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -46,6 +46,7 @@ import org.apache.giraph.io.EdgeWriter;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
+import org.apache.giraph.mapping.translate.TranslateEdge;
 import org.apache.giraph.master.MasterInfo;
 import org.apache.giraph.master.SuperstepClasses;
 import org.apache.giraph.metrics.GiraphMetrics;
@@ -133,7 +134,10 @@ public class BspServiceWorker<I extends WritableComparable,
   private final WorkerInfo workerInfo;
   /** Worker graph partitioner */
   private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
-
+  /** Local Data for each worker */
+  private final LocalData<I, V, E, ? extends Writable> localData;
+  /** Used to translate Edges during vertex input phase based on localData */
+  private final TranslateEdge<I, E> translateEdge;
   /** IPC Client */
   private final WorkerClient<I, V, E> workerClient;
   /** IPC Server */
@@ -182,6 +186,11 @@ public class BspServiceWorker<I extends WritableComparable,
     throws IOException, InterruptedException {
     super(context, graphTaskManager);
     ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
+    localData = new LocalData<>(conf);
+    translateEdge = getConfiguration().edgeTranslationInstance();
+    if (translateEdge != null) {
+      translateEdge.initialize(this);
+    }
     partitionExchangeChildrenChanged = new PredicateLock(context);
     registerBspEvent(partitionExchangeChildrenChanged);
     workerGraphPartitioner =
@@ -237,6 +246,14 @@ public class BspServiceWorker<I extends WritableComparable,
     return workerClient;
   }
 
+  public LocalData<I, V, E, ? extends Writable> getLocalData() {
+    return localData;
+  }
+
+  public TranslateEdge<I, E> getTranslateEdge() {
+    return translateEdge;
+  }
+
   /**
    * Intended to check the health of the node.  For instance, can it ssh,
    * dmesg, etc. For now, does nothing.
@@ -293,6 +310,55 @@ public class BspServiceWorker<I extends WritableComparable,
     return vertexEdgeCount;
   }
 
+  /**
+   * Load the mapping entries from the user-defined
+   * {@link org.apache.giraph.io.MappingReader}
+   *
+   * @return Count of mapping entries loaded
+   */
+  private Integer loadMapping() throws KeeperException,
+    InterruptedException {
+    List<String> inputSplitPathList =
+        getZkExt().getChildrenExt(mappingInputSplitsPaths.getPath(),
+        false, false, true);
+
+    InputSplitPathOrganizer splitOrganizer =
+        new InputSplitPathOrganizer(getZkExt(),
+            inputSplitPathList, getWorkerInfo().getHostname(),
+            getConfiguration().useInputSplitLocality());
+
+    MappingInputSplitsCallableFactory<I, V, E, ? extends Writable>
+        mappingInputSplitsCallableFactory =
+        new MappingInputSplitsCallableFactory<>(
+            getConfiguration().createWrappedMappingInputFormat(),
+            splitOrganizer,
+            getContext(),
+            getConfiguration(),
+            this,
+            getZkExt());
+
+    int entriesLoaded = 0;
+    // Determine how many threads to use based on the number of input splits
+    int maxInputSplitThreads = inputSplitPathList.size();
+    int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
+        maxInputSplitThreads);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
+          "originally " + getConfiguration().getNumInputSplitsThreads() +
+          " threads(s) for " + inputSplitPathList.size() + " total splits.");
+    }
+
+    List<Integer> results =
+        ProgressableUtils.getResultsWithNCallables(
+            mappingInputSplitsCallableFactory,
+            numThreads, "load-mapping-%d", getContext());
+    for (Integer result : results) {
+      entriesLoaded += result;
+    }
+    // after all threads finish loading - call postFilling
+    localData.getMappingStore().postFilling();
+    return entriesLoaded;
+  }
 
   /**
    * Load the vertices from the user-defined
@@ -403,13 +469,15 @@ public class BspServiceWorker<I extends WritableComparable,
   }
 
   /**
-   * Wait for all workers to finish processing input splits.
+   * Mark current worker as done and then wait for all workers
+   * to finish processing input splits.
    *
    * @param inputSplitPaths Input split paths
    * @param inputSplitEvents Input split events
    */
-  private void waitForOtherWorkers(InputSplitPaths inputSplitPaths,
-                                   InputSplitEvents inputSplitEvents) {
+  private void markCurrentWorkerDoneThenWaitForOthers(
+    InputSplitPaths inputSplitPaths,
+    InputSplitEvents inputSplitEvents) {
     String workerInputSplitsDonePath =
         inputSplitPaths.getDonePath() + "/" +
             getWorkerInfo().getHostnameId();
@@ -420,10 +488,12 @@ public class BspServiceWorker<I extends WritableComparable,
           CreateMode.PERSISTENT,
           true);
     } catch (KeeperException e) {
-      throw new IllegalStateException("waitForOtherWorkers: " +
+      throw new IllegalStateException(
+          "markCurrentWorkerDoneThenWaitForOthers: " +
           "KeeperException creating worker done splits", e);
     } catch (InterruptedException e) {
-      throw new IllegalStateException("waitForOtherWorkers: " +
+      throw new IllegalStateException(
+          "markCurrentWorkerDoneThenWaitForOthers: " +
           "InterruptedException creating worker done splits", e);
     }
     while (true) {
@@ -433,10 +503,12 @@ public class BspServiceWorker<I extends WritableComparable,
             getZkExt().exists(inputSplitPaths.getAllDonePath(),
                 true);
       } catch (KeeperException e) {
-        throw new IllegalStateException("waitForOtherWorkers: " +
+        throw new IllegalStateException(
+            "markCurrentWorkerDoneThenWaitForOthers: " +
             "KeeperException waiting on worker done splits", e);
       } catch (InterruptedException e) {
-        throw new IllegalStateException("waitForOtherWorkers: " +
+        throw new IllegalStateException(
+            "markCurrentWorkerDoneThenWaitForOthers: " +
             "InterruptedException waiting on worker done splits", e);
       }
       if (inputSplitsDoneStat != null) {
@@ -501,6 +573,37 @@ public class BspServiceWorker<I extends WritableComparable,
     aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
 
     VertexEdgeCount vertexEdgeCount;
+    int entriesLoaded = 0;
+
+    if (getConfiguration().hasMappingInputFormat()) {
+      // Ensure the mapping InputSplits are ready for processing
+      ensureInputSplitsReady(mappingInputSplitsPaths, mappingInputSplitsEvents);
+      getContext().progress();
+      try {
+        entriesLoaded = loadMapping();
+        // successfully loaded mapping
+        // now initialize graphPartitionerFactory with this data
+        getGraphPartitionerFactory().initialize(localData);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: loadMapping failed with InterruptedException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: loadMapping failed with KeeperException", e);
+      }
+      getContext().progress();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("setup: Finally loaded a total of " +
+            entriesLoaded + " entries from inputSplits");
+      }
+
+      // Workers wait for each other to finish, coordinated by master
+      markCurrentWorkerDoneThenWaitForOthers(mappingInputSplitsPaths,
+          mappingInputSplitsEvents);
+      // Print stats for data stored in localData once mapping is fully
+      // loaded on all the workers
+      localData.printStats();
+    }
 
     if (getConfiguration().hasVertexInputFormat()) {
       // Ensure the vertex InputSplits are ready for processing
@@ -544,12 +647,14 @@ public class BspServiceWorker<I extends WritableComparable,
 
     if (getConfiguration().hasVertexInputFormat()) {
       // Workers wait for each other to finish, coordinated by master
-      waitForOtherWorkers(vertexInputSplitsPaths, vertexInputSplitsEvents);
+      markCurrentWorkerDoneThenWaitForOthers(vertexInputSplitsPaths,
+          vertexInputSplitsEvents);
     }
 
     if (getConfiguration().hasEdgeInputFormat()) {
       // Workers wait for each other to finish, coordinated by master
-      waitForOtherWorkers(edgeInputSplitsPaths, edgeInputSplitsEvents);
+      markCurrentWorkerDoneThenWaitForOthers(edgeInputSplitsPaths,
+          edgeInputSplitsEvents);
     }
 
     // Create remaining partitions owned by this worker.
@@ -569,6 +674,8 @@ public class BspServiceWorker<I extends WritableComparable,
       getServerData().getEdgeStore().moveEdgesToVertices();
     }
 
+    localData.removeMappingStoreIfPossible();
+
     // Generate the partition stats for the input superstep and process
     // if necessary
     List<PartitionStats> partitionStatsList =
@@ -726,6 +833,17 @@ public class BspServiceWorker<I extends WritableComparable,
         getGraphTaskManager().getGraphFunctions().toString() +
         " - Attempt=" + getApplicationAttempt() +
         ", Superstep=" + getSuperstep());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("startSuperstep: addressesAndPartitions" +
+          addressesAndPartitions.getWorkerInfos());
+      for (PartitionOwner partitionOwner : addressesAndPartitions
+          .getPartitionOwners()) {
+        LOG.debug(partitionOwner.getPartitionId() + " " +
+            partitionOwner.getWorkerInfo());
+      }
+    }
+
     return addressesAndPartitions.getPartitionOwners();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
index 828eac4..35ad94b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java
@@ -48,6 +48,7 @@ import java.io.IOException;
  * @param <V> Vertex value
  * @param <E> Edge value
  */
+@SuppressWarnings("unchecked")
 public class EdgeInputSplitsCallable<I extends WritableComparable,
     V extends Writable, E extends Writable>
     extends InputSplitsCallable<I, V, E> {
@@ -62,10 +63,14 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
 
   /** Aggregator handler */
   private final WorkerThreadAggregatorUsage aggregatorUsage;
+  /** Bsp service worker (only use thread-safe methods) */
+  private final BspServiceWorker<I, V, E> bspServiceWorker;
   /** Edge input format */
   private final EdgeInputFormat<I, E> edgeInputFormat;
   /** Input split max edges (-1 denotes all) */
   private final long inputSplitMaxEdges;
+  /** Whether mapping info can be embedded in vertex ids */
+  private final boolean canEmbedInIds;
 
   /** Filter to use */
   private final EdgeInputFilter<I, E> edgeInputFilter;
@@ -97,11 +102,19 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
         zooKeeperExt);
     this.edgeInputFormat = edgeInputFormat;
 
+    this.bspServiceWorker = bspServiceWorker;
     inputSplitMaxEdges = configuration.getInputSplitMaxEdges();
     // Initialize aggregator usage.
     this.aggregatorUsage = bspServiceWorker.getAggregatorHandler()
       .newThreadAggregatorUsage();
     edgeInputFilter = configuration.getEdgeInputFilter();
+    canEmbedInIds = bspServiceWorker
+        .getLocalData()
+        .getMappingStoreOps() != null &&
+        bspServiceWorker
+            .getLocalData()
+            .getMappingStoreOps()
+            .hasEmbedding();
 
     // Initialize Metrics
     totalEdgesMeter = getTotalEdgesLoadedMeter();
@@ -157,6 +170,16 @@ public class EdgeInputSplitsCallable<I extends WritableComparable,
             "readInputSplit: Edge reader returned an edge " +
                 "without a value!  - " + readerEdge);
       }
+      if (canEmbedInIds) {
+        bspServiceWorker
+            .getLocalData()
+            .getMappingStoreOps()
+            .embedTargetInfo(sourceId);
+        bspServiceWorker
+            .getLocalData()
+            .getMappingStoreOps()
+            .embedTargetInfo(readerEdge.getTargetVertexId());
+      }
 
       ++inputSplitEdgesLoaded;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
new file mode 100644
index 0000000..4e93ce0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/FullInputSplitCallable.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.time.SystemTime;
+import org.apache.giraph.time.Time;
+import org.apache.giraph.time.Times;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * InputSplitCallable to read all the splits
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ */
+public abstract class FullInputSplitCallable<I extends WritableComparable,
+  V extends Writable, E extends Writable>
+  implements Callable<Integer> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(
+    FullInputSplitCallable.class);
+  /** Class time object */
+  private static final Time TIME = SystemTime.get();
+  /** Configuration */
+  protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
+  /** Context */
+  protected final Mapper<?, ?, ?, ?>.Context context;
+
+  /** The List of InputSplit znode paths */
+  private final List<String> pathList;
+  /** Current position in the path list */
+  private final AtomicInteger currentIndex;
+  /** ZooKeeperExt handle */
+  private final ZooKeeperExt zooKeeperExt;
+  /** Get the start time in nanos */
+  private final long startNanos = TIME.getNanoseconds();
+
+  // CHECKSTYLE: stop ParameterNumberCheck
+  /**
+   * Constructor.
+   *
+   * @param splitOrganizer Input splits organizer
+   * @param context Context
+   * @param configuration Configuration
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   * @param currentIndex Atomic Integer to get splitPath from list
+   */
+  public FullInputSplitCallable(InputSplitPathOrganizer splitOrganizer,
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      ZooKeeperExt zooKeeperExt,
+      AtomicInteger currentIndex) {
+    this.pathList = Lists.newArrayList(splitOrganizer.getPathList());
+    this.currentIndex = currentIndex;
+    this.zooKeeperExt = zooKeeperExt;
+    this.context = context;
+    this.configuration = configuration;
+  }
+  // CHECKSTYLE: resume ParameterNumberCheck
+
+  /**
+   * Get input format
+   *
+   * @return Input format
+   */
+  public abstract GiraphInputFormat getInputFormat();
+
+  /**
+   * Load entries from the given input split
+   *
+   * @param inputSplit Input split to load
+   * @return Count of entries loaded
+   * @throws java.io.IOException
+   * @throws InterruptedException
+   */
+  protected abstract Integer readInputSplit(InputSplit inputSplit)
+    throws IOException, InterruptedException;
+
+  @Override
+  public Integer call() {
+    int entries = 0;
+    String inputSplitPath;
+    int inputSplitsProcessed = 0;
+    try {
+      while (true) {
+        int pos = currentIndex.getAndIncrement();
+        if (pos >= pathList.size()) {
+          break;
+        }
+        inputSplitPath = pathList.get(pos);
+        entries += loadInputSplit(inputSplitPath);
+        context.progress();
+        ++inputSplitsProcessed;
+      }
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("call: InterruptedException", e);
+    } catch (IOException e) {
+      throw new IllegalStateException("call: IOException", e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("call: ClassNotFoundException", e);
+    } catch (InstantiationException e) {
+      throw new IllegalStateException("call: InstantiationException", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException("call: IllegalAccessException", e);
+    }
+
+    if (LOG.isInfoEnabled()) {
+      float seconds = Times.getNanosSince(TIME, startNanos) /
+          Time.NS_PER_SECOND_AS_FLOAT;
+      float entriesPerSecond = entries / seconds;
+      LOG.info("call: Loaded " + inputSplitsProcessed + " " +
+          "input splits in " + seconds + " secs, " + entries +
+          " " + entriesPerSecond + " entries/sec");
+    }
+    return entries;
+  }
+
+  /**
+   * Extract entries from input split, saving them into mapping store.
+   * Mark the input split finished when done.
+   *
+   * @param inputSplitPath ZK location of input split
+   * @return Number of entries read in this input split
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws InstantiationException
+   * @throws IllegalAccessException
+   */
+  private Integer loadInputSplit(
+    String inputSplitPath)
+    throws IOException, ClassNotFoundException, InterruptedException,
+    InstantiationException, IllegalAccessException {
+    InputSplit inputSplit = getInputSplit(inputSplitPath);
+    Integer entriesRead = readInputSplit(inputSplit);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadFromInputSplit: Finished loading " +
+          inputSplitPath + " " + entriesRead);
+    }
+    return entriesRead;
+  }
+
+  /**
+   * Talk to ZooKeeper to convert the input split path to the actual
+   * InputSplit.
+   *
+   * @param inputSplitPath Location in ZK of input split
+   * @return instance of InputSplit
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  protected InputSplit getInputSplit(String inputSplitPath)
+    throws IOException, ClassNotFoundException {
+    byte[] splitList;
+    try {
+      splitList = zooKeeperExt.getData(inputSplitPath, false, null);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "getInputSplit: KeeperException on " + inputSplitPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "getInputSplit: InterruptedException on " + inputSplitPath, e);
+    }
+    context.progress();
+
+    DataInputStream inputStream =
+        new DataInputStream(new ByteArrayInputStream(splitList));
+    InputSplit inputSplit = getInputFormat().readInputSplit(inputStream);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getInputSplit: Processing " + inputSplitPath +
+          " from ZooKeeper and got input split '" +
+          inputSplit.toString() + "'");
+    }
+    return inputSplit;
+  }
+}
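
The call() loop above lets several callables share one AtomicInteger so that each split path is claimed by exactly one thread. A self-contained illustration of that claiming pattern in plain Java (no Giraph types; names invented for the sketch):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    public class SplitClaimDemo {
      public static void main(String[] args) throws InterruptedException {
        List<String> paths = Arrays.asList("split-0", "split-1", "split-2", "split-3");
        AtomicInteger currentIndex = new AtomicInteger(0);
        Runnable loader = () -> {
          while (true) {
            int pos = currentIndex.getAndIncrement(); // atomically claim the next split
            if (pos >= paths.size()) {
              break; // no splits left
            }
            System.out.println(Thread.currentThread().getName() +
                " processing " + paths.get(pos));
          }
        };
        Thread t1 = new Thread(loader, "load-mapping-0");
        Thread t2 = new Thread(loader, "load-mapping-1");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
      }
    }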

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/LocalData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/LocalData.java b/giraph-core/src/main/java/org/apache/giraph/worker/LocalData.java
new file mode 100644
index 0000000..9612344
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/LocalData.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.mapping.MappingStore;
+import org.apache.giraph.mapping.MappingStoreOps;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Stores LocalData for each worker
+ *
+ * @param <I> vertexId type
+ * @param <V> vertex value type
+ * @param <E> edge value type
+ * @param <B> mappingTarget type
+ */
+@SuppressWarnings("unchecked")
+public class LocalData<I extends WritableComparable, V extends Writable,
+  E extends Writable, B extends Writable>
+  extends DefaultImmutableClassesGiraphConfigurable<I, V, E> {
+  /** Logger Instance */
+  private static final Logger LOG = Logger.getLogger(LocalData.class);
+  /** MappingStore from vertexId - target */
+  private MappingStore<I, B> mappingStore;
+  /** Do operations using mapping store */
+  private MappingStoreOps<I, B> mappingStoreOps;
+  /**
+   * Constructor
+   *
+   * Set configuration, create & initialize mapping store
+   * @param conf giraph configuration
+   */
+  public LocalData(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    // set configuration
+    setConf(conf);
+    // check if user set the mapping store => create & initialize it
+    mappingStore = (MappingStore<I, B>) getConf().createMappingStore();
+    if (mappingStore != null) {
+      mappingStore.initialize();
+    }
+    mappingStoreOps = (MappingStoreOps<I, B>) getConf().createMappingStoreOps();
+    if (mappingStoreOps != null) {
+      mappingStoreOps.initialize(mappingStore);
+    }
+  }
+
+  public MappingStore<I, B> getMappingStore() {
+    return mappingStore;
+  }
+
+  public MappingStoreOps<I, B> getMappingStoreOps() {
+    return mappingStoreOps;
+  }
+
+  /**
+   * Remove mapping store from localData
+   * if mapping data is already embedded into vertexIds
+   */
+  public void removeMappingStoreIfPossible() {
+    if (mappingStoreOps != null && mappingStoreOps.hasEmbedding()) {
+      mappingStore = null;
+    }
+  }
+
+  /**
+   * Prints Stats of individual data it stores
+   */
+  public void printStats() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("MappingStore has : " + mappingStore.getStats() + " entries");
+    }
+  }
+}
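
Consumers of LocalData elsewhere in this patch (the edge and vertex input-split callables) guard on the mapping store ops before embedding mapping info into vertex ids. A small hypothetical helper capturing that check, written only against the accessors shown above (not part of the commit):

    import org.apache.giraph.worker.LocalData;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;

    /** Hypothetical helper, not part of this commit. */
    public final class MappingChecks {
      private MappingChecks() { }

      /** Mapping info can be embedded into ids only when store ops exist and report an embedding. */
      public static <I extends WritableComparable, V extends Writable,
          E extends Writable> boolean canEmbedInIds(
          LocalData<I, V, E, ? extends Writable> localData) {
        return localData.getMappingStoreOps() != null &&
            localData.getMappingStoreOps().hasEmbedding();
      }
    }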

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
new file mode 100644
index 0000000..a2279a9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.GiraphInputFormat;
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.mapping.MappingStore;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Load as many mapping input splits as possible.
+ * Every thread will have its own instance of WorkerClientRequestProcessor
+ * to send requests.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+@SuppressWarnings("unchecked")
+public class MappingInputSplitsCallable<I extends WritableComparable,
+  V extends Writable, E extends Writable, B extends Writable>
+  extends FullInputSplitCallable<I, V, E> {
+  /** User supplied mappingInputFormat */
+  private final MappingInputFormat<I, V, E, B> mappingInputFormat;
+  /** Link to bspServiceWorker */
+  private final BspServiceWorker<I, V, E> bspServiceWorker;
+
+  /**
+   * Constructor
+   *
+   * @param mappingInputFormat mappingInputFormat
+   * @param splitOrganizer Input splits organizer
+   * @param context Context
+   * @param configuration Configuration
+   * @param zooKeeperExt Handle to ZooKeeperExt
+   * @param currentIndex Atomic Integer to get splitPath from list
+   * @param bspServiceWorker bsp service worker
+   */
+  public MappingInputSplitsCallable(
+      MappingInputFormat<I, V, E, B> mappingInputFormat,
+      InputSplitPathOrganizer splitOrganizer,
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      ZooKeeperExt zooKeeperExt,
+      AtomicInteger currentIndex,
+      BspServiceWorker<I, V, E> bspServiceWorker) {
+    super(splitOrganizer, context,
+      configuration, zooKeeperExt, currentIndex);
+    this.mappingInputFormat = mappingInputFormat;
+    this.bspServiceWorker = bspServiceWorker;
+  }
+
+  @Override
+  public GiraphInputFormat getInputFormat() {
+    return mappingInputFormat;
+  }
+
+  @Override
+  protected Integer readInputSplit(InputSplit inputSplit)
+    throws IOException, InterruptedException {
+    MappingReader<I, V, E, B> mappingReader =
+        mappingInputFormat.createMappingReader(inputSplit, context);
+    mappingReader.setConf(configuration);
+
+    WorkerThreadAggregatorUsage aggregatorUsage = this.bspServiceWorker
+        .getAggregatorHandler().newThreadAggregatorUsage();
+
+    mappingReader.initialize(inputSplit, context);
+    mappingReader.setWorkerAggregatorUse(aggregatorUsage);
+
+    int entriesLoaded = 0;
+    MappingStore<I, B> mappingStore =
+      (MappingStore<I, B>) bspServiceWorker.getLocalData().getMappingStore();
+
+    while (mappingReader.nextEntry()) {
+      MappingEntry<I, B> entry = mappingReader.getCurrentEntry();
+      entriesLoaded += 1;
+      mappingStore.addEntry(entry.getVertexId(), entry.getMappingTarget());
+    }
+    return entriesLoaded;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
new file mode 100644
index 0000000..21a981e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallableFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.worker;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Factory for {@link org.apache.giraph.worker.MappingInputSplitsCallable}s.
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class MappingInputSplitsCallableFactory<I extends WritableComparable,
+  V extends Writable, E extends Writable, B extends Writable>
+  implements CallableFactory<Integer> {
+  /** Mapping input format */
+  private final MappingInputFormat<I, V, E, B> mappingInputFormat;
+  /** Input split organizer */
+  private final InputSplitPathOrganizer splitOrganizer;
+  /** Mapper context. */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Configuration. */
+  private final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
+  /** {@link BspServiceWorker} we're running on. */
+  private final BspServiceWorker<I, V, E> bspServiceWorker;
+  /** {@link ZooKeeperExt} for this worker. */
+  private final ZooKeeperExt zooKeeperExt;
+  /** Current position in the path list */
+  private final AtomicInteger currentIndex;
+
+
+  /**
+   * Constructor.
+   *
+   * @param mappingInputFormat Mapping input format
+   * @param splitOrganizer Input split organizer
+   * @param context Mapper context
+   * @param configuration Configuration
+   * @param bspServiceWorker Calling {@link BspServiceWorker}
+   * @param zooKeeperExt {@link org.apache.giraph.zk.ZooKeeperExt}
+   *                     for this worker
+   */
+  public MappingInputSplitsCallableFactory(
+      MappingInputFormat<I, V, E, B> mappingInputFormat,
+      InputSplitPathOrganizer splitOrganizer,
+      Mapper<?, ?, ?, ?>.Context context,
+      ImmutableClassesGiraphConfiguration<I, V, E> configuration,
+      BspServiceWorker<I, V, E> bspServiceWorker,
+      ZooKeeperExt zooKeeperExt) {
+    this.mappingInputFormat = mappingInputFormat;
+    this.splitOrganizer = splitOrganizer;
+    this.context = context;
+    this.configuration = configuration;
+    this.bspServiceWorker = bspServiceWorker;
+    this.zooKeeperExt = zooKeeperExt;
+    this.currentIndex = new AtomicInteger(0);
+  }
+
+  @Override
+  public FullInputSplitCallable<I, V, E> newCallable(int threadId) {
+    return new MappingInputSplitsCallable<>(
+        mappingInputFormat,
+        splitOrganizer,
+        context,
+        configuration,
+        zooKeeperExt,
+        currentIndex,
+        bspServiceWorker);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index e3e04d6..4c85765 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -19,12 +19,15 @@
 package org.apache.giraph.worker;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.io.filters.VertexInputFilter;
+import org.apache.giraph.mapping.translate.TranslateEdge;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
@@ -50,6 +53,7 @@ import java.io.IOException;
  * @param <V> Vertex value
  * @param <E> Edge value
  */
+@SuppressWarnings("unchecked")
 public class VertexInputSplitsCallable<I extends WritableComparable,
     V extends Writable, E extends Writable>
     extends InputSplitsCallable<I, V, E> {
@@ -69,6 +73,15 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
   private final BspServiceWorker<I, V, E> bspServiceWorker;
   /** Filter to select which vertices to keep */
   private final VertexInputFilter<I, V, E> vertexInputFilter;
+  /** Whether mapping info can be embedded in vertex ids */
+  private final boolean canEmbedInIds;
+  /**
+   * Whether the chosen {@link OutEdges} implementation allows for Edge
+   * reuse.
+   */
+  private boolean reuseEdgeObjects;
+  /** Used to translate Edges during vertex input phase based on localData */
+  private final TranslateEdge<I, E> translateEdge;
 
   // Metrics
   /** number of vertices loaded meter across all readers */
@@ -102,6 +115,15 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     inputSplitMaxVertices = configuration.getInputSplitMaxVertices();
     this.bspServiceWorker = bspServiceWorker;
     vertexInputFilter = configuration.getVertexInputFilter();
+    reuseEdgeObjects = configuration.reuseEdgeObjects();
+    canEmbedInIds = bspServiceWorker
+        .getLocalData()
+        .getMappingStoreOps() != null &&
+        bspServiceWorker
+            .getLocalData()
+            .getMappingStoreOps()
+            .hasEmbedding();
+    translateEdge = bspServiceWorker.getTranslateEdge();
 
     // Initialize Metrics
     totalVerticesMeter = getTotalVerticesLoadedMeter();
@@ -151,6 +173,12 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
             "readInputSplit: Vertex reader returned a vertex " +
                 "without an id!  - " + readerVertex);
       }
+      if (canEmbedInIds) {
+        bspServiceWorker
+            .getLocalData()
+            .getMappingStoreOps()
+            .embedTargetInfo(readerVertex.getId());
+      }
       if (readerVertex.getValue() == null) {
         readerVertex.setValue(configuration.createVertexValue());
       }
@@ -167,6 +195,37 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
         continue;
       }
 
+      // Before saving to the partition store, translate all edges (if present)
+      if (translateEdge != null) {
+        // Only applies if the vertex input also reads edges
+        if (readerVertex.getEdges() != null && readerVertex.getNumEdges() > 0) {
+          OutEdges<I, E> vertexOutEdges = configuration
+              .createAndInitializeOutEdges(readerVertex.getNumEdges());
+          // TODO: this works for any generic OutEdges implementation; a
+          // richer API could allow more efficient translation for specific
+          // OutEdges types.
+
+          // NOTE: when edge objects are reusable, the extra space is taken by
+          // the new OutEdges data structure itself; when they are not, it is
+          // taken by the newly created edges, while the new OutEdges merely
+          // holds references to them. Either way the edges are briefly held
+          // twice, so this has the same space overhead as OutEdges.trim().
+          for (Edge<I, E> edge : readerVertex.getEdges()) {
+            if (reuseEdgeObjects) {
+              bspServiceWorker
+                  .getLocalData()
+                  .getMappingStoreOps()
+                  .embedTargetInfo(edge.getTargetVertexId());
+              vertexOutEdges.add(edge); // edge can be re-used
+            } else { // edge objects cannot be reused - so create new edges
+              vertexOutEdges.add(configuration.createEdge(translateEdge, edge));
+            }
+          }
+          // Switch the vertex to the translated out-edges; the old instance is released
+          readerVertex.setEdges(vertexOutEdges);
+        }
+      }
+
       PartitionOwner partitionOwner =
           bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
       workerClientRequestProcessor.sendVertexRequest(
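
Read outside the diff, the translation step above amounts to the following
helper. This is a hypothetical restatement written as if it lived inside
VertexInputSplitsCallable, using only calls that appear in this patch:

  private void translateAndSetEdges(Vertex<I, V, E> readerVertex) {
    if (translateEdge == null ||
        readerVertex.getEdges() == null || readerVertex.getNumEdges() == 0) {
      return; // nothing to translate, or the vertex input read no edges
    }
    OutEdges<I, E> vertexOutEdges =
        configuration.createAndInitializeOutEdges(readerVertex.getNumEdges());
    for (Edge<I, E> edge : readerVertex.getEdges()) {
      if (reuseEdgeObjects) {
        // Embed the mapping target info directly into the existing target id.
        bspServiceWorker.getLocalData().getMappingStoreOps()
            .embedTargetInfo(edge.getTargetVertexId());
        vertexOutEdges.add(edge);
      } else {
        // The reader will reuse this edge object, so build a translated copy.
        vertexOutEdges.add(configuration.createEdge(translateEdge, edge));
      }
    }
    readerVertex.setEdges(vertexOutEdges);
  }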

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
index 4e19cd2..96bd5d7 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.partition;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 
@@ -47,6 +48,7 @@ public class SimpleRangePartitionFactoryTest {
     ArrayList<WorkerInfo> infos = new ArrayList<WorkerInfo>();
     for (int i = 0; i < numWorkers; i++) {
       WorkerInfo info = new WorkerInfo();
+      info.setInetSocketAddress(new InetSocketAddress(8080));
       info.setTaskId(i);
       infos.add(info);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index 603910b..fbc24f8 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -31,6 +31,8 @@ import org.apache.giraph.graph.Computation;
 import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
 import org.apache.giraph.hive.input.edge.HiveToEdge;
+import org.apache.giraph.hive.input.mapping.HiveMappingInputFormat;
+import org.apache.giraph.hive.input.mapping.HiveToMapping;
 import org.apache.giraph.hive.input.vertex.HiveToVertex;
 import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
 import org.apache.giraph.hive.output.HiveVertexOutputFormat;
@@ -55,6 +57,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_MAPPING_INPUT;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_DATABASE;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_OUTPUT_PARTITION;
@@ -87,6 +90,8 @@ public class HiveGiraphRunner implements Tool {
   private List<EdgeInputFormatDescription> edgeInputDescriptions =
       Lists.newArrayList();
 
+  /** Hive Mapping reader */
+  private Class<? extends HiveToMapping> hiveToMappingClass;
   /** Hive Vertex writer */
   private Class<? extends VertexToHive> vertexToHiveClass;
   /** Skip output? (Useful for testing without writing) */
@@ -238,6 +243,36 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
+   * Check if mapping input is set
+   *
+   * @return true if mapping input is set
+   */
+  public boolean hasMappingInput() {
+    return hiveToMappingClass != null;
+  }
+
+  /**
+   * Set mapping input
+   *
+   * @param hiveToMappingClass class for reading mapping entries from Hive.
+   * @param tableName Table name
+   * @param partitionFilter Partition filter, or null if no filter used
+   */
+  public void setMappingInput(
+      Class<? extends HiveToMapping> hiveToMappingClass, String tableName,
+      String partitionFilter) {
+    this.hiveToMappingClass = hiveToMappingClass;
+    conf.set(HIVE_MAPPING_INPUT.getClassOpt().getKey(),
+        hiveToMappingClass.getName());
+    conf.set(HIVE_MAPPING_INPUT.getProfileIdOpt().getKey(),
+        "mapping_input_profile");
+    conf.set(HIVE_MAPPING_INPUT.getTableOpt().getKey(), tableName);
+    if (partitionFilter != null) {
+      conf.set(HIVE_MAPPING_INPUT.getPartitionOpt().getKey(), partitionFilter);
+    }
+  }
+
+  /**
    * main method
    * @param args system arguments
    * @throws Exception any errors from Hive Giraph Runner
@@ -425,6 +460,22 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
+   * Prepare mapping input settings in the Configuration
+   *
+   * This caches table metadata in the configuration so that workers do not
+   * need to access the Hive metastore.
+   */
+  public void prepareHiveMappingInput() {
+    GiraphConstants.MAPPING_INPUT_FORMAT_CLASS.set(conf,
+        HiveMappingInputFormat.class);
+
+    Configuration confCopy = new Configuration(conf);
+    createGiraphConf(confCopy)
+        .createWrappedMappingInputFormat()
+        .checkInputSpecs(confCopy);
+  }
+
+  /**
    * process arguments
    * @param args to process
    * @return CommandLine instance
@@ -458,6 +509,17 @@ public class HiveGiraphRunner implements Tool {
               " class name (-computationClass) to use");
     }
 
+    String mappingInput = cmdln.getOptionValue("mappingInput");
+    if (mappingInput != null) {
+      String[] parameters = split(mappingInput, ",", 3);
+      if (parameters.length < 2) {
+        throw new IllegalStateException("Illegal mappingInput description " +
+            mappingInput + " - HiveToMapping class and table name needed");
+      }
+      setMappingInput(findClass(parameters[0], HiveToMapping.class),
+          parameters[1], elementOrNull(parameters, 2));
+    }
+
     String[] vertexInputs = cmdln.getOptionValues("vertexInput");
     if (vertexInputs != null && vertexInputs.length != 0) {
       vertexInputDescriptions.clear();
@@ -534,6 +596,11 @@ public class HiveGiraphRunner implements Tool {
     // allow metastore changes (i.e. creating tables that don't exist)
     processMoreArguments(cmdln);
 
+    if (mappingInput != null) { // mapping input is provided
+      HIVE_MAPPING_INPUT.getDatabaseOpt().set(conf, dbName);
+      prepareHiveMappingInput();
+    }
+
     if (hasVertexInput()) {
       HIVE_VERTEX_INPUT.getDatabaseOpt().set(conf, dbName);
       prepareHiveVertexInputs();
@@ -573,6 +640,12 @@ public class HiveGiraphRunner implements Tool {
 
     options.addOption("db", "dbName", true, "Hive database name");
 
+    // Mapping input settings
+    options.addOption("mi", "mappingInput", true, "Giraph " +
+        HiveToMapping.class.getSimpleName() + " class to use, table name and " +
+        "partition filter (optional). Example:\n" +
+        "\"MyHiveToMapping, myTableName, a=1,b=two\"");
+
     // Vertex input settings
     options.addOption("vi", "vertexInput", true, getInputOptionDescription(
         "vertex", HiveToVertex.class.getSimpleName()));

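With this change a mapping table is declared either on the command line, as
"-mi <HiveToMapping class>,<table name>[,<partition filter>]", or
programmatically via setMappingInput(). A minimal sketch of the programmatic
route; MyHiveToMapping and the table name are placeholders, and the null
filter means the whole table is read:

  // Hypothetical wiring, equivalent to "-mi MyHiveToMapping,my_mapping_table".
  void configureMappingInput(HiveGiraphRunner runner) {
    runner.setMappingInput(MyHiveToMapping.class, "my_mapping_table", null);
  }

A MyHiveToMapping implementation can be as small as the SimpleHiveToMapping
sketch shown after that class further down.
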
http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
index c7ad63b..ab533a2 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
@@ -20,6 +20,7 @@ package org.apache.giraph.hive.common;
 
 import org.apache.giraph.conf.ClassConfOption;
 import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.hive.input.mapping.HiveToMapping;
 import org.apache.giraph.hive.input.edge.HiveToEdge;
 import org.apache.giraph.hive.input.vertex.HiveToVertex;
 import org.apache.giraph.hive.output.VertexToHive;
@@ -28,6 +29,9 @@ import org.apache.giraph.hive.output.VertexToHive;
  * Constants for giraph-hive
  */
 public class GiraphHiveConstants {
+  /** Options for configuring mapping input */
+  public static final HiveInputOptions<HiveToMapping> HIVE_MAPPING_INPUT =
+      new HiveInputOptions<>("mapping", HiveToMapping.class);
   /** Options for configuring vertex input */
   public static final HiveInputOptions<HiveToVertex> HIVE_VERTEX_INPUT =
       new HiveInputOptions<HiveToVertex>("vertex", HiveToVertex.class);

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
index 2388673..35d8b3e 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
@@ -20,6 +20,7 @@ package org.apache.giraph.hive.common;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.hive.input.mapping.HiveToMapping;
 import org.apache.giraph.hive.input.edge.HiveToEdge;
 import org.apache.giraph.hive.input.vertex.HiveToVertex;
 import org.apache.giraph.hive.output.VertexToHive;
@@ -46,12 +47,14 @@ import java.util.Map;
 
 import static java.lang.System.getenv;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_MAPPING_INPUT;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_INPUT;
 import static org.apache.giraph.hive.common.GiraphHiveConstants.VERTEX_TO_HIVE_CLASS;
 
 /**
  * Utility methods for Hive IO
  */
+@SuppressWarnings("unchecked")
 public class HiveUtils {
   /** Logger */
   private static final Logger LOG = Logger.getLogger(HiveUtils.class);
@@ -342,6 +345,31 @@ public class HiveUtils {
   }
 
   /**
+   * Create a new HiveToMapping
+   *
+   * @param conf ImmutableClassesGiraphConfiguration
+   * @param schema HiveTableSchema
+   * @param <I> vertexId type
+   * @param <V> vertexValue type
+   * @param <E> edgeValue type
+   * @param <B> mappingTarget type
+   * @return HiveToMapping
+   */
+  public static <I extends WritableComparable, V extends Writable,
+    E extends Writable, B extends Writable>
+  HiveToMapping<I, B> newHiveToMapping(
+    ImmutableClassesGiraphConfiguration<I, V, E> conf,
+    HiveTableSchema schema) {
+    Class<? extends HiveToMapping> klass = HIVE_MAPPING_INPUT.getClass(conf);
+    if (klass == null) {
+      throw new IllegalArgumentException(
+          HIVE_MAPPING_INPUT.getClassOpt().getKey() + " not set in conf"
+      );
+    }
+    return newInstance(klass, conf, schema);
+  }
+
+  /**
    * Create a new instance of a class, configuring it and setting the Hive table
    * schema if it supports those types.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java
new file mode 100644
index 0000000..dc7a6ee
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/AbstractHiveToMapping.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * AbstractHiveToMapping
+ *
+ * @param <I> vertexId type parameter
+ * @param <B> mapping target type parameter
+ */
+public abstract class AbstractHiveToMapping<I extends WritableComparable,
+    B extends Writable>
+    extends DefaultConfigurableAndTableSchemaAware<I, Writable, Writable>
+    implements HiveToMapping<I, B> {
+  @Override
+  public final void remove() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java
new file mode 100644
index 0000000..973813d
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingInputFormat.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import com.facebook.hiveio.input.HiveApiInputFormat;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.hive.common.GiraphHiveConstants;
+import org.apache.giraph.io.MappingInputFormat;
+import org.apache.giraph.io.MappingReader;
+import org.apache.giraph.io.iterables.MappingReaderWrapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.giraph.hive.common.HiveUtils.newHiveToMapping;
+
+/**
+ * MappingInputFormat implementation that reads mapping entries from Hive
+ *
+ * @param <I> vertexId type
+ * @param <V> vertexValue type
+ * @param <E> edgeValue type
+ * @param <B> mappingTarget type
+ */
+public class HiveMappingInputFormat<I extends WritableComparable,
+  V extends Writable, E extends Writable, B extends Writable>
+  extends MappingInputFormat<I, V, E, B> {
+  /** Underlying Hive InputFormat used */
+  private final HiveApiInputFormat hiveInputFormat;
+
+  /**
+   * Create mapping input format
+   */
+  public HiveMappingInputFormat() {
+    hiveInputFormat = new HiveApiInputFormat();
+  }
+
+  @Override
+  public void checkInputSpecs(Configuration conf) {
+    HiveInputDescription inputDesc =
+        GiraphHiveConstants.HIVE_MAPPING_INPUT.makeInputDescription(conf);
+    HiveTableSchema schema = getTableSchema();
+    HiveToMapping<I, B> hiveToMapping = newHiveToMapping(getConf(), schema);
+    hiveToMapping.checkInput(inputDesc, schema);
+  }
+
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    super.setConf(conf);
+    hiveInputFormat.initialize(
+        GiraphHiveConstants.HIVE_MAPPING_INPUT.makeInputDescription(conf),
+        GiraphHiveConstants.HIVE_MAPPING_INPUT.getProfileID(conf),
+        conf);
+  }
+
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int minSplitCountHint)
+    throws IOException, InterruptedException {
+    return hiveInputFormat.getSplits(context);
+  }
+
+  @Override
+  public MappingReader<I, V, E, B> createMappingReader(InputSplit split,
+    TaskAttemptContext context) throws IOException {
+    HiveMappingReader<I, B> reader = new HiveMappingReader<>();
+    reader.setTableSchema(getTableSchema());
+
+    RecordReader<WritableComparable, HiveReadableRecord> baseReader;
+    try {
+      baseReader = hiveInputFormat.createRecordReader(split, context);
+    } catch (InterruptedException e) {
+      throw new IOException("Could not create mapping reader", e);
+    }
+
+    reader.setHiveRecordReader(baseReader);
+    return new MappingReaderWrapper<>(reader);
+  }
+
+
+  /**
+   * Get Hive table schema
+   *
+   * @return Hive table schema
+   */
+  private HiveTableSchema getTableSchema() {
+    return hiveInputFormat.getTableSchema(getConf());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java
new file mode 100644
index 0000000..3154f9d
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveMappingReader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+import org.apache.giraph.hive.common.DefaultConfigurableAndTableSchemaAware;
+import org.apache.giraph.hive.common.HiveUtils;
+import org.apache.giraph.hive.input.RecordReaderWrapper;
+import org.apache.giraph.io.iterables.GiraphReader;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * MappingReader using Hive
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+public class HiveMappingReader<I extends WritableComparable,
+  B extends Writable>
+  extends DefaultConfigurableAndTableSchemaAware<I, Writable, Writable>
+  implements GiraphReader<MappingEntry<I, B>> {
+  /** Underlying Hive RecordReader used */
+  private RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader;
+  /** Hive To Mapping */
+  private HiveToMapping<I, B> hiveToMapping;
+
+  /**
+   * Get the Hive record reader
+   *
+   * @return hiveRecordReader
+   */
+  public RecordReader<WritableComparable, HiveReadableRecord>
+  getHiveRecordReader() {
+    return hiveRecordReader;
+  }
+
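+  /**
+   * Set the Hive record reader to read records from
+   *
+   * @param hiveRecordReader Hive record reader
+   */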
+  public void setHiveRecordReader(
+      RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader) {
+    this.hiveRecordReader = hiveRecordReader;
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit,
+    TaskAttemptContext context) throws IOException, InterruptedException {
+    hiveRecordReader.initialize(inputSplit, context);
+    hiveToMapping = HiveUtils.newHiveToMapping(getConf(), getTableSchema());
+    hiveToMapping.initializeRecords(
+        new RecordReaderWrapper<>(hiveRecordReader));
+  }
+
+  @Override
+  public void close() throws IOException {
+    hiveRecordReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return hiveRecordReader.getProgress();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return hiveToMapping.hasNext();
+  }
+
+
+  @Override
+  public MappingEntry<I, B> next() {
+    return hiveToMapping.next();
+  }
+
+  @Override
+  public void remove() {
+    hiveToMapping.remove();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java
new file mode 100644
index 0000000..497b044
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/HiveToMapping.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+import org.apache.giraph.hive.input.HiveInputChecker;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Iterator;
+
+/**
+ * Interface for reading mapping entries from a Hive table
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+public interface HiveToMapping<I extends WritableComparable,
+    B extends Writable> extends
+    Iterator<MappingEntry<I, B>>, HiveInputChecker {
+  /**
+   * Set the records which contain mapping input data
+   *
+   * @param records Hive records
+   */
+  void initializeRecords(Iterator<HiveReadableRecord> records);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/4a133f57/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java
new file mode 100644
index 0000000..feccc1f
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/mapping/SimpleHiveToMapping.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.input.mapping;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+import org.apache.giraph.mapping.MappingEntry;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Iterator;
+
+/**
+ * SimpleHiveToMapping - convenience base class for HiveToMapping implementations
+ *
+ * @param <I> vertexId type
+ * @param <B> mappingTarget type
+ */
+@SuppressWarnings("unchecked")
+public abstract class SimpleHiveToMapping<I extends WritableComparable,
+  B extends Writable> extends AbstractHiveToMapping<I, B> {
+  /** Hive records which we are reading from */
+  private Iterator<HiveReadableRecord> records;
+
+  /** Reusable entry object */
+  private MappingEntry<I, B> reusableEntry;
+
+  /** Reusable vertex id */
+  private I reusableVertexId;
+  /** Reusable mapping target */
+  private B reusableMappingTarget;
+
+  /**
+   * Read vertexId from hive record
+   *
+   * @param record HiveReadableRecord
+   * @return vertexId
+   */
+  public abstract I getVertexId(HiveReadableRecord record);
+
+  /**
+   * Read mappingTarget from hive record
+   *
+   * @param record HiveReadableRecord
+   * @return mappingTarget
+   */
+  public abstract B getMappingTarget(HiveReadableRecord record);
+
+  @Override
+  public void initializeRecords(Iterator<HiveReadableRecord> records) {
+    this.records = records;
+    reusableVertexId = getConf().createVertexId();
+    reusableMappingTarget = (B) getConf().createMappingTarget();
+    reusableEntry = new MappingEntry<>(reusableVertexId,
+        reusableMappingTarget);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return records.hasNext();
+  }
+
+  @Override
+  public MappingEntry<I, B> next() {
+    HiveReadableRecord record = records.next();
+    I id = getVertexId(record);
+    B target = getMappingTarget(record);
+    reusableEntry.setVertexId(id);
+    reusableEntry.setMappingTarget(target);
+    return reusableEntry;
+  }
+
+  /**
+   * Returns reusableVertexId for use in other methods
+   *
+   * @return reusableVertexId
+   */
+  public I getReusableVertexId() {
+    return reusableVertexId;
+  }
+
+  /**
+   * Returns reusableMappingTarget for use in other methods
+   *
+   * @return reusableMappingTarget
+   */
+  public B getReusableMappingTarget() {
+    return reusableMappingTarget;
+  }
+}
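
To make the extension point concrete, a minimal subclass might look like the
sketch below. The class name, the two-column table layout, and the
record.getLong(...) accessors are assumptions for illustration only; the
overridden methods and their signatures come from the patch above:

  import com.facebook.hiveio.input.HiveInputDescription;
  import com.facebook.hiveio.record.HiveReadableRecord;
  import com.facebook.hiveio.schema.HiveTableSchema;
  import org.apache.hadoop.io.ByteWritable;
  import org.apache.hadoop.io.LongWritable;

  /** Maps a bigint vertex id (column 0) to a byte-sized bucket (column 1). */
  public class LongByteHiveToMapping
      extends SimpleHiveToMapping<LongWritable, ByteWritable> {
    @Override
    public LongWritable getVertexId(HiveReadableRecord record) {
      LongWritable id = getReusableVertexId();
      id.set(record.getLong(0));
      return id;
    }

    @Override
    public ByteWritable getMappingTarget(HiveReadableRecord record) {
      ByteWritable target = getReusableMappingTarget();
      target.set((byte) record.getLong(1));
      return target;
    }

    @Override
    public void checkInput(HiveInputDescription inputDesc,
        HiveTableSchema schema) {
      // A real implementation would verify that the id and target columns
      // exist in the table schema; omitted in this sketch.
    }
  }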

