giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to 02e73b9
Date Fri, 19 May 2017 01:09:22 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 67540b32b -> 02e73b917


JIRA-1147

closes #38


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/02e73b91
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/02e73b91
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/02e73b91

Branch: refs/heads/trunk
Commit: 02e73b917972ef69c7b901c3b5c493a4a504731c
Parents: 67540b3
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Thu May 18 18:08:46 2017 -0700
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Thu May 18 18:08:46 2017 -0700

----------------------------------------------------------------------
 .../comm/requests/AskForInputSplitRequest.java  | 16 +++-
 .../apache/giraph/master/BspServiceMaster.java  |  2 +-
 .../master/input/MasterInputSplitsHandler.java  | 95 +++++++++++++++++++-
 .../giraph/worker/InputSplitsCallable.java      | 10 ++-
 .../giraph/worker/WorkerInputSplitsHandler.java |  6 +-
 5 files changed, 119 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/02e73b91/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
index 5d9e4e6..9f0616d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AskForInputSplitRequest.java
@@ -34,16 +34,25 @@ public class AskForInputSplitRequest extends WritableRequest
   private InputType splitType;
   /** Task id of worker which requested the split */
   private int workerTaskId;
+  /**
+   * True if this is the first split the requesting thread asks for; false
+   * means the thread has finished the split it requested previously
+   */
+  private boolean isFirstSplit;
 
   /**
    * Constructor
    *
    * @param splitType Type of split we are requesting
    * @param workerTaskId Task id of worker which requested the split
+   * @param isFirstSplit True for a thread's first request for a split; false
+   *   signals that the split the thread requested previously is done
    */
