giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to a1d546f
Date Fri, 17 Mar 2017 17:41:10 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 3dbd158c7 -> a1d546f7a


JIRA-1134

closes #24


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a1d546f7
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a1d546f7
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a1d546f7

Branch: refs/heads/trunk
Commit: a1d546f7a39df6ffb7d3ed9d3bcc47126ab2e579
Parents: 3dbd158
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Fri Mar 17 10:40:53 2017 -0700
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Fri Mar 17 10:40:53 2017 -0700

----------------------------------------------------------------------
 .../graph/JobProgressTrackerClientNoOp.java     |  5 ++
 .../RetryableJobProgressTrackerClient.java      | 11 +++
 .../giraph/job/CombinedWorkerProgress.java      | 32 +++++--
 .../job/DefaultJobProgressTrackerService.java   | 13 ++-
 .../apache/giraph/job/JobProgressTracker.java   | 10 +++
 .../apache/giraph/master/BspServiceMaster.java  | 32 ++++---
 .../apache/giraph/master/MasterProgress.java    | 90 ++++++++++++++++++++
 7 files changed, 172 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/a1d546f7/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
index 369941f..e699bfb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.graph;
 
+import org.apache.giraph.master.MasterProgress;
 import org.apache.giraph.worker.WorkerProgress;
 
 /**
@@ -48,4 +49,8 @@ public class JobProgressTrackerClientNoOp implements JobProgressTrackerClient
{
   @Override
   public void updateProgress(WorkerProgress workerProgress) {
   }
+
+  @Override
+  public void updateMasterProgress(MasterProgress masterProgress) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1d546f7/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
index 21204bd..a7ac055 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
@@ -21,6 +21,7 @@ package org.apache.giraph.graph;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.job.ClientThriftServer;
 import org.apache.giraph.job.JobProgressTracker;
+import org.apache.giraph.master.MasterProgress;
 import org.apache.giraph.worker.WorkerProgress;
 import org.apache.log4j.Logger;
 
@@ -152,6 +153,16 @@ public class RetryableJobProgressTrackerClient
     });
   }
 
+  @Override
+  public void updateMasterProgress(final MasterProgress masterProgress) {
+    executeWithRetry(new Runnable() {
+      @Override
+      public void run() {
+        jobProgressTracker.updateMasterProgress(masterProgress);
+      }
+    });
+  }
+
   /**
    * Execute Runnable, if disconnected try to connect again and retry
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1d546f7/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
index e265163..8cc16ec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
@@ -20,6 +20,7 @@ package org.apache.giraph.job;
 
 import com.google.common.collect.Iterables;
 import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.master.MasterProgress;
 import org.apache.giraph.worker.WorkerProgress;
 import org.apache.giraph.worker.WorkerProgressStats;
 import org.apache.hadoop.conf.Configuration;
@@ -71,15 +72,19 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
   private int minGraphPercentageInMemory = 100;
   /** Id of the worker with min percentage of graph in memory */
   private int workerWithMinGraphPercentageInMemory = -1;
