giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apre...@apache.org
Subject svn commit: r1373331 - in /giraph/trunk: CHANGELOG src/main/java/org/apache/giraph/graph/BspServiceMaster.java src/main/java/org/apache/giraph/graph/BspServiceWorker.java
Date Wed, 15 Aug 2012 11:03:04 GMT
Author: apresta
Date: Wed Aug 15 11:03:03 2012
New Revision: 1373331

URL: http://svn.apache.org/viewvc?rev=1373331&view=rev
Log:
GIRAPH-296: TotalNumVertices and TotalNumEdges are not saved in checkpoint. (majakabiljo via
apresta)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1373331&r1=1373330&r2=1373331&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Aug 15 11:03:03 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-296: TotalNumVertices and TotalNumEdges are not saved in checkpoint.
+  (majakabiljo via apresta)
+
   GIRAPH-297: Checkpointing on master is done one superstep later
   (majakabiljo via aching).
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1373331&r1=1373330&r2=1373331&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Wed Aug 15 11:03:03
2012
@@ -621,6 +621,9 @@ public class BspServiceMaster<I extends 
         getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
     DataInputStream finalizedStream =
         fs.open(new Path(finalizedCheckpointPath));
+    GlobalStats globalStats = new GlobalStats();
+    globalStats.readFields(finalizedStream);
+    updateCounters(globalStats);
     int prefixFileCount = finalizedStream.readInt();
     for (int i = 0; i < prefixFileCount; ++i) {
       String metadataFilePath =
@@ -1055,11 +1058,19 @@ public class BspServiceMaster<I extends 
     }
 
     // Format:
+    // <global statistics>
     // <number of files>
     // <used file prefix 0><used file prefix 1>...
     // <aggregator data length><aggregators as a serialized JSON byte array>
+    // <masterCompute data>
     FSDataOutputStream finalizedOutputStream =
         getFs().create(finalizedCheckpointPath);
+
+    String superstepFinishedNode =
+        getSuperstepFinishedPath(getApplicationAttempt(), superstep - 1);
+    finalizedOutputStream.write(
+        getZkExt().getData(superstepFinishedNode, false, null));
+
     finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
       String chosenWorkerInfoPrefix =
@@ -1507,18 +1518,7 @@ public class BspServiceMaster<I extends 
         getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
     WritableUtils.writeToZnode(
         getZkExt(), superstepFinishedNode, -1, globalStats);
-    vertexCounter.increment(
-        globalStats.getVertexCount() -
-        vertexCounter.getValue());
-    finishedVertexCounter.increment(
-        globalStats.getFinishedVertexCount() -
-        finishedVertexCounter.getValue());
-    edgeCounter.increment(
-        globalStats.getEdgeCount() -
-        edgeCounter.getValue());
-    sentMessagesCounter.increment(
-        globalStats.getMessageCount() -
-        sentMessagesCounter.getValue());
+    updateCounters(globalStats);
 
     incrCachedSuperstep();
     // Counter starts at zero, so no need to increment
@@ -1839,4 +1839,24 @@ public class BspServiceMaster<I extends 
     }
     ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
   }
+
+  /**
+   * Set values of counters to match the ones from {@link GlobalStats}
+   *
+   * @param globalStats Global statistics which holds new counter values
+   */
+  private void updateCounters(GlobalStats globalStats) {
+    vertexCounter.increment(
+        globalStats.getVertexCount() -
+            vertexCounter.getValue());
+    finishedVertexCounter.increment(
+        globalStats.getFinishedVertexCount() -
+            finishedVertexCounter.getValue());
+    edgeCounter.increment(
+        globalStats.getEdgeCount() -
+            edgeCounter.getValue());
+    sentMessagesCounter.increment(
+        globalStats.getMessageCount() -
+            sentMessagesCounter.getValue());
+  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1373331&r1=1373330&r2=1373331&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Aug 15 11:03:03
2012
@@ -1322,6 +1322,23 @@ public class BspServiceWorker<I extends 
           workerGraphPartitioner.getPartitionOwners().size() +
           " total.");
     }
+
+    // Load global statistics
+    String finalizedCheckpointPath =
+        getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+    try {
+      DataInputStream finalizedStream =
+          getFs().open(new Path(finalizedCheckpointPath));
+      GlobalStats globalStats = new GlobalStats();
+      globalStats.readFields(finalizedStream);
+      getGraphMapper().getGraphState().
+          setTotalNumEdges(globalStats.getEdgeCount()).
+          setTotalNumVertices(globalStats.getVertexCount());
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "loadCheckpoint: Failed to load global statistics", e);
+    }
+
     // Communication service needs to setup the connections prior to
     // processing vertices
     commService.setup();



Mime
View raw message