giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to 763621a
Date Mon, 03 Feb 2014 17:20:42 GMT
Updated Branches:
  refs/heads/trunk 419445017 -> 763621a45


GIRAPH-835: org.apache.giraph.hive.input.CheckInputTest Fails because JobProgressTracker doesn't
check null (akila via majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/763621a4
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/763621a4
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/763621a4

Branch: refs/heads/trunk
Commit: 763621a45cd7812ee968f78961775f85f5e95cb2
Parents: 4194450
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Mon Feb 3 09:18:28 2014 -0800
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Mon Feb 3 09:20:08 2014 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |  3 +
 .../apache/giraph/job/JobProgressTracker.java   | 91 ++++++++++----------
 2 files changed, 50 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/763621a4/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d2f5980..e21a291 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-835: org.apache.giraph.hive.input.CheckInputTest Fails because JobProgressTracker
+  doesn't check null (akila via majakabiljo)
+
   GIRAPH-834: Metrcis missing superstep time (armax00 via claudio)
 
   GIRAPH-819: Number of Containers Required for a Job (Rafal Wojdyla via ereisman)

http://git-wip-us.apache.org/repos/asf/giraph/blob/763621a4/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
index f685344..a364dc4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
@@ -66,7 +66,8 @@ public class JobProgressTracker implements Watcher {
     final String basePath = CounterUtils.waitAndGetCounterNameFromGroup(
         submittedJob, GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP);
     // Connect to ZooKeeper
-    zk = new ZooKeeperExt(
+    if (zkServer != null && basePath != null) {
+      zk = new ZooKeeperExt(
         zkServer,
         conf.getZooKeeperSessionTimeout(),
         conf.getZookeeperOpsMaxAttempts(),
@@ -77,62 +78,64 @@ public class JobProgressTracker implements Watcher {
           public void progress() {
           }
         });
-    writerThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        String workerProgressBasePath = basePath + BspService.WORKER_PROGRESSES;
-        try {
-          while (!finished) {
-            if (zk.exists(workerProgressBasePath, false) != null) {
-              // Get locations of all worker progresses
-              List<String> workerProgressPaths = zk.getChildrenExt(
+      writerThread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          String workerProgressBasePath = basePath +
+            BspService.WORKER_PROGRESSES;
+          try {
+            while (!finished) {
+              if (zk.exists(workerProgressBasePath, false) != null) {
+                // Get locations of all worker progresses
+                List<String> workerProgressPaths = zk.getChildrenExt(
                   workerProgressBasePath, false, false, true);
-              List<WorkerProgress> workerProgresses =
+                List<WorkerProgress> workerProgresses =
                   new ArrayList<WorkerProgress>(workerProgressPaths.size());
-              // Read all worker progresses
-              for (String workerProgressPath : workerProgressPaths) {
-                WorkerProgress workerProgress = new WorkerProgress();
-                byte[] zkData = zk.getData(workerProgressPath, false, null);
-                WritableUtils.readFieldsFromByteArray(zkData, workerProgress);
-                workerProgresses.add(workerProgress);
-              }
-              // Combine and log
-              CombinedWorkerProgress combinedWorkerProgress =
+                // Read all worker progresses
+                for (String workerProgressPath : workerProgressPaths) {
+                  WorkerProgress workerProgress = new WorkerProgress();
+                  byte[] zkData = zk.getData(workerProgressPath, false, null);
+                  WritableUtils.readFieldsFromByteArray(zkData, workerProgress);
+                  workerProgresses.add(workerProgress);
+                }
+                // Combine and log
+                CombinedWorkerProgress combinedWorkerProgress =
                   new CombinedWorkerProgress(workerProgresses);
-              if (LOG.isInfoEnabled()) {
-                LOG.info(combinedWorkerProgress.toString());
-              }
-              // Check if application is done
-              if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
-                break;
+                if (LOG.isInfoEnabled()) {
+                  LOG.info(combinedWorkerProgress.toString());
+                }
+                // Check if application is done
+                if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
+                  break;
+                }
               }
+              Thread.sleep(UPDATE_MILLISECONDS);
             }
-            Thread.sleep(UPDATE_MILLISECONDS);
-          }
-        } catch (InterruptedException | KeeperException e) {
-          if (LOG.isInfoEnabled()) {
-            LOG.info("run: Exception occurred", e);
-          }
-        } finally {
-          try {
-            // Create a node so master knows we stopped communicating with
-            // ZooKeeper and it's safe to cleanup
-            zk.createExt(
+          } catch (InterruptedException | KeeperException e) {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("run: Exception occurred", e);
+            }
+          } finally {
+            try {
+              // Create a node so master knows we stopped communicating with
+              // ZooKeeper and it's safe to cleanup
+              zk.createExt(
                 basePath + BspService.CLEANED_UP_DIR + "/client",
                 null,
                 ZooDefs.Ids.OPEN_ACL_UNSAFE,
                 CreateMode.PERSISTENT,
                 true);
-            zk.close();
-          } catch (InterruptedException | KeeperException e) {
-            if (LOG.isInfoEnabled()) {
-              LOG.info("run: Exception occurred", e);
+              zk.close();
+            } catch (InterruptedException | KeeperException e) {
+              if (LOG.isInfoEnabled()) {
+                LOG.info("run: Exception occurred", e);
+              }
             }
           }
         }
-      }
-    });
-    writerThread.start();
+      });
+      writerThread.start();
+    }
   }
 
   /**


Mime
View raw message