hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1471637 - in /hadoop/common/branches/branch-1.2: CHANGES.txt src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
Date Wed, 24 Apr 2013 20:19:34 GMT
Author: acmurthy
Date: Wed Apr 24 20:19:34 2013
New Revision: 1471637

URL: http://svn.apache.org/r1471637
Log:
Merge -c 1471635 from branch-1 to branch-1.2 to fix MAPREDUCE-5166. Fix ConcurrentModificationException
due to insufficient synchronization on updates to task Counters. Contributed by Sandy Ryza.

Modified:
    hadoop/common/branches/branch-1.2/CHANGES.txt
    hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

Modified: hadoop/common/branches/branch-1.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/CHANGES.txt?rev=1471637&r1=1471636&r2=1471637&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.2/CHANGES.txt Wed Apr 24 20:19:34 2013
@@ -585,6 +585,9 @@ Release 1.2.0 - 2013.04.16
     jobToken file are saved correctly to prevent race-condition between job
     submission and initialization. (acmurthy)
 
+    MAPREDUCE-5166. Fix ConcurrentModificationException due to insufficient
+    synchronization on updates to task Counters. (Sandy Ryza via acmurthy)
+
 Release 1.1.2 - 2013.01.30
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1471637&r1=1471636&r2=1471637&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
(original)
+++ hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
Wed Apr 24 20:19:34 2013
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -69,8 +73,6 @@ public class LocalJobRunner implements J
 
   private static final String jobDir =  "localRunner/";
   
-  private static final Counters EMPTY_COUNTERS = new Counters();
-  
   public long getProtocolVersion(String protocol, long clientVersion) {
     return JobSubmissionProtocol.versionID;
   }
@@ -263,10 +265,10 @@ public class LocalJobRunner implements J
       this.partialMapProgress = new float[numMaps];
       this.mapCounters = new Counters[numMaps];
       for (int i = 0; i < numMaps; i++) {
-        this.mapCounters[i] = EMPTY_COUNTERS;
+        this.mapCounters[i] = new Counters();
       }
 
-      this.reduceCounters = EMPTY_COUNTERS;
+      this.reduceCounters = new Counters();
     }
 
     /**
@@ -453,6 +455,14 @@ public class LocalJobRunner implements J
 
     public synchronized boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus,

         JvmContext context) throws IOException, InterruptedException {
+      // Serialize as we would if distributed in order to make deep copy
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(baos);
+      TaskStatus.writeTaskStatus(dos, taskStatus);
+      dos.close();
+      taskStatus = TaskStatus.readTaskStatus(new DataInputStream(
+          new ByteArrayInputStream(baos.toByteArray())));
+      
       LOG.info(taskStatus.getStateString());
       int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
@@ -477,13 +487,13 @@ public class LocalJobRunner implements J
     /** Return the current values of the counters for this job,
      * including tasks that are in progress.
     */
-    public synchronized Counters getCurrentCounters() {
+    public synchronized Counters GetCurrentCounters() {
       if (null == mapCounters) {
         // Counters not yet initialized for job.
-        return EMPTY_COUNTERS;
+        return new Counters();
       }
 
-      Counters current = EMPTY_COUNTERS;
+      Counters current = new Counters();
       for (Counters c : mapCounters) {
         current = Counters.sum(current, c);
       }



Mime
View raw message