-  public AskForInputSplitRequest(InputType splitType, int workerTaskId) {
+  public AskForInputSplitRequest(InputType splitType, int workerTaskId,
+      boolean isFirstSplit) {
     this.splitType = splitType;
     this.workerTaskId = workerTaskId;
+    this.isFirstSplit = isFirstSplit;
   }
 
   /**
@@ -54,19 +63,22 @@ public class AskForInputSplitRequest extends WritableRequest
 
   @Override
   public void doRequest(MasterGlobalCommHandler commHandler) {
-    commHandler.getInputSplitsHandler().sendSplitTo(splitType, workerTaskId);
+    commHandler.getInputSplitsHandler().sendSplitTo(
+        splitType, workerTaskId, isFirstSplit); // false = previous split done
   }
 
   @Override
   void readFieldsRequest(DataInput in) throws IOException {
     splitType = InputType.values()[in.readInt()];
     workerTaskId = in.readInt();
+    isFirstSplit = in.readBoolean(); // Same field order as writeRequest
   }
 
   @Override
   void writeRequest(DataOutput out) throws IOException {
     out.writeInt(splitType.ordinal());
     out.writeInt(workerTaskId);
+    out.writeBoolean(isFirstSplit); // Read back in readFieldsRequest
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/02e73b91/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 7c54678..779bccb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -846,7 +846,7 @@ public class BspServiceMaster<I extends WritableComparable,
           globalCommHandler = new MasterGlobalCommHandler(
               new MasterAggregatorHandler(getConfiguration(), getContext()),
               new MasterInputSplitsHandler(
-                  getConfiguration().useInputSplitLocality()));
+                  getConfiguration().useInputSplitLocality(), getContext()));
           aggregatorTranslation = new AggregatorToGlobalCommTranslation(
               getConfiguration(), globalCommHandler);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/02e73b91/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
b/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
index 327e59d..3044f03 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/input/MasterInputSplitsHandler.java
@@ -20,10 +20,13 @@ package org.apache.giraph.master.input;
 
 import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.requests.ReplyWithInputSplitRequest;
+import org.apache.giraph.conf.StrConfOption;
 import org.apache.giraph.io.GiraphInputFormat;
 import org.apache.giraph.io.InputType;
 import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutput;
@@ -34,6 +37,7 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Handler for input splits on master
@@ -44,6 +48,15 @@ import java.util.concurrent.CountDownLatch;
  * these splits back to queues.
  */
 public class MasterInputSplitsHandler {
+  /**
+   * Comma-separated fractions of input; for each, the ms elapsed from when
+   * splits were added until that fraction was done is stored in a counter
+   */
+  public static final StrConfOption DONE_FRACTIONS_TO_STORE_IN_COUNTERS =
+      new StrConfOption("giraph.master.input.doneFractionsToStoreInCounters",
+          "0.99,1", "Store in counters timestamps when we finished reading " +
+          "these fractions of input");
+
   /** Whether to use locality information */
   private final boolean useLocality;
   /** Master client */
@@ -56,16 +69,42 @@ public class MasterInputSplitsHandler {
   /** Latches to say when one input splits type is ready to be accessed */
   private Map<InputType, CountDownLatch> latchesMap =
       new EnumMap<>(InputType.class);
+  /** Context for accessing counters and the configuration */
+  private final Mapper.Context context;
+  /** Total number of splits per input type, recorded in addSplits */
+  private final Map<InputType, Integer> numSplitsPerType =
+      new EnumMap<>(InputType.class);
+  /** Number of splits per type that workers reported as done so far */
+  private final Map<InputType, AtomicInteger> numSplitsReadPerType =
+      new EnumMap<>(InputType.class);
+  /** Time (System.currentTimeMillis) when splits of each type were added */
+  private final Map<InputType, Long> splitsCreatedTimestamp =
+      new EnumMap<>(InputType.class);
+  /**
+   * Parsed values of DONE_FRACTIONS_TO_STORE_IN_COUNTERS: fractions of input
+   * whose completion time is stored in counters
+   */
+  private final double[] doneFractionsToStoreInCounters;
 
   /**
    * Constructor
    *
    * @param useLocality Whether to use locality information or not
+   * @param context Context for accessing counters and the configuration
    */
-  public MasterInputSplitsHandler(boolean useLocality) {
+  public MasterInputSplitsHandler(boolean useLocality, Mapper.Context context) {
     this.useLocality = useLocality;
+    this.context = context;
     for (InputType inputType : InputType.values()) {
       latchesMap.put(inputType, new CountDownLatch(1));
+      numSplitsReadPerType.put(inputType, new AtomicInteger(0));
+    }
+
+    String[] tmp = DONE_FRACTIONS_TO_STORE_IN_COUNTERS.get( // Comma-separated
+        context.getConfiguration()).split(",");
+    doneFractionsToStoreInCounters = new double[tmp.length];
+    for (int i = 0; i < tmp.length; i++) { // NOTE: empty entry throws
+      doneFractionsToStoreInCounters[i] = Double.parseDouble(tmp[i].trim());
     }
   }
 
@@ -89,6 +128,7 @@ public class MasterInputSplitsHandler {
    */
   public void addSplits(InputType splitsType, List<InputSplit> inputSplits,
       GiraphInputFormat inputFormat) {
+    splitsCreatedTimestamp.put(splitsType, System.currentTimeMillis());
     List<byte[]> serializedSplits = new ArrayList<>();
     for (InputSplit inputSplit : inputSplits) {
       try {
@@ -114,6 +154,7 @@ public class MasterInputSplitsHandler {
     }
     splitsMap.put(splitsType, inputSplitsOrganizer);
+    numSplitsPerType.put(splitsType, serializedSplits.size());
     latchesMap.get(splitsType).countDown();
   }
 
   /**
@@ -123,8 +164,11 @@ public class MasterInputSplitsHandler {
    *
    * @param splitType Type of split requested
    * @param workerTaskId Id of worker who requested split
+   * @param isFirstSplit Whether this is the first split a thread is requesting,
+   *   or this request indicates that previously requested input split was done
    */
-  public void sendSplitTo(InputType splitType, int workerTaskId) {
+  public void sendSplitTo(InputType splitType, int workerTaskId,
+      boolean isFirstSplit) {
     try {
       // Make sure we don't try to retrieve splits before they were added
       latchesMap.get(splitType).await();
@@ -136,5 +180,52 @@ public class MasterInputSplitsHandler {
     masterClient.sendWritableRequest(workerTaskId,
         new ReplyWithInputSplitRequest(splitType,
             serializedInputSplit == null ? new byte[0] : serializedInputSplit));
+    if (!isFirstSplit) {
+      incrementSplitsRead(splitType);
+    }
+  }
+
+  /**
+   * Mark one more split as done and record any done fraction it reaches
+   *
+   * @param splitType Type of split which was read
+   */
+  private void incrementSplitsRead(InputType splitType) {
+    int splitsRead = numSplitsReadPerType.get(splitType).incrementAndGet();
+    int splits = numSplitsPerType.get(splitType);
+    for (double fraction : doneFractionsToStoreInCounters) {
+      // Use at least 1, since splitsRead is never 0 after the increment
+      if (splitsRead == Math.max(1, (int) (splits * fraction))) {
+        splitFractionReached(splitType, fraction, context);
+      }
+    }
+  }
+
+  /**
+   * Set the counter for this fraction to the ms elapsed since splits of
+   * inputType were added
+   *
+   * @param inputType Type of input
+   * @param fraction Which fraction of input type was done reading
+   * @param context Context for accessing counters
+   */
+  private void splitFractionReached(
+      InputType inputType, double fraction, Mapper.Context context) {
+    getSplitFractionDoneTimestampCounter(inputType, fraction, context).setValue(
+        System.currentTimeMillis() - splitsCreatedTimestamp.get(inputType));
+  }
+
+  /**
+   * Get the counter for the ms it took to finish a fraction of inputType
+   *
+   * @param inputType Type of input for counter
+   * @param fraction Fraction for counter
+   * @param context Context to get counter from
+   * @return Counter in the group named after inputType, named after fraction
+   */
+  public static Counter getSplitFractionDoneTimestampCounter(
+      InputType inputType, double fraction, Mapper.Context context) {
+    return context.getCounter(inputType.name() + " input",
+        String.format("%.2f%% done time (ms)", fraction * 100));
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/02e73b91/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
index 40a3bb0..44b43ec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java
@@ -208,15 +208,19 @@ public abstract class InputSplitsCallable<I extends WritableComparable,
   @Override
   public VertexEdgeCount call() {
     VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
-    byte[] serializedInputSplit;
     int inputSplitsProcessed = 0;
     try {
       OutOfCoreEngine oocEngine = serviceWorker.getServerData().getOocEngine();
       if (oocEngine != null) {
         oocEngine.processingThreadStart();
       }
-      while ((serializedInputSplit =
-          splitsHandler.reserveInputSplit(getInputType())) != null) {
+      while (true) {
+        byte[] serializedInputSplit = splitsHandler.reserveInputSplit(
+            getInputType(), inputSplitsProcessed == 0);
+        if (serializedInputSplit == null) {
+          // No splits left
+          break;
+        }
         // If out-of-core mechanism is used, check whether this thread
         // can stay active or it should temporarily suspend and stop
         // processing and generating more data for the moment.

http://git-wip-us.apache.org/repos/asf/giraph/blob/02e73b91/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
index 0dc42b3..d054be3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerInputSplitsHandler.java
@@ -91,12 +91,14 @@ public class WorkerInputSplitsHandler {
    * have been created.
    *
    * @param splitType Type of split
+   * @param isFirstSplit Whether this is the first split input thread reads
    * @return reserved InputSplit or null if no unfinished InputSplits exist
    */
-  public byte[] reserveInputSplit(InputType splitType) {
+  public byte[] reserveInputSplit(InputType splitType, boolean isFirstSplit) {
     // Send request
     workerClient.sendWritableRequest(masterTaskId,
-        new AskForInputSplitRequest(splitType, workerInfo.getTaskId()));
+        new AskForInputSplitRequest(
+            splitType, workerInfo.getTaskId(), isFirstSplit));
     try {
       // Wait for some split to become available
       byte[] serializedInputSplit = availableInputSplits.get(splitType).take();


Mime
View raw message