giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to 31026d4
Date Tue, 20 Jan 2015 23:15:16 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk bc2babcc6 -> 31026d4d2


GIRAPH-985: Add more metrics

Summary: When the limit on the number of open requests is enabled, a significant part of
computation time can be spent just waiting. This adds metrics for the total time compute threads
spend waiting on open requests, and a histogram of compute times per partition. These should help
detect why a job is slower than expected (messaging or compute). It also changes how we wait on
open requests: we resume at a lower limit than the one at which we pause, so we don't end up with
a huge number of very tiny pauses.

Test Plan: Ran some jobs on the cluster and looked through these metrics.

Reviewers: sergey.edunov, pavanka

Subscribers: ikabiljo

Differential Revision: https://reviews.facebook.net/D31683


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/31026d4d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/31026d4d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/31026d4d

Branch: refs/heads/trunk
Commit: 31026d4d2a4ecc6f0af13a545d0ee633900ff2e5
Parents: bc2babc
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Fri Jan 16 09:45:46 2015 -0800
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Tue Jan 20 15:12:39 2015 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../org/apache/giraph/comm/WorkerClient.java    |  2 +-
 .../apache/giraph/comm/netty/NettyClient.java   | 71 ++++++++++++++------
 .../giraph/comm/netty/NettyWorkerClient.java    |  2 +-
 .../apache/giraph/graph/ComputeCallable.java    |  9 +++
 .../org/apache/giraph/metrics/MetricNames.java  |  7 ++
 .../metrics/SuperstepMetricsRegistry.java       | 11 ++-
 7 files changed, 79 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 9adca87..527ac04 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.2.0 - unreleased
