giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to 39eb253
Date Wed, 21 Feb 2018 20:46:04 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 94a3ac5fb -> 39eb2533b


JIRA-1171

closes #60


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/39eb2533
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/39eb2533
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/39eb2533

Branch: refs/heads/trunk
Commit: 39eb2533b8f7c6c8cb17ee30d5c2005e74873ff4
Parents: 94a3ac5
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Wed Feb 21 12:45:54 2018 -0800
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Wed Feb 21 12:45:54 2018 -0800

----------------------------------------------------------------------
 .../apache/giraph/graph/ComputeCallable.java    | 16 ++++++----
 .../apache/giraph/master/BspServiceMaster.java  |  3 +-
 .../giraph/master/DefaultMasterObserver.java    |  6 +++-
 .../apache/giraph/master/MasterObserver.java    |  7 ++++-
 .../ooc/data/DiskBackedPartitionStore.java      |  3 +-
 .../apache/giraph/partition/PartitionStats.java | 31 ++++++++++++++++++--
 .../apache/giraph/utils/JMapHistoDumper.java    |  6 +++-
 .../org/apache/giraph/utils/LogVersions.java    |  6 +++-
 .../giraph/utils/ReactiveJMapHistoDumper.java   |  6 +++-
 .../apache/giraph/worker/BspServiceWorker.java  |  4 ++-
 10 files changed, 72 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/39eb2533/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index cdd9877..7e51aa0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -206,6 +206,14 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
             partitionStatsList.size() + " partitions, " +
             partitionStore.getNumPartitions() + " remaining " +
             MemoryUtils.getRuntimeMemoryStats());
+        long timeDoingGCWhileProcessing =
+            taskManager.getSuperstepGCTime() - startGCTime;
+        timeDoingGC += timeDoingGCWhileProcessing;
+        long timeProcessingPartition =
+            System.currentTimeMillis() - startProcessingTime -
+                timeDoingGCWhileProcessing;
+        timeProcessing += timeProcessingPartition;
+        partitionStats.setComputeMs(timeProcessingPartition);
       } catch (IOException e) {
         throw new IllegalStateException("call: Caught unexpected IOException," +
             " failing.", e);
@@ -215,11 +223,6 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       } finally {
         partitionStore.putPartition(partition);
       }
-      long timeDoingGCWhileProcessing =
-          taskManager.getSuperstepGCTime() - startGCTime;
-      timeDoingGC += timeDoingGCWhileProcessing;
-      timeProcessing += System.currentTimeMillis() - startProcessingTime -
-          timeDoingGCWhileProcessing;
       histogramComputePerPartition.update(
           System.currentTimeMillis() - startTime);
     }
