Updated Branches:
refs/heads/trunk 419445017 -> 763621a45
GIRAPH-835: org.apache.giraph.hive.input.CheckInputTest Fails because JobProgressTracker doesn't
check null (akila via majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/763621a4
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/763621a4
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/763621a4
Branch: refs/heads/trunk
Commit: 763621a45cd7812ee968f78961775f85f5e95cb2
Parents: 4194450
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Mon Feb 3 09:18:28 2014 -0800
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Mon Feb 3 09:20:08 2014 -0800
----------------------------------------------------------------------
CHANGELOG | 3 +
.../apache/giraph/job/JobProgressTracker.java | 91 ++++++++++----------
2 files changed, 50 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/763621a4/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d2f5980..e21a291 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-835: org.apache.giraph.hive.input.CheckInputTest Fails because JobProgressTracker
+ doesn't check null (akila via majakabiljo)
+
GIRAPH-834: Metrcis missing superstep time (armax00 via claudio)
GIRAPH-819: Number of Containers Required for a Job (Rafal Wojdyla via ereisman)
http://git-wip-us.apache.org/repos/asf/giraph/blob/763621a4/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
index f685344..a364dc4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
@@ -66,7 +66,8 @@ public class JobProgressTracker implements Watcher {
final String basePath = CounterUtils.waitAndGetCounterNameFromGroup(
submittedJob, GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP);
// Connect to ZooKeeper
- zk = new ZooKeeperExt(
+ if (zkServer != null && basePath != null) {
+ zk = new ZooKeeperExt(
zkServer,
conf.getZooKeeperSessionTimeout(),
conf.getZookeeperOpsMaxAttempts(),
@@ -77,62 +78,64 @@ public class JobProgressTracker implements Watcher {
public void progress() {
}
});
- writerThread = new Thread(new Runnable() {
- @Override
- public void run() {
- String workerProgressBasePath = basePath + BspService.WORKER_PROGRESSES;
- try {
- while (!finished) {
- if (zk.exists(workerProgressBasePath, false) != null) {
- // Get locations of all worker progresses
- List<String> workerProgressPaths = zk.getChildrenExt(
+ writerThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ String workerProgressBasePath = basePath +
+ BspService.WORKER_PROGRESSES;
+ try {
+ while (!finished) {
+ if (zk.exists(workerProgressBasePath, false) != null) {
+ // Get locations of all worker progresses
+ List<String> workerProgressPaths = zk.getChildrenExt(
workerProgressBasePath, false, false, true);
- List<WorkerProgress> workerProgresses =
+ List<WorkerProgress> workerProgresses =
new ArrayList<WorkerProgress>(workerProgressPaths.size());
- // Read all worker progresses
- for (String workerProgressPath : workerProgressPaths) {
- WorkerProgress workerProgress = new WorkerProgress();
- byte[] zkData = zk.getData(workerProgressPath, false, null);
- WritableUtils.readFieldsFromByteArray(zkData, workerProgress);
- workerProgresses.add(workerProgress);
- }
- // Combine and log
- CombinedWorkerProgress combinedWorkerProgress =
+ // Read all worker progresses
+ for (String workerProgressPath : workerProgressPaths) {
+ WorkerProgress workerProgress = new WorkerProgress();
+ byte[] zkData = zk.getData(workerProgressPath, false, null);
+ WritableUtils.readFieldsFromByteArray(zkData, workerProgress);
+ workerProgresses.add(workerProgress);
+ }
+ // Combine and log
+ CombinedWorkerProgress combinedWorkerProgress =
new CombinedWorkerProgress(workerProgresses);
- if (LOG.isInfoEnabled()) {
- LOG.info(combinedWorkerProgress.toString());
- }
- // Check if application is done
- if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
- break;
+ if (LOG.isInfoEnabled()) {
+ LOG.info(combinedWorkerProgress.toString());
+ }
+ // Check if application is done
+ if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
+ break;
+ }
}
+ Thread.sleep(UPDATE_MILLISECONDS);
}
- Thread.sleep(UPDATE_MILLISECONDS);
- }
- } catch (InterruptedException | KeeperException e) {
- if (LOG.isInfoEnabled()) {
- LOG.info("run: Exception occurred", e);
- }
- } finally {
- try {
- // Create a node so master knows we stopped communicating with
- // ZooKeeper and it's safe to cleanup
- zk.createExt(
+ } catch (InterruptedException | KeeperException e) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("run: Exception occurred", e);
+ }
+ } finally {
+ try {
+ // Create a node so master knows we stopped communicating with
+ // ZooKeeper and it's safe to cleanup
+ zk.createExt(
basePath + BspService.CLEANED_UP_DIR + "/client",
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
true);
- zk.close();
- } catch (InterruptedException | KeeperException e) {
- if (LOG.isInfoEnabled()) {
- LOG.info("run: Exception occurred", e);
+ zk.close();
+ } catch (InterruptedException | KeeperException e) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("run: Exception occurred", e);
+ }
}
}
}
- }
- });
- writerThread.start();
+ });
+ writerThread.start();
+ }
}
/**
|