Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 284D7100FD for ; Mon, 3 Feb 2014 17:20:48 +0000 (UTC) Received: (qmail 86537 invoked by uid 500); 3 Feb 2014 17:20:45 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 85799 invoked by uid 500); 3 Feb 2014 17:20:43 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 85792 invoked by uid 99); 3 Feb 2014 17:20:43 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Feb 2014 17:20:43 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id BE2CC91809D; Mon, 3 Feb 2014 17:20:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: maja@apache.org To: commits@giraph.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: updated refs/heads/trunk to 763621a Date: Mon, 3 Feb 2014 17:20:42 +0000 (UTC) Updated Branches: refs/heads/trunk 419445017 -> 763621a45 GIRAPH-835: org.apache.giraph.hive.input.CheckInputTest Fails because JobProgressTracker doesn't check null (akila via majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/763621a4 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/763621a4 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/763621a4 Branch: refs/heads/trunk Commit: 763621a45cd7812ee968f78961775f85f5e95cb2 Parents: 4194450 Author: Maja Kabiljo Authored: Mon Feb 3 09:18:28 2014 -0800 Committer: Maja Kabiljo Committed: Mon Feb 3 09:20:08 2014 -0800 ---------------------------------------------------------------------- CHANGELOG | 3 + .../apache/giraph/job/JobProgressTracker.java | 91 ++++++++++---------- 2 files changed, 50 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/763621a4/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index d2f5980..e21a291 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,9 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-835: org.apache.giraph.hive.input.CheckInputTest Fails because JobProgressTracker + doesn't check null (akila via majakabiljo) + GIRAPH-834: Metrcis missing superstep time (armax00 via claudio) GIRAPH-819: Number of Containers Required for a Job (Rafal Wojdyla via ereisman) http://git-wip-us.apache.org/repos/asf/giraph/blob/763621a4/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java index f685344..a364dc4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java @@ -66,7 +66,8 @@ public class JobProgressTracker implements Watcher { final String basePath = CounterUtils.waitAndGetCounterNameFromGroup( submittedJob, GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP); // Connect to ZooKeeper - zk = new ZooKeeperExt( + if (zkServer != null && basePath != null) { + zk = new ZooKeeperExt( zkServer, conf.getZooKeeperSessionTimeout(), conf.getZookeeperOpsMaxAttempts(), @@ -77,62 +78,64 @@ public class JobProgressTracker implements Watcher { public void progress() { } }); - writerThread = new Thread(new Runnable() { - @Override - public void run() { - String workerProgressBasePath = basePath + BspService.WORKER_PROGRESSES; - try { - while (!finished) { - if (zk.exists(workerProgressBasePath, false) != null) { - // Get locations of all worker progresses - List workerProgressPaths = zk.getChildrenExt( + writerThread = new Thread(new Runnable() { + @Override + public void run() { + String workerProgressBasePath = basePath + + BspService.WORKER_PROGRESSES; + try { + while (!finished) { + if (zk.exists(workerProgressBasePath, false) != null) { + // Get locations of all worker progresses + List workerProgressPaths = zk.getChildrenExt( workerProgressBasePath, false, false, true); - List workerProgresses = + List workerProgresses = new ArrayList(workerProgressPaths.size()); - // Read all worker progresses - for (String workerProgressPath : workerProgressPaths) { - WorkerProgress workerProgress = new WorkerProgress(); - byte[] zkData = zk.getData(workerProgressPath, false, null); - WritableUtils.readFieldsFromByteArray(zkData, workerProgress); - workerProgresses.add(workerProgress); - } - // Combine and log - CombinedWorkerProgress combinedWorkerProgress = + // Read all worker progresses + for (String workerProgressPath : workerProgressPaths) { + WorkerProgress workerProgress = new WorkerProgress(); + byte[] zkData = zk.getData(workerProgressPath, false, null); + WritableUtils.readFieldsFromByteArray(zkData, workerProgress); + workerProgresses.add(workerProgress); + } + // Combine and log + CombinedWorkerProgress combinedWorkerProgress = new CombinedWorkerProgress(workerProgresses); - if (LOG.isInfoEnabled()) { - LOG.info(combinedWorkerProgress.toString()); - } - // Check if application is done - if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) { - break; + if (LOG.isInfoEnabled()) { + LOG.info(combinedWorkerProgress.toString()); + } + // Check if application is done + if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) { + break; + } } + Thread.sleep(UPDATE_MILLISECONDS); } - Thread.sleep(UPDATE_MILLISECONDS); - } - } catch (InterruptedException | KeeperException e) { - if (LOG.isInfoEnabled()) { - LOG.info("run: Exception occurred", e); - } - } finally { - try { - // Create a node so master knows we stopped communicating with - // ZooKeeper and it's safe to cleanup - zk.createExt( + } catch (InterruptedException | KeeperException e) { + if (LOG.isInfoEnabled()) { + LOG.info("run: Exception occurred", e); + } + } finally { + try { + // Create a node so master knows we stopped communicating with + // ZooKeeper and it's safe to cleanup + zk.createExt( basePath + BspService.CLEANED_UP_DIR + "/client", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true); - zk.close(); - } catch (InterruptedException | KeeperException e) { - if (LOG.isInfoEnabled()) { - LOG.info("run: Exception occurred", e); + zk.close(); + } catch (InterruptedException | KeeperException e) { + if (LOG.isInfoEnabled()) { + LOG.info("run: Exception occurred", e); + } } } } - } - }); - writerThread.start(); + }); + writerThread.start(); + } } /**