giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject git commit: GIRAPH-492: Saving vertices has no status report, making it hard to find DFS issues (aching)
Date Wed, 30 Jan 2013 19:23:55 GMT
Updated Branches:
  refs/heads/trunk d3f4a4e0d -> 6fd9f12da


GIRAPH-492: Saving vertices has no status report, making it hard to
find DFS issues (aching)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6fd9f12d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6fd9f12d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6fd9f12d

Branch: refs/heads/trunk
Commit: 6fd9f12da217774f15d32deeaeda3a389d44ea34
Parents: d3f4a4e
Author: aching <aching@apache.org>
Authored: Tue Jan 15 17:31:52 2013 -0800
Committer: aching <aching@apache.org>
Committed: Wed Jan 30 11:17:35 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    3 +
 .../apache/giraph/benchmark/PageRankBenchmark.java |   17 +++++-
 .../org/apache/giraph/bsp/CentralizedService.java  |   12 +----
 .../giraph/bsp/CentralizedServiceMaster.java       |   10 +++
 .../giraph/bsp/CentralizedServiceWorker.java       |   10 +++
 .../giraph/graph/FinishedSuperstepStats.java       |   36 +++++++++++-
 .../org/apache/giraph/graph/GraphTaskManager.java  |   43 +++++++-------
 .../org/apache/giraph/master/BspServiceMaster.java |    2 +-
 .../java/org/apache/giraph/utils/LoggerUtils.java  |   19 ++++++
 .../org/apache/giraph/worker/BspServiceWorker.java |   44 ++++++++++++---
 .../giraph/worker/VertexInputSplitsCallable.java   |   26 +++++---
 11 files changed, 166 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index a82ee15..5177255 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-492: Saving vertices has no status report, making it hard to 
+  find DFS issues (aching)
+
   GIRAPH-312: Giraph needs an admin script (ereisman)
 
   GIRAPH-469: Refactor GraphMapper (ereisman)

http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index 3ef471a..6e49812 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.combiner.DoubleSumCombiner;
 import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.io.formats.JsonBase64VertexOutputFormat;
 import org.apache.giraph.io.formats.PseudoRandomEdgeInputFormat;
 import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
 import org.apache.hadoop.conf.Configuration;
@@ -97,6 +98,10 @@ public class PageRankBenchmark implements Tool {
         "combinerType",
         true,
         "Combiner type (0 for no combiner, 1 for DoubleSumCombiner (default)");
