Repository: giraph
Updated Branches:
refs/heads/trunk 5a04dc554 -> e4aa99d3f
Increase info-logging while waiting for straggler workers
Summary:
Keep logging info messages periodically (every 30 seconds) while waiting for straggler workers, instead of only waking up once every half task timeout.
Test Plan:
All unit tests are passing.
Ran manual tests to confirm the desired functionality is observed.
Reviewers: maja.kabiljo
Subscribers: dionysis.logothetis
Differential Revision: https://reviews.facebook.net/D55467
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e4aa99d3
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e4aa99d3
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e4aa99d3
Branch: refs/heads/trunk
Commit: e4aa99d3f603e70c7db3a55ae5d59470c1a37f58
Parents: 5a04dc5
Author: Tyler Serdar Bulut <sbulut@fb.com>
Authored: Tue Mar 15 12:10:36 2016 -0700
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Thu Mar 17 09:56:00 2016 -0700
----------------------------------------------------------------------
.../apache/giraph/master/BspServiceMaster.java | 106 ++++++++++++-------
1 file changed, 65 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/e4aa99d3/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index cc70b17..e9ece66 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -1275,41 +1275,47 @@ public class BspServiceMaster<I extends WritableComparable,
}
String workerInfoHealthyPath =
getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep());
- List<String> finishedHostnameIdList;
+ List<String> finishedHostnameIdList = new ArrayList<>();
long nextInfoMillis = System.currentTimeMillis();
final int defaultTaskTimeoutMsec = 10 * 60 * 1000; // from TaskTracker
+ final int waitBetweenLogInfoMsec = 30 * 1000;
final int taskTimeoutMsec = getContext().getConfiguration().getInt(
- "mapred.task.timeout", defaultTaskTimeoutMsec);
+ "mapred.task.timeout", defaultTaskTimeoutMsec) / 2;
+ long lastRegularRunTimeMsec = 0;
+ int eventLoopTimeout = Math.min(taskTimeoutMsec, waitBetweenLogInfoMsec);
+ boolean logInfoOnlyRun = false;
List<WorkerInfo> deadWorkers = new ArrayList<>();
while (true) {
- try {
- finishedHostnameIdList =
- getZkExt().getChildrenExt(finishedWorkerPath,
- true,
- false,
- false);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "barrierOnWorkerList: KeeperException - Couldn't get " +
- "children of " + finishedWorkerPath, e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "barrierOnWorkerList: IllegalException - Couldn't get " +
- "children of " + finishedWorkerPath, e);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("barrierOnWorkerList: Got finished worker list = " +
- finishedHostnameIdList + ", size = " +
- finishedHostnameIdList.size() +
- ", worker list = " +
- workerInfoList + ", size = " +
- workerInfoList.size() +
- " from " + finishedWorkerPath);
+ if (! logInfoOnlyRun) {
+ try {
+ finishedHostnameIdList =
+ getZkExt().getChildrenExt(finishedWorkerPath,
+ true,
+ false,
+ false);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "barrierOnWorkerList: KeeperException - Couldn't get " +
+ "children of " + finishedWorkerPath, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "barrierOnWorkerList: IllegalException - Couldn't get " +
+ "children of " + finishedWorkerPath, e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("barrierOnWorkerList: Got finished worker list = " +
+ finishedHostnameIdList + ", size = " +
+ finishedHostnameIdList.size() +
+ ", worker list = " +
+ workerInfoList + ", size = " +
+ workerInfoList.size() +
+ " from " + finishedWorkerPath);
+ }
}
if (LOG.isInfoEnabled() &&
(System.currentTimeMillis() > nextInfoMillis)) {
- nextInfoMillis = System.currentTimeMillis() + 30000;
+ nextInfoMillis = System.currentTimeMillis() + waitBetweenLogInfoMsec;
LOG.info("barrierOnWorkerList: " +
finishedHostnameIdList.size() +
" out of " + workerInfoList.size() +
@@ -1322,29 +1328,47 @@ public class BspServiceMaster<I extends WritableComparable,
LOG.info("barrierOnWorkerList: Waiting on " + remainingWorkers);
}
}
- getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " - " +
- finishedHostnameIdList.size() +
- " finished out of " +
- workerInfoList.size() +
- " on superstep " + getSuperstep());
- if (finishedHostnameIdList.containsAll(hostnameIdList)) {
- break;
- }
- for (WorkerInfo deadWorker : deadWorkers) {
- if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) {
- LOG.error("barrierOnWorkerList: no results arived from " +
- "worker that was pronounced dead: " + deadWorker +
- " on superstep " + getSuperstep());
- return false;
+ if (! logInfoOnlyRun) {
+ getContext().setStatus(getGraphTaskManager().getGraphFunctions() +
+ " - " +
+ finishedHostnameIdList.size() +
+ " finished out of " +
+ workerInfoList.size() +
+ " on superstep " + getSuperstep());
+ if (finishedHostnameIdList.containsAll(hostnameIdList)) {
+ break;
+ }
+
+ for (WorkerInfo deadWorker : deadWorkers) {
+ if (!finishedHostnameIdList.contains(deadWorker.getHostnameId())) {
+ LOG.error("barrierOnWorkerList: no results arived from " +
+ "worker that was pronounced dead: " + deadWorker +
+ " on superstep " + getSuperstep());
+ return false;
+ }
}
+
+ // wall-clock time skew is ignored
+ lastRegularRunTimeMsec = System.currentTimeMillis();
}
// Wait for a signal or timeout
- event.waitMsecs(taskTimeoutMsec / 2);
+ boolean eventTriggered = event.waitMsecs(eventLoopTimeout);
+ long elapsedTimeSinceRegularRunMsec = System.currentTimeMillis() -
+ lastRegularRunTimeMsec;
event.reset();
getContext().progress();
+ if (eventTriggered ||
+ taskTimeoutMsec == eventLoopTimeout ||
+ elapsedTimeSinceRegularRunMsec >= taskTimeoutMsec) {
+ logInfoOnlyRun = false;
+ } else {
+ logInfoOnlyRun = true;
+ continue;
+ }
+
// Did a worker die?
try {
deadWorkers.addAll(superstepChosenWorkerAlive(
|