@@ -279,7 +282,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
       boolean ignoreExistingVertices)
       throws IOException, InterruptedException {
     PartitionStats partitionStats =
-        new PartitionStats(partition.getId(), 0, 0, 0, 0, 0);
+        new PartitionStats(partition.getId(), 0, 0, 0, 0, 0,
+            serviceWorker.getWorkerInfo().getHostnameId());
     final LongRef verticesComputedProgress = new LongRef(0);
 
     Progressable verticesProgressable = new Progressable() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/39eb2533/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index d529ffe..31d10df 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -985,7 +985,8 @@ public class BspServiceMaster<I extends WritableComparable,
         printAggregatedMetricsToHDFS(superstep, aggregatedMetrics);
       }
       for (MasterObserver observer : observers) {
-        observer.superstepMetricsUpdate(superstep, aggregatedMetrics);
+        observer.superstepMetricsUpdate(
+            superstep, aggregatedMetrics, allPartitionStatsList);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/39eb2533/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java b/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
index 7854a16..b2184e6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/DefaultMasterObserver.java
@@ -20,6 +20,9 @@ package org.apache.giraph.master;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.metrics.AggregatedMetrics;
+import org.apache.giraph.partition.PartitionStats;
+
+import java.util.List;
 
 /**
  * A no-op implementation of MasterObserver to make it easier for users.
@@ -55,5 +58,6 @@ public class DefaultMasterObserver implements MasterObserver {
 
   @Override
   public void superstepMetricsUpdate(long superstep,
-      AggregatedMetrics aggregatedMetrics) { }
+      AggregatedMetrics aggregatedMetrics,
+      List<PartitionStats> partitionStatsList) { }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/39eb2533/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java
index 4931ea8..ba4b4a8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterObserver.java
@@ -20,6 +20,9 @@ package org.apache.giraph.master;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
 import org.apache.giraph.metrics.AggregatedMetrics;
+import org.apache.giraph.partition.PartitionStats;
+
+import java.util.List;
 
 /**
  * Observer for Master.
@@ -62,7 +65,9 @@ public interface MasterObserver extends ImmutableClassesGiraphConfigurable {
    *
    * @param superstep Supsertep number
    * @param aggregatedMetrics Metrics
+   * @param partitionStatsList List of partition stats
    */
   void superstepMetricsUpdate(
-      long superstep, AggregatedMetrics aggregatedMetrics);
+      long superstep, AggregatedMetrics aggregatedMetrics,
+      List<PartitionStats> partitionStatsList);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/39eb2533/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
index 14ba6d1..5514a4a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedPartitionStore.java
@@ -368,7 +368,8 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
           partitionStore.getPartitionEdgeCount(partitionId));
       Partition<I, V, E> partition =
           partitionStore.removePartition(partitionId);
-      LOG.debug("Offloading partition " + partition + " DataIndex[" + index + "]");
+      LOG.debug(
+          "Offloading partition " + partition + " DataIndex[" + index + "]");
       index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
       OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
           dataAccessor.prepareOutput(ioThreadId, index.copy(), false);

