Author: yhemanth
Date: Fri Mar 6 06:47:43 2009
New Revision: 750785
URL: http://svn.apache.org/viewvc?rev=750785&view=rev
Log:
HADOOP-5338. Fix jobtracker restart to clear task completion events cached by tasktrackers
to avoid missing events. Contributed by Amar Kamat.
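In short: after a restart the jobtracker now tells each tasktracker which jobs were recovered, and the tasktracker throws away its cached map-completion events for those jobs and re-fetches from index 0. A minimal, self-contained Java sketch of that flow (stand-in types only; the real code uses JobID, RunningJob and FetchStatus as shown in the diffs below):

    import java.util.*;

    public class RestartFlowSketch {
      // Stand-in for FetchStatus: a per-job cache of task-completion events.
      static class EventCache {
        final List<String> events = new ArrayList<String>();
        int fromEventId = 0;                  // next event index to ask the JT for

        void add(String event) { events.add(event); fromEventId = events.size(); }

        // Mirrors the new FetchStatus.reset(): forget everything and
        // re-fetch from event 0 so no completion event can be missed.
        void reset() { events.clear(); fromEventId = 0; }
      }

      public static void main(String[] args) {
        Map<String, EventCache> runningJobs = new HashMap<String, EventCache>();
        EventCache cache = new EventCache();
        cache.add("map_0 succeeded");
        runningJobs.put("job_200903060647_0001", cache);

        // After a restart, the heartbeat response carries the recovered job ids.
        Set<String> recoveredJobs = Collections.singleton("job_200903060647_0001");
        for (String job : recoveredJobs) {
          EventCache c = runningJobs.get(job);
          if (c != null) {
            c.reset();                        // forces a full re-fetch
          }
        }
        System.out.println(cache.fromEventId); // prints 0
      }
    }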
Modified:
hadoop/core/branches/branch-0.20/CHANGES.txt
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=750785&r1=750784&r2=750785&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Fri Mar 6 06:47:43 2009
@@ -681,6 +681,10 @@
     HADOOP-5145. Balancer sometimes runs out of memory after running days or weeks.
     (hairong)
 
+    HADOOP-5338. Fix jobtracker restart to clear task completion events cached by
+    tasktrackers, forcing them to fetch all events afresh, thus avoiding missed
+    task completion events on the tasktrackers. (Amar Kamat via yhemanth)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java?rev=750785&r1=750784&r2=750785&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/HeartbeatResponse.java Fri Mar 6 06:47:43 2009
@@ -21,8 +21,10 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -39,7 +41,7 @@
   short responseId;
   int heartbeatInterval;
   TaskTrackerAction[] actions;
-  Map<JobID, Integer> lastKnownIndexMap = null;
+  Set<JobID> recoveredJobs = new HashSet<JobID>();
 
   HeartbeatResponse() {}
@@ -57,12 +59,12 @@
     return responseId;
   }
 
-  public void setLastKnownIndices(Map<JobID, Integer> lastKnownIndexMap) {
-    this.lastKnownIndexMap = lastKnownIndexMap;
+  public void setRecoveredJobs(Set<JobID> ids) {
+    recoveredJobs = ids;
   }
 
-  public Map<JobID, Integer> getLastKnownIndex() {
-    return lastKnownIndexMap;
+  public Set<JobID> getRecoveredJobs() {
+    return recoveredJobs;
   }
 
   public void setActions(TaskTrackerAction[] actions) {
@@ -101,17 +103,11 @@
         action.write(out);
       }
     }
-    // Write the last map event index for the jobs
-    if (lastKnownIndexMap != null) {
-      out.writeInt(lastKnownIndexMap.size());
-      for (Map.Entry<JobID, Integer> entry : lastKnownIndexMap.entrySet()) {
-        entry.getKey().write(out);
-        out.writeInt(entry.getValue());
-      }
-    } else {
-      out.writeInt(0);
+    // Write the job ids of the jobs that were recovered
+    out.writeInt(recoveredJobs.size());
+    for (JobID id : recoveredJobs) {
+      id.write(out);
     }
-    //ObjectWritable.writeObject(out, actions, actions.getClass(), conf);
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -129,17 +125,12 @@
     } else {
       actions = null;
     }
-    // Read the last map events index of the jobs
+    // Read the job ids of the jobs that were recovered
     int size = in.readInt();
-    if (size != 0) {
-      lastKnownIndexMap = new HashMap<JobID, Integer>(size);
-      for (int i = 0; i < size; ++i) {
-        JobID id = new JobID();
-        id.readFields(in);
-        int count = in.readInt();
-        lastKnownIndexMap.put(id, count);
-      }
+    for (int i = 0; i < size; ++i) {
+      JobID id = new JobID();
+      id.readFields(in);
+      recoveredJobs.add(id);
     }
-    //actions = (TaskTrackerAction[]) ObjectWritable.readObject(in, conf);
   }
 }
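The serialization change above is easiest to see as a round trip. A self-contained sketch of the new wire format (a plain String stands in for JobID, which really writes its own fields via Writable):

    import java.io.*;
    import java.util.*;

    public class RecoveredJobsWireSketch {
      static void write(DataOutput out, Set<String> recoveredJobs) throws IOException {
        out.writeInt(recoveredJobs.size());   // count first, as in the patch
        for (String id : recoveredJobs) {
          out.writeUTF(id);                   // JobID.write(out) in the real code
        }
      }

      static Set<String> read(DataInput in) throws IOException {
        Set<String> ids = new HashSet<String>();
        int size = in.readInt();              // an empty set writes 0 and reads back
        for (int i = 0; i < size; ++i) {      // empty, so no null/size-0 special
          ids.add(in.readUTF());              // case is needed, unlike the old map
        }
        return ids;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        write(new DataOutputStream(buf),
              new HashSet<String>(Arrays.asList("job_0001", "job_0002")));
        DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(read(in));         // [job_0001, job_0002] (order may vary)
      }
    }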
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=750785&r1=750784&r2=750785&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Mar 6 06:47:43 2009
@@ -60,8 +60,9 @@
    * Version 23: Added parameter 'initialContact' again in heartbeat method
    *             (HADOOP-4869)
    * Version 24: Changed format of Task and TaskStatus for HADOOP-4759
+   * Version 25: JobIDs are passed in response to JobTracker restart
    */
-  public static final long versionID = 24L;
+  public static final long versionID = 25L;
 
  public final static int TRACKERS_OK = 0;
  public final static int UNKNOWN_TASKTRACKER = 1;
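The versionID bump is what keeps mismatched jars from mis-parsing the new payload: both sides compare protocol versions before exchanging heartbeats. Roughly, as a simplified sketch (not the actual Hadoop RPC version-check code):

    public class VersionCheckSketch {
      static final long JT_VERSION = 25L;     // what this jobtracker speaks

      static void handshake(long ttVersion) {
        if (ttVersion != JT_VERSION) {
          // A version-24 tasktracker would try to read a Map<JobID, Integer>
          // where the jobtracker now writes a Set<JobID>, so refuse to talk.
          throw new IllegalStateException("Protocol mismatch: tracker="
              + ttVersion + " jobtracker=" + JT_VERSION);
        }
      }

      public static void main(String[] args) {
        handshake(25L);                       // ok
        try {
          handshake(24L);                     // rejected: old wire format
        } catch (IllegalStateException e) {
          System.out.println(e.getMessage());
        }
      }
    }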
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=750785&r1=750784&r2=750785&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 6 06:47:43 2009
@@ -848,6 +848,10 @@
       return jobsToRecover.size() != 0;
     }
 
+    Set<JobID> getJobsToRecover() {
+      return jobsToRecover;
+    }
+
     /** Check if the given string represents a job-id or not
      */
     private boolean isJobNameValid(String str) {
@@ -1127,7 +1131,9 @@
       long recoveryStartTime = System.currentTimeMillis();
 
       // II. Recover each job
-      for (JobID id : jobsToRecover) {
+      idIter = jobsToRecover.iterator();
+      while (idIter.hasNext()) {
+        JobID id = idIter.next();
         JobInProgress pJob = getJob(id);
 
         // 1. Get the required info
@@ -1169,11 +1175,9 @@
                    + id + ". Ignoring it.", ioe);
         }
 
-        // 6. Inform the jobtracker as to how much of the data is recovered.
-        //    This is done so that TT should rollback to account for lost
-        //    updates
-        lastSeenEventMapOnRestart.put(pJob.getStatus().getJobID(),
-                                      pJob.getNumTaskCompletionEvents());
+        if (pJob.isComplete()) {
+          idIter.remove(); // no need to keep this job's info as it's successful
+        }
       }
 
       recoveryDuration = System.currentTimeMillis() - recoveryStartTime;
@@ -1198,8 +1202,6 @@
         trackerExpiryQueue.add(status);
       }
 
-      // IV. Cleanup
-      jobsToRecover.clear();
 
       LOG.info("Restoration complete");
     }
@@ -1277,10 +1279,6 @@
   Map<String, Node> hostnameToNodeMap =
     Collections.synchronizedMap(new TreeMap<String, Node>());
 
-  // A map from JobID to the last known task-completion-event-index on restart
-  Map<JobID, Integer> lastSeenEventMapOnRestart =
-    new HashMap<JobID, Integer>();
-
   // Number of resolved entries
   int numResolved;
@@ -2375,7 +2373,7 @@
 
     // check if the restart info is req
     if (addRestartInfo) {
-      response.setLastKnownIndices(lastSeenEventMapOnRestart);
+      response.setRecoveredJobs(recoveryManager.getJobsToRecover());
     }
 
     // Update the trackerToHeartbeatResponseMap
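Two details of the JobTracker change are worth spelling out. First, jobsToRecover is no longer cleared wholesale: the jobs still in it after recovery double as the recovered-jobs set sent in heartbeat responses. Second, the for-each loop had to become an explicit Iterator, because removing elements from the set being iterated is only safe through the iterator itself. A small self-contained sketch of that idiom (illustrative job ids only):

    import java.util.*;

    public class IteratorRemovalSketch {
      public static void main(String[] args) {
        Set<String> jobsToRecover = new HashSet<String>(
            Arrays.asList("job_0001", "job_0002", "job_0003"));

        Iterator<String> idIter = jobsToRecover.iterator();
        while (idIter.hasNext()) {
          String id = idIter.next();
          boolean complete = !id.equals("job_0002"); // pretend two jobs finished
          if (complete) {
            // Safe removal mid-iteration; jobsToRecover.remove(id) here would
            // throw ConcurrentModificationException on the next idIter.next().
            idIter.remove();
          }
        }
        // Only the unfinished job remains to be reported to tasktrackers.
        System.out.println(jobsToRecover); // [job_0002]
      }
    }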
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=750785&r1=750784&r2=750785&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 6 06:47:43 2009
@@ -699,17 +699,14 @@
     }
 
     /**
-     * Check if the number of events that are obtained are more than required.
-     * If yes then purge the extra ones.
+     * Reset the events obtained so far.
      */
-    public void purgeMapEvents(int lastKnownIndex) {
+    public void reset() {
       // Note that the sync is first on fromEventId and then on allMapEvents
       synchronized (fromEventId) {
         synchronized (allMapEvents) {
-          if (allMapEvents.size() > lastKnownIndex) {
-            fromEventId.set(lastKnownIndex);
-            allMapEvents = allMapEvents.subList(0, lastKnownIndex);
-          }
+          fromEventId.set(0); // set the new index for TCE
+          allMapEvents.clear();
         }
       }
     }
@@ -1068,19 +1065,19 @@
         // Check if the map-event list needs purging
-        if (heartbeatResponse.getLastKnownIndex() != null) {
+        Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
+        if (jobs.size() > 0) {
           synchronized (this) {
             // purge the local map events list
-            for (Map.Entry<JobID, Integer> entry
-                 : heartbeatResponse.getLastKnownIndex().entrySet()) {
+            for (JobID job : jobs) {
               RunningJob rjob;
               synchronized (runningJobs) {
-                rjob = runningJobs.get(entry.getKey());
+                rjob = runningJobs.get(job);
                 if (rjob != null) {
                   synchronized (rjob) {
                     FetchStatus f = rjob.getFetchStatus();
                     if (f != null) {
-                      f.purgeMapEvents(entry.getValue());
+                      f.reset();
                     }
                   }
                 }
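The comment kept in reset() about syncing "first on fromEventId and then on allMapEvents" is a lock-ordering rule: every path that needs both monitors must acquire them in the same order, or two threads can deadlock. A self-contained sketch of the pattern (an AtomicInteger stands in for the counter object the real code uses):

    import java.util.*;
    import java.util.concurrent.atomic.AtomicInteger;

    public class LockOrderSketch {
      private final AtomicInteger fromEventId = new AtomicInteger(0);
      private final List<String> allMapEvents = new ArrayList<String>();

      void addEvents(List<String> fresh) {
        synchronized (fromEventId) {          // always fromEventId first...
          synchronized (allMapEvents) {       // ...then allMapEvents
            allMapEvents.addAll(fresh);
            fromEventId.set(allMapEvents.size());
          }
        }
      }

      void reset() {                          // same acquisition order as the patch
        synchronized (fromEventId) {
          synchronized (allMapEvents) {
            fromEventId.set(0);
            allMapEvents.clear();
          }
        }
      }

      public static void main(String[] args) {
        LockOrderSketch s = new LockOrderSketch();
        s.addEvents(Arrays.asList("e0", "e1"));
        s.reset();
        System.out.println(s.fromEventId.get()); // prints 0
      }
    }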
Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=750785&r1=750784&r2=750785&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Fri Mar 6 06:47:43 2009
@@ -25,6 +25,8 @@
 import junit.framework.TestCase;
 import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * TestJobTrackerRestart checks if the jobtracker can restart. JobTracker
@@ -268,18 +270,6 @@
     // Test if all the events that were recovered match exactly
     testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);
 
-    TaskCompletionEvent[] trackerEvents;
-    while(true) {
-      trackerEvents =
-        mr.getMapTaskCompletionEventsUpdates(0, id, 2 * numMaps)
-          .getMapTaskCompletionEvents();
-      if (trackerEvents.length < jtEvents.length) {
-        UtilsForTests.waitFor(1000);
-      } else {
-        break;
-      }
-    }
-
     // Check the task reports
     // The reports should match exactly if the attempts are same
     TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
@@ -291,13 +281,35 @@
     assertEquals("Job priority change is not reflected",
                  JobPriority.HIGH, mr.getJobPriority(id));
 
+    List<TaskCompletionEvent> jtMapEvents =
+      new ArrayList<TaskCompletionEvent>();
+    for (TaskCompletionEvent tce : jtEvents) {
+      if (tce.isMapTask()) {
+        jtMapEvents.add(tce);
+      }
+    }
+
+    TaskCompletionEvent[] trackerEvents;
+    while(true) {
+      // Wait for the tracker to pull all the map events
+      trackerEvents =
+        mr.getMapTaskCompletionEventsUpdates(0, id, jtMapEvents.size())
+          .getMapTaskCompletionEvents();
+      if (trackerEvents.length < jtMapEvents.size()) {
+        UtilsForTests.waitFor(1000);
+      } else {
+        break;
+      }
+    }
+
     // Signal the reduce tasks
     UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
                               getReduceSignalFile(shareDir));
     UtilsForTests.waitTillDone(jobClient);
 
-    testTaskCompletionEvents(jtEvents, trackerEvents, true, 2 * numMaps);
+    testTaskCompletionEvents(jtMapEvents.toArray(new TaskCompletionEvent[0]),
+                             trackerEvents, true, -1);
 
     // check if the cluster status is insane
     ClusterStatus status = jobClient.getClusterStatus();
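One test-hygiene note on the wait loop moved above: it spins forever if the tracker never catches up. A bounded-wait helper in the spirit of UtilsForTests.waitFor (hypothetical, not part of this patch) would fail fast instead of hanging the suite:

    public class BoundedWaitSketch {
      interface Condition { boolean holds(); }

      static void waitFor(Condition c, long timeoutMs, long pollMs)
          throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!c.holds()) {
          if (System.currentTimeMillis() > deadline) {
            // Fail the test instead of blocking the build forever.
            throw new AssertionError("condition not met within " + timeoutMs + " ms");
          }
          Thread.sleep(pollMs);
        }
      }

      public static void main(String[] args) throws InterruptedException {
        final long start = System.currentTimeMillis();
        waitFor(new Condition() {
          public boolean holds() { return System.currentTimeMillis() - start > 50; }
        }, 5000, 10);
        System.out.println("condition met");
      }
    }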