+    options.addOption("o",
+        "vertexOutputFormat",
+        true,
+        "0 for JsonBase64VertexOutputFormat");
 
     HelpFormatter formatter = new HelpFormatter();
     if (args.length == 0) {
@@ -136,6 +141,7 @@ public class PageRankBenchmark implements Tool {
     GiraphJob job = new GiraphJob(getConf(), name);
     GiraphConfiguration configuration = job.getConfiguration();
     setVertexAndInputFormatClasses(cmd, configuration);
+
     configuration.setWorkerConfiguration(workers, workers, 100.0f);
     configuration.setInt(
         PageRankComputation.SUPERSTEP_COUNT,
@@ -188,7 +194,7 @@ public class PageRankBenchmark implements Tool {
           MultiGraphRepresentativeVertexPageRankBenchmark.class);
       configuration.useUnsafeSerialization(true);
     }
-    LOG.info("Using class " +
+    LOG.info("Using vertex class " +
         configuration.get(GiraphConstants.VERTEX_CLASS));
     if (!cmd.hasOption('t') ||
         (Integer.parseInt(cmd.getOptionValue('t')) == 2)) {
@@ -217,6 +223,15 @@ public class PageRankBenchmark implements Tool {
           PseudoRandomEdgeInputFormat.EDGES_PER_VERTEX,
           Long.parseLong(cmd.getOptionValue('e')));
     }
+
+    int vertexOutputClassOption =
+        cmd.hasOption('o') ? Integer.parseInt(cmd.getOptionValue('o')) : -1;
+    if (vertexOutputClassOption == 0) {
+      LOG.info("Using vertex output format class " +
+          JsonBase64VertexOutputFormat.class.getName());
+      configuration.setVertexOutputFormatClass(
+          JsonBase64VertexOutputFormat.class);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
index 83fba57..2281903 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java
@@ -18,13 +18,11 @@
 
 package org.apache.giraph.bsp;
 
+import java.util.List;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.io.IOException;
-import java.util.List;
-
 /**
  * Basic service interface shared by both {@link CentralizedServiceMaster} and
  * {@link CentralizedServiceWorker}.
@@ -68,12 +66,4 @@ public interface CentralizedService<I extends WritableComparable,
    * @return List of workers
    */
   List<WorkerInfo> getWorkerInfoList();
-
-  /**
-   * Clean up the service (no calls may be issued after this)
-   *
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  void cleanup() throws IOException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index 399dc72..5f84ece 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -139,4 +139,14 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
    * @param e Exception job failed from. May be null.
    */
   void failureCleanup(Exception e);
+
+
+  /**
+   * Clean up the service (no calls may be issued after this)
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void cleanup()
+    throws IOException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index 294c2c7..30d4462 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -233,4 +233,14 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
    * TODO how to avoid this additional function
    */
   void prepareSuperstep();
+
+  /**
+   * Clean up the service (no calls may be issued after this)
+   *
+   * @param finishedSuperstepStats Finished superstep stats
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void cleanup(FinishedSuperstepStats finishedSuperstepStats)
+    throws IOException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
index d888d10..c351778 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/FinishedSuperstepStats.java
@@ -18,27 +18,55 @@
 package org.apache.giraph.graph;
 
 /**
- * Immutable results of finishSuperste()
+ * Immutable graph stats after the completion of a superstep
  */
 public class FinishedSuperstepStats extends VertexEdgeCount {
+  /** Number of local vertices */
+  private final long localVertexCount;
   /** Are all the graph vertices halted? */
   private final boolean allVerticesHalted;
+  /** Needs to load a checkpoint */
+  private final boolean mustLoadCheckpoint;
 
   /**
    * Constructor.
    *
+   * @param numLocalVertices Number of local vertices
    * @param allVerticesHalted Are all the vertices halted
    * @param numVertices Number of vertices
    * @param numEdges Number of edges
+   * @param mustLoadCheckpoint Has to load a checkpoint?
    */
-  public FinishedSuperstepStats(boolean allVerticesHalted,
+  public FinishedSuperstepStats(long numLocalVertices,
+                                boolean allVerticesHalted,
                                 long numVertices,
-                                long numEdges) {
+                                long numEdges,
+                                boolean mustLoadCheckpoint) {
     super(numVertices, numEdges);
+    this.localVertexCount = numLocalVertices;
     this.allVerticesHalted = allVerticesHalted;
+    this.mustLoadCheckpoint = mustLoadCheckpoint;
   }
 
-  public boolean getAllVerticesHalted() {
+  public long getLocalVertexCount() {
+    return localVertexCount;
+  }
+
+  /**
+   * Are all the vertices halted?
+   *
+   * @return True if all halted, false otherwise
+   */
+  public boolean allVerticesHalted() {
     return allVerticesHalted;
   }
+
+  /**
+   * Must load the checkpoint?
+   *
+   * @return True if the checkpoint must be loaded, false otherwise
+   */
+  public boolean mustLoadCheckpoint() {
+    return mustLoadCheckpoint;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 401e07b..4ede8bb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -124,10 +124,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends
Writable,
   private boolean done = false;
   /** What kind of functions is this mapper doing? */
   private GraphFunctions graphFunctions = GraphFunctions.UNKNOWN;
-  /** Total number of vertices in the graph (at this time) */
-  private long numVertices = -1;
-  /** Total number of edges in the graph (at this time) */
-  private long numEdges = -1;
+  /** Superstep stats */
+  private FinishedSuperstepStats finishedSuperstepStats =
+      new FinishedSuperstepStats(0, false, 0, 0, false);
 
   // Per-Job Metrics
   /** Timer for WorkerContext#preApplication() */
@@ -224,15 +223,14 @@ public class GraphTaskManager<I extends WritableComparable, V extends
Writable,
     if (checkTaskState()) {
       return;
     }
-    FinishedSuperstepStats inputSuperstepStats = serviceWorker.setup();
-    if (collectInputSuperstepStats(inputSuperstepStats)) {
+    finishedSuperstepStats = serviceWorker.setup();
+    if (collectInputSuperstepStats(finishedSuperstepStats)) {
       return;
     }
     WorkerAggregatorUsage aggregatorUsage =
       prepareAggregatorsAndGraphState();
     List<PartitionStats> partitionStatsList = new ArrayList<PartitionStats>();
     int numComputeThreads = conf.getNumComputeThreads();
-    FinishedSuperstepStats finishedSuperstepStats = null;
 
     // main superstep processing loop
     do {
@@ -240,7 +238,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends
Writable,
       GiraphTimerContext superstepTimerContext =
         getTimerForThisSuperstep(superstep);
       GraphState<I, V, E, M> graphState =
-        new GraphState<I, V, E, M>(superstep, numVertices, numEdges,
+        new GraphState<I, V, E, M>(superstep,
+            finishedSuperstepStats.getVertexCount(),
+            finishedSuperstepStats.getEdgeCount(),
           context, this, null, aggregatorUsage);
       Collection<? extends PartitionOwner> masterAssignedPartitionOwners =
         serviceWorker.startSuperstep(graphState);
@@ -273,7 +273,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends
Writable,
       finishedSuperstepStats = completeSuperstepAndCollectStats(
         partitionStatsList, superstepTimerContext, graphState);
       // END of superstep compute loop
-    } while (!finishedSuperstepStats.getAllVerticesHalted());
+    } while (!finishedSuperstepStats.allVerticesHalted());
 
     if (LOG.isInfoEnabled()) {
       LOG.info("execute: BSP application done (global vertices marked done)");
@@ -335,7 +335,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends
Writable,
     WorkerAggregatorUsage aggregatorUsage) {
     serviceWorker.getWorkerContext().setGraphState(
       new GraphState<I, V, E, M>(serviceWorker.getSuperstep(),
-        numVertices, numEdges, context, this, null, aggregatorUsage));
+        finishedSuperstepStats.getVertexCount(),
+          finishedSuperstepStats.getEdgeCount(), context, this, null,
+          aggregatorUsage));
   }
 
   /**
@@ -350,11 +352,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends
Writable,
     List<PartitionStats> partitionStatsList,
     GiraphTimerContext superstepTimerContext,
     GraphState<I, V, E, M> graphState) {
-    FinishedSuperstepStats finishedSuperstepStats;
     finishedSuperstepStats =
       serviceWorker.finishSuperstep(graphState, partitionStatsList);
-    numVertices = finishedSuperstepStats.getVertexCount();
-    numEdges = finishedSuperstepStats.getEdgeCount();
     superstepTimerContext.stop();
     if (conf.metricsEnabled()) {
       GiraphMetrics.get().perSuperstep().printSummary();
@@ -747,10 +746,13 @@ public class GraphTaskManager<I extends WritableComparable, V extends
Writable,
       }
       VertexEdgeCount vertexEdgeCount = serviceWorker.loadCheckpoint(
         serviceWorker.getRestartedSuperstep());
-      numVertices = vertexEdgeCount.getVertexCount();
-      numEdges = vertexEdgeCount.getEdgeCount();
-      graphState = new GraphState<I, V, E, M>(superstep, numVertices,
-        numEdges, context, this, null, aggregatorUsage);
+      finishedSuperstepStats = new FinishedSuperstepStats(0, false,
+          vertexEdgeCount.getVertexCount(), vertexEdgeCount.getEdgeCount(),
+          false);
+      graphState = new GraphState<I, V, E, M>(superstep,
+          finishedSuperstepStats.getVertexCount(),
+          finishedSuperstepStats.getEdgeCount(),
+          context, this, null, aggregatorUsage);
     } else if (serviceWorker.checkpointFrequencyMet(superstep)) {
       serviceWorker.storeCheckpoint();
     }
@@ -767,9 +769,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends
Writable,
    */
   private boolean collectInputSuperstepStats(
     FinishedSuperstepStats inputSuperstepStats) {
-    numVertices = inputSuperstepStats.getVertexCount();
-    numEdges = inputSuperstepStats.getEdgeCount();
-    if (inputSuperstepStats.getVertexCount() == 0) {
+    if (inputSuperstepStats.getVertexCount() == 0 &&
+        !inputSuperstepStats.mustLoadCheckpoint()) {
       LOG.warn("map: No vertices in the graph, exiting.");
       return true;
     }
@@ -833,7 +834,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends
Writable,
     }
 
     if (serviceWorker != null) {
-      serviceWorker.cleanup();
+      serviceWorker.cleanup(finishedSuperstepStats);
     }
     try {
       if (masterThread != null) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 7ad2902..677ab82 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -461,7 +461,7 @@ public class BspServiceMaster<I extends WritableComparable,
         LOG.info("checkWorkers: Only found " + totalResponses +
             " responses of " + maxWorkers +
             " needed to start superstep " +
-            getSuperstep() + ".  Reporting every" +
+            getSuperstep() + ".  Reporting every " +
             eventWaitMsecs + " msecs, " +
             (failWorkerCheckMsecs - SystemTime.get().getMilliseconds()) +
             " more msecs left before giving up.");

http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java
index 81dfd1d..72b6c23 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java
@@ -32,6 +32,25 @@ public class LoggerUtils {
   private LoggerUtils() { }
 
   /**
+   * Helper method to set the status and log message together if condition
+   * has been met.
+   *
+   * @param condition Must be true to write status and log
+   * @param context Context to set the status with
+   * @param logger Logger to write to
+   * @param level Level of logging
+   * @param message Message to set status with
+   */
+  public static void conditionalSetStatusAndLog(
+      boolean condition,
+      TaskAttemptContext context, Logger logger, Level level,
+      String message) {
+    if (condition) {
+      setStatusAndLog(context, logger, level, message);
+    }
+  }
+
+  /**
    * Helper method to set the status and log message together.
    *
    * @param context Context to set the status with

http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index d5ad62b..f542344 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -432,7 +432,7 @@ public class BspServiceWorker<I extends WritableComparable,
     // 6. Wait for superstep INPUT_SUPERSTEP to complete.
     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
       setCachedSuperstep(getRestartedSuperstep());
-      return new FinishedSuperstepStats(false, -1, -1);
+      return new FinishedSuperstepStats(0, false, 0, 0, true);
     }
 
     JSONObject jobState = getJobState();
@@ -449,7 +449,7 @@ public class BspServiceWorker<I extends WritableComparable,
                 getApplicationAttempt());
           }
           setRestartedSuperstep(getSuperstep());
-          return new FinishedSuperstepStats(false, -1, -1);
+          return new FinishedSuperstepStats(0, false, 0, 0, true);
         }
       } catch (JSONException e) {
         throw new RuntimeException(
@@ -722,8 +722,10 @@ else[HADOOP_NON_SECURE]*/
     graphState.getGraphTaskManager().notifyFinishedCommunication();
 
     long workerSentMessages = 0;
+    long localVertices = 0;
     for (PartitionStats partitionStats : partitionStatsList) {
       workerSentMessages += partitionStats.getMessagesSentCount();
+      localVertices += partitionStats.getVertexCount();
     }
 
     if (getSuperstep() != INPUT_SUPERSTEP) {
@@ -770,9 +772,11 @@ else[HADOOP_NON_SECURE]*/
         ", Superstep=" + getSuperstep());
 
     return new FinishedSuperstepStats(
+        localVertices,
         globalStats.getHaltComputation(),
         globalStats.getVertexCount(),
-        globalStats.getEdgeCount());
+        globalStats.getEdgeCount(),
+        false);
   }
 
   /**
@@ -865,9 +869,12 @@ else[HADOOP_NON_SECURE]*/
   /**
    * Save the vertices using the user-defined VertexOutputFormat from our
    * vertexArray based on the split.
+   *
+   * @param numLocalVertices Number of local vertices
    * @throws InterruptedException
    */
-  private void saveVertices() throws IOException, InterruptedException {
+  private void saveVertices(long numLocalVertices) throws IOException,
+      InterruptedException {
     if (getConfiguration().getVertexOutputFormatClass() == null) {
       LOG.warn("saveVertices: " +
           GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
@@ -876,19 +883,39 @@ else[HADOOP_NON_SECURE]*/
     }
 
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
-        "saveVertices: Starting to save vertices");
+        "saveVertices: Starting to save " + numLocalVertices + " vertices");
     VertexOutputFormat<I, V, E> vertexOutputFormat =
         getConfiguration().createVertexOutputFormat();
     VertexWriter<I, V, E> vertexWriter =
         vertexOutputFormat.createVertexWriter(getContext());
     vertexWriter.initialize(getContext());
+    long verticesWritten = 0;
+    long nextPrintVertices = 0;
+    long nextPrintMsecs = System.currentTimeMillis() + 15000;
+    int partitionIndex = 0;
+    int numPartitions = getPartitionStore().getNumPartitions();
     for (Partition<I, V, E, M> partition :
         getPartitionStore().getPartitions()) {
       for (Vertex<I, V, E, M> vertex : partition) {
         getContext().progress();
         vertexWriter.writeVertex(vertex);
+        ++verticesWritten;
+
+        // Update status at most every 250k vertices or 15 seconds
+        if (verticesWritten > nextPrintVertices &&
+            System.currentTimeMillis() > nextPrintMsecs) {
+          LoggerUtils.setStatusAndLog(
+              getContext(), LOG, Level.INFO,
+              "saveVertices: Saved " +
+                  verticesWritten + " out of " + numLocalVertices +
+                  " vertices, on partition " + partitionIndex + " out of " +
+                  numPartitions);
+          nextPrintMsecs = System.currentTimeMillis() + 15000;
+          nextPrintVertices = verticesWritten + 250000;
+        }
       }
       getContext().progress();
+      ++partitionIndex;
     }
     vertexWriter.close(getContext());
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
@@ -896,10 +923,11 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
-  public void cleanup() throws IOException, InterruptedException {
+  public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
+    throws IOException, InterruptedException {
     workerClient.closeConnections();
     setCachedSuperstep(getSuperstep() - 1);
-    saveVertices();
+    saveVertices(finishedSuperstepStats.getLocalVertexCount());
     // All worker processes should denote they are done by adding special
     // znode.  Once the number of znodes equals the number of partitions
     // for workers and masters, the master will clean up the ZooKeeper
@@ -1037,7 +1065,7 @@ else[HADOOP_NON_SECURE]*/
           CreateMode.PERSISTENT,
           true);
     } catch (KeeperException.NodeExistsException e) {
-      LOG.warn("finishSuperstep: wrote checkpoint worker path " +
+      LOG.warn("storeCheckpoint: wrote checkpoint worker path " +
           workerWroteCheckpoint + " already exists!");
     } catch (KeeperException e) {
       throw new IllegalStateException("Creating " + workerWroteCheckpoint +

http://git-wip-us.apache.org/repos/asf/giraph/blob/6fd9f12d/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
index 7522027..a4f98e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java
@@ -18,17 +18,20 @@
 
 package org.apache.giraph.worker;
 
+import com.yammer.metrics.core.Counter;
+import java.io.IOException;
+import java.util.List;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.GraphState;
-import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.graph.VertexEdgeCount;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
-import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.metrics.GiraphMetricsRegistry;
+import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.utils.LoggerUtils;
 import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.vertex.Vertex;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -37,11 +40,6 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-import com.yammer.metrics.core.Counter;
-
-import java.io.IOException;
-import java.util.List;
-
 /**
  * Load as many vertex input splits as possible.
  * Every thread will has its own instance of WorkerClientRequestProcessor
@@ -131,6 +129,8 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
     vertexReader.initialize(inputSplit, context);
     long inputSplitVerticesLoaded = 0;
     long inputSplitEdgesLoaded = 0;
+    long nextPrintVertices = 0;
+    long nextPrintMsecs = System.currentTimeMillis() + 15000;
     while (vertexReader.nextVertex()) {
       Vertex<I, V, E, M> readerVertex =
           vertexReader.getCurrentVertex();
@@ -153,14 +153,20 @@ public class VertexInputSplitsCallable<I extends WritableComparable,
       ++inputSplitVerticesLoaded;
       inputSplitEdgesLoaded += readerVertex.getNumEdges();
 
-      // Update status every 250k vertices
-      if (((inputSplitVerticesLoaded + totalVerticesLoaded) % 250000) == 0) {
-        LoggerUtils.setStatusAndLog(context, LOG, Level.INFO,
+      // Update status at most every 250k vertices or 15 seconds
+      if ((inputSplitVerticesLoaded + totalVerticesLoaded) >
+          nextPrintVertices &&
+          System.currentTimeMillis() > nextPrintMsecs) {
+        LoggerUtils.setStatusAndLog(
+            context, LOG, Level.INFO,
             "readInputSplit: Loaded " +
                 (inputSplitVerticesLoaded + totalVerticesLoaded) +
                 " vertices " +
                 (inputSplitEdgesLoaded + totalEdgesLoaded) + " edges " +
                 MemoryUtils.getRuntimeMemoryStats());
+        nextPrintMsecs = System.currentTimeMillis() + 15000;
+        nextPrintVertices = inputSplitVerticesLoaded + totalVerticesLoaded +
+            250000;
       }
 
       // For sampling, or to limit outlier input splits, the number of


Mime
View raw message