http://git-wip-us.apache.org/repos/asf/giraph/blob/39eb2533/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java
index 624e385..fe0390d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStats.java
@@ -40,6 +40,13 @@ public class PartitionStats implements Writable {
   private long messagesSentCount = 0;
   /** Message byetes sent from this partition */
   private long messageBytesSentCount = 0;
+  /**
+   * How long did compute take on this partition
+   * (excluding time spent in GC) (TODO and waiting on open requests)
+   */
+  private long computeMs;
+  /** Hostname and id of worker owning this partition */
+  private String workerHostnameId;
 
   /**
    * Default constructor for reflection.
@@ -55,19 +62,22 @@ public class PartitionStats implements Writable {
    * @param edgeCount Edge count.
    * @param messagesSentCount Number of messages sent
    * @param messageBytesSentCount Number of message bytes sent
+   * @param workerHostnameId Hostname and id of worker owning this partition
    */
   public PartitionStats(int partitionId,
       long vertexCount,
       long finishedVertexCount,
       long edgeCount,
       long messagesSentCount,
-      long messageBytesSentCount) {
+      long messageBytesSentCount,
+      String workerHostnameId) {
     this.partitionId = partitionId;
     this.vertexCount = vertexCount;
     this.finishedVertexCount = finishedVertexCount;
     this.edgeCount = edgeCount;
     this.messagesSentCount = messagesSentCount;
     this.messageBytesSentCount = messageBytesSentCount;
+    this.workerHostnameId = workerHostnameId;
   }
 
   /**
@@ -174,6 +184,18 @@ public class PartitionStats implements Writable {
     return messageBytesSentCount;
   }
 
+  public long getComputeMs() {
+    return computeMs;
+  }
+
+  public void setComputeMs(long computeMs) {
+    this.computeMs = computeMs;
+  }
+
+  public String getWorkerHostnameId() {
+    return workerHostnameId;
+  }
+
   @Override
   public void readFields(DataInput input) throws IOException {
     partitionId = input.readInt();
@@ -182,6 +204,8 @@ public class PartitionStats implements Writable {
     edgeCount = input.readLong();
     messagesSentCount = input.readLong();
     messageBytesSentCount = input.readLong();
+    computeMs = input.readLong();
+    workerHostnameId = input.readUTF();
   }
 
   @Override
@@ -192,6 +216,8 @@ public class PartitionStats implements Writable {
     output.writeLong(edgeCount);
     output.writeLong(messagesSentCount);
     output.writeLong(messageBytesSentCount);
+    output.writeLong(computeMs);
+    output.writeUTF(workerHostnameId);
   }
 
   @Override
@@ -199,6 +225,7 @@ public class PartitionStats implements Writable {
     return "(id=" + partitionId + ",vtx=" + vertexCount + ",finVtx=" +
         finishedVertexCount + ",edges=" + edgeCount + ",msgsSent=" +
         messagesSentCount + ",msgBytesSent=" +
-          messageBytesSentCount + ")";
+        messageBytesSentCount + ",computeMs=" + computeMs +
+        ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/39eb2533/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
index b34f926..fff63ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
@@ -22,9 +22,12 @@ import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.metrics.AggregatedMetrics;
+import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.worker.WorkerObserver;
 import org.apache.log4j.Logger;
 
+import java.util.List;
+
 /**
  * An observer for both worker and master that periodically dumps the memory
  * usage using jmap tool.
@@ -102,7 +105,8 @@ public class JMapHistoDumper implements MasterObserver, WorkerObserver {
 
   @Override
   public void superstepMetricsUpdate(long superstep,
-      AggregatedMetrics aggregatedMetrics) { }
+      AggregatedMetrics aggregatedMetrics,
+      List<PartitionStats> partitionStatsList) { }
 
   @Override
   public void applicationFailed(Exception e) { }

http://git-wip-us.apache.org/repos/asf/giraph/blob/39eb2533/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java b/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java
index 6fbc32b..5f717af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/LogVersions.java
@@ -20,8 +20,11 @@ package org.apache.giraph.utils;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.metrics.AggregatedMetrics;
+import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.worker.WorkerObserver;
 
+import java.util.List;
+
 /**
  * Logs versions of Giraph dependencies on job start.
  */
@@ -59,5 +62,6 @@ public class LogVersions implements WorkerObserver, MasterObserver {
 
   @Override
   public void superstepMetricsUpdate(long superstep,
-      AggregatedMetrics aggregatedMetrics) { }
+      AggregatedMetrics aggregatedMetrics,
+      List<PartitionStats> partitionStatsList) { }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/39eb2533/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
index 190e755..230dfdf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
@@ -23,9 +23,12 @@ import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.metrics.AggregatedMetrics;
+import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.worker.WorkerObserver;
 import org.apache.log4j.Logger;
 
+import java.util.List;
+
 /**
  * An observer for both worker and master that periodically checks if available
  * memory on heap is below certain threshold, and if found to be the case
@@ -113,7 +116,8 @@ public class ReactiveJMapHistoDumper extends
 
   @Override
   public void superstepMetricsUpdate(long superstep,
-      AggregatedMetrics aggregatedMetrics) { }
+      AggregatedMetrics aggregatedMetrics,
+      List<PartitionStats> partitionStatsList) { }
 
   @Override
   public void applicationFailed(Exception e) { }

http://git-wip-us.apache.org/repos/asf/giraph/blob/39eb2533/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 6f02749..794e1a0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -608,7 +608,9 @@ else[HADOOP_NON_SECURE]*/
               partitionStore.getPartitionVertexCount(partitionId),
               0,
               partitionStore.getPartitionEdgeCount(partitionId),
-              0, 0);
+              0,
+              0,
+              workerInfo.getHostnameId());
       partitionStatsList.add(partitionStats);
     }
     workerGraphPartitioner.finalizePartitionStats(


Mime
View raw message