+  GIRAPH-985: Add more metrics (majakabiljo)
+
   GIRAPH-986: Add more stuff to TypeOps (ikabiljo via majakabiljo)
 
   GIRAPH-962: TextAggregatorWriter with frequency AT_THE_END writes in every superstep (mju via majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
index 3759f6b..a84a14d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerClient.java
@@ -73,7 +73,7 @@ else[HADOOP_NON_SECURE]*/
    * @param destTaskId Destination worker id
    * @param request Request to send
    */
-  void sendWritableRequest(Integer destTaskId, WritableRequest request);
+  void sendWritableRequest(int destTaskId, WritableRequest request);
 
   /**
    * Wait until all the outstanding requests are completed.

http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 97394bf..78e318e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -30,9 +30,16 @@ import org.apache.giraph.comm.requests.RequestType;
 import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
 /*end[HADOOP_NON_SECURE]*/
 import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.BooleanConfOption;
+import org.apache.giraph.conf.FloatConfOption;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
 import org.apache.giraph.graph.TaskInfo;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.MetricNames;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
 import org.apache.giraph.utils.PipelineUtils;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.ThreadUtils;
@@ -43,6 +50,7 @@ import org.apache.log4j.Logger;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
+import com.yammer.metrics.core.Counter;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -84,17 +92,23 @@ import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
 /**
  * Netty client for sending requests.  Thread-safe.
  */
-public class NettyClient {
+public class NettyClient implements ResetSuperstepMetricsObserver {
   /** Do we have a limit on number of open requests we can have */
-  public static final String LIMIT_NUMBER_OF_OPEN_REQUESTS =
-      "giraph.waitForRequestsConfirmation";
-  /** Default choice about having a limit on number of open requests */
-  public static final boolean LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT = false;
+  public static final BooleanConfOption LIMIT_NUMBER_OF_OPEN_REQUESTS =
+      new BooleanConfOption("giraph.waitForRequestsConfirmation", false,
+          "Whether to have a limit on number of open requests or not");
   /** Maximum number of requests without confirmation we should have */
-  public static final String MAX_NUMBER_OF_OPEN_REQUESTS =
-      "giraph.maxNumberOfOpenRequests";
-  /** Default maximum number of requests without confirmation */
-  public static final int MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT = 10000;
+  public static final IntConfOption MAX_NUMBER_OF_OPEN_REQUESTS =
+      new IntConfOption("giraph.maxNumberOfOpenRequests", 10000,
+          "Maximum number of requests without confirmation we should have");
+  /**
+   * After pausing a thread due to too large number of open requests,
+   * which fraction of these requests need to be closed before we continue
+   */
+  public static final FloatConfOption
+  FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING =
+      new FloatConfOption("giraph.fractionOfRequestsToCloseBeforeProceeding",
+          0.2f, "Fraction of requsts to close before proceeding");
   /** Maximum number of requests to list (for debugging) */
   public static final int MAX_REQUESTS_TO_LIST = 10;
   /**
@@ -147,6 +161,11 @@ public class NettyClient {
   private final boolean limitNumberOfOpenRequests;
   /** Maximum number of requests without confirmation we can have */
   private final int maxNumberOfOpenRequests;
+  /**
+   * Maximum number of requests that can be open after the pause in order to
+   * proceed
+   */
+  private final int numberOfRequestsToProceed;
   /** Maximum number of connection failures */
   private final int maxConnectionFailures;
   /** Maximum number of milliseconds for a request */
@@ -181,6 +200,8 @@ public class NettyClient {
    */
   private final LogOnErrorChannelFutureListener logErrorListener =
       new LogOnErrorChannelFutureListener();
+  /** Counter for time spent waiting on too many open requests */
+  private Counter timeWaitingOnOpenRequests;
 
   /**
    * Only constructor
@@ -201,34 +222,32 @@ public class NettyClient {
     sendBufferSize = CLIENT_SEND_BUFFER_SIZE.get(conf);
     receiveBufferSize = CLIENT_RECEIVE_BUFFER_SIZE.get(conf);
 
-    limitNumberOfOpenRequests = conf.getBoolean(
-        LIMIT_NUMBER_OF_OPEN_REQUESTS,
-        LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT);
+    limitNumberOfOpenRequests = LIMIT_NUMBER_OF_OPEN_REQUESTS.get(conf);
     if (limitNumberOfOpenRequests) {
-      maxNumberOfOpenRequests = conf.getInt(
-          MAX_NUMBER_OF_OPEN_REQUESTS,
-          MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT);
+      maxNumberOfOpenRequests = MAX_NUMBER_OF_OPEN_REQUESTS.get(conf);
+      numberOfRequestsToProceed = (int) (maxNumberOfOpenRequests *
+          (1 - FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING.get(conf)));
       if (LOG.isInfoEnabled()) {
         LOG.info("NettyClient: Limit number of open requests to " +
-            maxNumberOfOpenRequests);
+            maxNumberOfOpenRequests + " and proceed when <= " +
+            numberOfRequestsToProceed);
       }
     } else {
       maxNumberOfOpenRequests = -1;
+      numberOfRequestsToProceed = 0;
     }
 
     maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
-
     maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
-
     waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
-
     maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf);
-
     maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf);
 
     clientRequestIdRequestInfoMap =
         new MapMaker().concurrencyLevel(maxPoolSize).makeMap();
 
+    GiraphMetrics.get().addSuperstepResetObserver(this);
+
     handlerToUseExecutionGroup =
         NETTY_CLIENT_EXECUTION_AFTER_HANDLER.get(conf);
     useExecutionGroup = NETTY_CLIENT_USE_EXECUTION_HANDLER.get(conf);
@@ -354,6 +373,12 @@ public class NettyClient {
         });
   }
 
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry metrics) {
+    timeWaitingOnOpenRequests = metrics.getCounter(
+        MetricNames.TIME_SPENT_WAITING_ON_TOO_MANY_OPEN_REQUESTS_MS);
+  }
+
   /**
    * Pair object for connectAllAddresses().
    */
@@ -673,7 +698,7 @@ public class NettyClient {
    * @param destTaskId Destination task id
    * @param request Request to send
    */
-  public void sendWritableRequest(Integer destTaskId,
+  public void sendWritableRequest(int destTaskId,
       WritableRequest request) {
     InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);
     if (clientRequestIdRequestInfoMap.isEmpty()) {
@@ -709,7 +734,9 @@ public class NettyClient {
 
     if (limitNumberOfOpenRequests &&
         clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) {
-      waitSomeRequests(maxNumberOfOpenRequests);
+      long startTime = System.currentTimeMillis();
+      waitSomeRequests(numberOfRequestsToProceed);
+      timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - startTime);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
index c893a24..2a89109 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
@@ -138,7 +138,7 @@ public class NettyWorkerClient<I extends WritableComparable,
   }
 
   @Override
-  public void sendWritableRequest(Integer destTaskId,
+  public void sendWritableRequest(int destTaskId,
                                   WritableRequest request) {
     Counter counter = superstepRequestCounters.get(request.getType());
     if (counter != null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 33f2255..996159f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -45,6 +45,7 @@ import org.apache.log4j.Logger;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Histogram;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -99,6 +100,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
   private final Counter messagesSentCounter;
   /** Message bytes sent */
   private final Counter messageBytesSentCounter;
+  /** Compute time per partition */
+  private final Histogram histogramComputePerPartition;
 
   /**
    * Constructor
@@ -127,6 +130,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
     messagesSentCounter = metrics.getCounter(MetricNames.MESSAGES_SENT);
     messageBytesSentCounter =
       metrics.getCounter(MetricNames.MESSAGE_BYTES_SENT);
+    histogramComputePerPartition = metrics.getUniformHistogram(
+        MetricNames.HISTOGRAM_COMPUTE_PER_PARTITION);
   }
 
   @Override
@@ -150,6 +155,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
 
       Partition<I, V, E> partition =
           serviceWorker.getPartitionStore().getOrCreatePartition(partitionId);
+      long startTime = System.currentTimeMillis();
 
       Computation<I, V, E, M1, M2> computation =
           (Computation<I, V, E, M1, M2>) configuration.createComputation();
@@ -183,6 +189,9 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       }
 
       computation.postSuperstep();
+
+      histogramComputePerPartition.update(
+          System.currentTimeMillis() - startTime);
     }
 
     // Return VertexWriter after the usage

http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
index f731bbc..ff46198 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/MetricNames.java
@@ -61,6 +61,10 @@ public interface MetricNames {
   /** Counter for sending aggregators from worker owner to other workers */
   String SEND_AGGREGATORS_TO_WORKER_REQUESTS =
       "send-aggregators-to-worker-requests";
+
+  /** Counter for time spent waiting on too many open requests */
+  String TIME_SPENT_WAITING_ON_TOO_MANY_OPEN_REQUESTS_MS =
+      "time-spent-waiting-on-too-many-open-requests-ms";
   //////////////////////////////////////////////////////////////////////////////
   // End of Request counters per superstep
   //////////////////////////////////////////////////////////////////////////////
@@ -91,4 +95,7 @@ public interface MetricNames {
   String VERTICES_FILTERED = "vertices-filtered";
   /** Percent of vertices filtered out */
   String VERTICES_FILTERED_PCT = "vertices-filtered-pct";
+
+  /** Name of metric for compute times per partition */
+  String HISTOGRAM_COMPUTE_PER_PARTITION = "compute-per-partition-ms";
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/31026d4d/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java b/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
index 57e2431..3a22d69 100644
--- a/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
+++ b/giraph-core/src/main/java/org/apache/giraph/metrics/SuperstepMetricsRegistry.java
@@ -21,6 +21,7 @@ package org.apache.giraph.metrics;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.bsp.BspService;
 
+import com.yammer.metrics.core.Histogram;
 import com.yammer.metrics.core.Metric;
 import com.yammer.metrics.core.MetricName;
 import com.yammer.metrics.core.MetricPredicate;
@@ -111,6 +112,14 @@ public class SuperstepMetricsRegistry extends GiraphMetricsRegistry {
         return name.getType().equals(getType());
       }
     };
-    new ConsoleReporter(getInternalRegistry(), out, superstepFilter).run();
+    new ConsoleReporter(getInternalRegistry(), out, superstepFilter) {
+      @Override
+      public void processHistogram(MetricName name, Histogram histogram,
+          PrintStream stream) {
+        super.processHistogram(name, histogram, stream);
+        stream.printf("             count = %d%n", histogram.count());
+        stream.printf("               sum = %,2.2f%n", histogram.sum());
+      }
+    } .run();
   }
 }


Mime
View raw message