+  /** Master progress */
+  private MasterProgress masterProgress;
 
   /**
    * Constructor
    *
    * @param workerProgresses Worker progresses to combine
+   * @param masterProgress Master progress
    * @param conf Configuration
    */
   public CombinedWorkerProgress(Iterable<WorkerProgress> workerProgresses,
-      Configuration conf) {
+      MasterProgress masterProgress, Configuration conf) {
+    this.masterProgress = masterProgress;
     normalFreeMemoryFraction = NORMAL_FREE_MEMORY_FRACTION.get(conf);
     for (WorkerProgress workerProgress : workerProgresses) {
       if (workerProgress.getCurrentSuperstep() > currentSuperstep) {
@@ -151,11 +156,26 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
     sb.append("Data from ").append(workersInSuperstep).append(" workers - ");
     if (isInputSuperstep()) {
       sb.append("Loading data: ");
-      sb.append(verticesLoaded).append(" vertices loaded, ");
-      sb.append(vertexInputSplitsLoaded).append(
-          " vertex input splits loaded; ");
-      sb.append(edgesLoaded).append(" edges loaded, ");
-      sb.append(edgeInputSplitsLoaded).append(" edge input splits loaded");
+      if (!masterProgress.vertexInputSplitsSet() ||
+          masterProgress.getVertexInputSplitCount() > 0) {
+        sb.append(verticesLoaded).append(" vertices loaded, ");
+        sb.append(vertexInputSplitsLoaded).append(
+            " vertex input splits loaded");
+        if (masterProgress.getVertexInputSplitCount() > 0) {
+          sb.append(" (out of ").append(
+              masterProgress.getVertexInputSplitCount()).append(")");
+        }
+        sb.append("; ");
+      }
+      if (!masterProgress.edgeInputSplitsSet() ||
+          masterProgress.getEdgeInputSplitsCount() > 0) {
+        sb.append(edgesLoaded).append(" edges loaded, ");
+        sb.append(edgeInputSplitsLoaded).append(" edge input splits loaded");
+        if (masterProgress.getEdgeInputSplitsCount() > 0) {
+          sb.append(" (out of ").append(
+              masterProgress.getEdgeInputSplitsCount()).append(")");
+        }
+      }
     } else if (isComputeSuperstep()) {
       sb.append("Compute superstep ").append(currentSuperstep).append(": ");
       sb.append(verticesComputed).append(" out of ").append(

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1d546f7/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
index 9e836dc..ccd0fba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
@@ -20,6 +20,7 @@ package org.apache.giraph.job;
 
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.master.MasterProgress;
 import org.apache.giraph.utils.ThreadUtils;
 import org.apache.giraph.worker.WorkerProgress;
 import org.apache.hadoop.mapreduce.Job;
@@ -28,6 +29,7 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Default implementation of JobProgressTrackerService
@@ -55,6 +57,9 @@ public class DefaultJobProgressTrackerService
   /** Map of worker progresses */
   private final Map<Integer, WorkerProgress> workerProgresses =
       new ConcurrentHashMap<>();
+  /** Master progress */
+  private final AtomicReference<MasterProgress> masterProgress =
+      new AtomicReference<>(new MasterProgress());
   /** Job */
   private Job job;
 
@@ -81,7 +86,8 @@ public class DefaultJobProgressTrackerService
               !workerProgresses.isEmpty()) {
             // Combine and log
             CombinedWorkerProgress combinedWorkerProgress =
-                new CombinedWorkerProgress(workerProgresses.values(), conf);
+                new CombinedWorkerProgress(workerProgresses.values(),
+                    masterProgress.get(), conf);
             if (LOG.isInfoEnabled()) {
               LOG.info(combinedWorkerProgress.toString());
             }
@@ -173,6 +179,11 @@ public class DefaultJobProgressTrackerService
   }
 
   @Override
+  public void updateMasterProgress(MasterProgress masterProgress) {
+    this.masterProgress.set(masterProgress);
+  }
+
+  @Override
   public void stop(boolean succeeded) {
     finished = true;
     writerThread.interrupt();

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1d546f7/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
index 3041d08..92e35b8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
@@ -20,6 +20,8 @@ package org.apache.giraph.job;
 
 import com.facebook.swift.service.ThriftMethod;
 import com.facebook.swift.service.ThriftService;
+
+import org.apache.giraph.master.MasterProgress;
 import org.apache.giraph.worker.WorkerProgress;
 
 /**
@@ -64,5 +66,13 @@ public interface JobProgressTracker {
    */
   @ThriftMethod
   void updateProgress(WorkerProgress workerProgress);
+
+  /**
+   * Master should call this method to update its progress
+   *
+   * @param masterProgress Progress of the master
+   */
+  @ThriftMethod
+  void updateMasterProgress(MasterProgress masterProgress);
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1d546f7/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index da1374a..6c930cd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -659,24 +659,28 @@ public class BspServiceMaster<I extends WritableComparable,
 
   @Override
   public int createVertexInputSplits() {
-    // Short-circuit if there is no vertex input format
-    if (!getConfiguration().hasVertexInputFormat()) {
-      return 0;
-    }
-    VertexInputFormat<I, V, E> vertexInputFormat =
-        getConfiguration().createWrappedVertexInputFormat();
-    return createInputSplits(vertexInputFormat, InputType.VERTEX);
+    int splits = 0;
+    if (getConfiguration().hasVertexInputFormat()) {
+      VertexInputFormat<I, V, E> vertexInputFormat =
+          getConfiguration().createWrappedVertexInputFormat();
+      splits = createInputSplits(vertexInputFormat, InputType.VERTEX);
+    }
+    MasterProgress.get().setVertexInputSplitCount(splits);
+    getJobProgressTracker().updateMasterProgress(MasterProgress.get());
+    return splits;
   }
 
   @Override
   public int createEdgeInputSplits() {
-    // Short-circuit if there is no edge input format
-    if (!getConfiguration().hasEdgeInputFormat()) {
-      return 0;
-    }
-    EdgeInputFormat<I, E> edgeInputFormat =
-        getConfiguration().createWrappedEdgeInputFormat();
-    return createInputSplits(edgeInputFormat, InputType.EDGE);
+    int splits = 0;
+    if (getConfiguration().hasEdgeInputFormat()) {
+      EdgeInputFormat<I, E> edgeInputFormat =
+          getConfiguration().createWrappedEdgeInputFormat();
+      splits = createInputSplits(edgeInputFormat, InputType.EDGE);
+    }
+    MasterProgress.get().setEdgeInputSplitsCount(splits);
+    getJobProgressTracker().updateMasterProgress(MasterProgress.get());
+    return splits;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/a1d546f7/giraph-core/src/main/java/org/apache/giraph/master/MasterProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterProgress.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterProgress.java
new file mode 100644
index 0000000..89dc4d3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterProgress.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.master;
+
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+
+/**
+ * Stores information about master progress
+ */
+@ThriftStruct
+public final class MasterProgress {
+  /** Singleton instance for everyone to use */
+  private static final MasterProgress INSTANCE = new MasterProgress();
+
+  /** How many vertex input splits were created */
+  private int vertexInputSplitCount = -1;
+  /** How many edge input splits were created */
+  private int edgeInputSplitCount = -1;
+
+  /**
+   * Public constructor for thrift to create us.
+   * Please use MasterProgress.get() to get the static instance.
+   */
+  public MasterProgress() {
+  }
+
+  /**
+   * Get singleton instance of MasterProgress.
+   *
+   * @return MasterProgress singleton instance
+   */
+  public static MasterProgress get() {
+    return INSTANCE;
+  }
+
+  @ThriftField(1)
+  public int getVertexInputSplitCount() {
+    return vertexInputSplitCount;
+  }
+
+  @ThriftField
+  public void setVertexInputSplitCount(int vertexInputSplitCount) {
+    this.vertexInputSplitCount = vertexInputSplitCount;
+  }
+
+  @ThriftField(2)
+  public int getEdgeInputSplitsCount() {
+    return edgeInputSplitCount;
+  }
+
+  @ThriftField
+  public void setEdgeInputSplitsCount(int edgeInputSplitCount) {
+    this.edgeInputSplitCount = edgeInputSplitCount;
+  }
+
+  /**
+   * Whether or not number of vertex input splits was set yet
+   *
+   * @return True iff it was set
+   */
+  public boolean vertexInputSplitsSet() {
+    return vertexInputSplitCount != -1;
+  }
+
+  /**
+   * Whether or not number of edge input splits was set yet
+   *
+   * @return True iff it was set
+   */
+  public boolean edgeInputSplitsSet() {
+    return edgeInputSplitCount != -1;
+  }
+}


Mime
View raw message