hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r677470 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/MapTask.java
Date Wed, 16 Jul 2008 23:55:47 GMT
Author: acmurthy
Date: Wed Jul 16 16:55:47 2008
New Revision: 677470

URL: http://svn.apache.org/viewvc?rev=677470&view=rev
Log:
HADOOP-3617. Removed redundant checks of accounting space in MapTask and made the spill thread
persistent so as to avoid creating a new one for each spill. Contributed by Chris Douglas.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=677470&r1=677469&r2=677470&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jul 16 16:55:47 2008
@@ -90,6 +90,10 @@
     randomization is by the hosts (and not the map outputs themselves). 
     (Jothi Padmanabhan via ddas)
 
+    HADOOP-3617. Removed redundant checks of accounting space in MapTask and
+    makes the spill thread persistent so as to avoid creating a new one for
+    each spill. (Chris Douglas via acmurthy)  
+
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=677470&r1=677469&r2=677470&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Wed Jul 16 16:55:47 2008
@@ -32,6 +32,8 @@
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -326,8 +328,12 @@
     private final int softBufferLimit;
     private final int minSpillsForCombine;
     private final IndexedSorter sorter;
-    private final Object spillLock = new Object();
+    private final ReentrantLock spillLock = new ReentrantLock();
+    private final Condition spillDone = spillLock.newCondition();
+    private final Condition spillReady = spillLock.newCondition();
     private final BlockingBuffer bb = new BlockingBuffer();
+    private volatile boolean spillThreadRunning = false;
+    private final SpillThread spillThread = new SpillThread();
 
     private final FileSystem localFs;
 
@@ -403,6 +409,24 @@
         ? new CombineOutputCollector(combineOutputCounter)
         : null;
       minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
+      spillThread.setDaemon(true);
+      spillThread.setName("SpillThread");
+      spillLock.lock();
+      try {
+        spillThread.start();
+        while (!spillThreadRunning) {
+          spillDone.await();
+        }
+      } catch (InterruptedException e) {
+        throw (IOException)new IOException("Spill thread failed to initialize"
+            ).initCause(sortSpillException);
+      } finally {
+        spillLock.unlock();
+      }
+      if (sortSpillException != null) {
+        throw (IOException)new IOException("Spill thread failed to initialize"
+            ).initCause(sortSpillException);
+      }
     }
 
     @SuppressWarnings("unchecked")
@@ -419,12 +443,43 @@
                               + valClass.getName() + ", recieved "
                               + value.getClass().getName());
       }
-      if (sortSpillException != null) {
-        throw (IOException)new IOException("Spill failed"
-            ).initCause(sortSpillException);
+      final int kvnext = (kvindex + 1) % kvoffsets.length;
+      spillLock.lock();
+      try {
+        boolean kvfull;
+        do {
+          if (sortSpillException != null) {
+            throw (IOException)new IOException("Spill failed"
+                ).initCause(sortSpillException);
+          }
+          // sufficient acct space
+          kvfull = kvnext == kvstart;
+          final boolean kvsoftlimit = ((kvnext > kvend)
+              ? kvnext - kvend > softRecordLimit
+              : kvend - kvnext <= kvoffsets.length - softRecordLimit);
+          if (kvstart == kvend && kvsoftlimit) {
+            LOG.info("Spilling map output: record full = " + kvsoftlimit);
+            startSpill();
+          }
+          if (kvfull) {
+            try {
+              while (kvstart != kvend) {
+                reporter.progress();
+                spillDone.await();
+              }
+            } catch (InterruptedException e) {
+              throw (IOException)new IOException(
+                  "Collector interrupted while waiting for the writer"
+                  ).initCause(e);
+            }
+          }
+        } while (kvfull);
+      } finally {
+        spillLock.unlock();
       }
+
       try {
-          // serialize key bytes into buffer
+        // serialize key bytes into buffer
         int keystart = bufindex;
         keySerializer.serialize(key);
         if (bufindex < keystart) {
@@ -433,25 +488,20 @@
           keystart = 0;
         }
         // serialize value bytes into buffer
-        int valstart = bufindex;
+        final int valstart = bufindex;
         valSerializer.serialize(value);
         int valend = bb.markRecord();
-        mapOutputByteCounter.increment(valend >= keystart
-            ? valend - keystart
-            : (bufvoid - keystart) + valend);
 
-        if (keystart == bufindex) {
-          // if emitted records make no writes, it's possible to wrap
-          // accounting space without notice
-          bb.write(new byte[0], 0, 0);
-        }
-
-        int partition = partitioner.getPartition(key, value, partitions);
+        final int partition = partitioner.getPartition(key, value, partitions);
         if (partition < 0 || partition >= partitions) {
           throw new IOException("Illegal partition for " + key + " (" +
               partition + ")");
         }
+
         mapOutputRecordCounter.increment(1);
+        mapOutputByteCounter.increment(valend >= keystart
+            ? valend - keystart
+            : (bufvoid - keystart) + valend);
 
         // update accounting info
         int ind = kvindex * ACCTSIZE;
@@ -459,7 +509,7 @@
         kvindices[ind + PARTITION] = partition;
         kvindices[ind + KEYSTART] = keystart;
         kvindices[ind + VALSTART] = valstart;
-        kvindex = (kvindex + 1) % kvoffsets.length;
+        kvindex = kvnext;
       } catch (MapBufferTooSmallException e) {
         LOG.info("Record too large for in-memory buffer: " + e.getMessage());
         spillSingleRecord(key, value);
@@ -578,19 +628,16 @@
       @Override
       public synchronized void write(byte b[], int off, int len)
           throws IOException {
-        boolean kvfull = false;
         boolean buffull = false;
         boolean wrap = false;
-        synchronized(spillLock) {
+        spillLock.lock();
+        try {
           do {
             if (sortSpillException != null) {
               throw (IOException)new IOException("Spill failed"
                   ).initCause(sortSpillException);
             }
 
-            // sufficient accounting space?
-            final int kvnext = (kvindex + 1) % kvoffsets.length;
-            kvfull = kvnext == kvstart;
             // sufficient buffer space?
             if (bufstart <= bufend && bufend <= bufindex) {
               buffull = bufindex + len > bufvoid;
@@ -606,26 +653,12 @@
               // spill thread not running
               if (kvend != kvindex) {
                 // we have records we can spill
-                final boolean kvsoftlimit = (kvnext > kvend)
-                  ? kvnext - kvend > softRecordLimit
-                  : kvend - kvnext <= kvoffsets.length - softRecordLimit;
                 final boolean bufsoftlimit = (bufindex > bufend)
                   ? bufindex - bufend > softBufferLimit
                   : bufend - bufindex < bufvoid - softBufferLimit;
-                if (kvsoftlimit || bufsoftlimit || (buffull && !wrap)) {
-                  LOG.info("Spilling map output: buffer full = " + bufsoftlimit+
-                           " and record full = " + kvsoftlimit);
-                  LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
-                           "; bufvoid = " + bufvoid);
-                  LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
-                           "; length = " + kvoffsets.length);
-                  kvend = kvindex;
-                  bufend = bufmark;
-                  // TODO No need to recreate this thread every time
-                  SpillThread t = new SpillThread();
-                  t.setDaemon(true);
-                  t.setName("SpillThread");
-                  t.start();
+                if (bufsoftlimit || (buffull && !wrap)) {
+                  LOG.info("Spilling map output: buffer full= " + bufsoftlimit);
+                  startSpill();
                 }
               } else if (buffull && !wrap) {
                 // We have no buffered records, and this record is too large
@@ -641,19 +674,21 @@
               }
             }
 
-            if (kvfull || (buffull && !wrap)) {
-              while (kvstart != kvend) {
-                reporter.progress();
-                try {
-                  spillLock.wait();
-                } catch (InterruptedException e) {
+            if (buffull && !wrap) {
+              try {
+                while (kvstart != kvend) {
+                  reporter.progress();
+                  spillDone.await();
+                }
+              } catch (InterruptedException e) {
                   throw (IOException)new IOException(
                       "Buffer interrupted while waiting for the writer"
                       ).initCause(e);
-                }
               }
             }
-          } while (kvfull || (buffull && !wrap));
+          } while (buffull && !wrap);
+        } finally {
+          spillLock.unlock();
         }
         // here, we know that we have sufficient space to write
         if (buffull) {
@@ -670,30 +705,41 @@
 
     public synchronized void flush() throws IOException {
       LOG.info("Starting flush of map output");
-      synchronized (spillLock) {
+      spillLock.lock();
+      try {
         while (kvstart != kvend) {
-          try {
-            reporter.progress();
-            spillLock.wait();
-          } catch (InterruptedException e) {
-            throw (IOException)new IOException(
-                "Buffer interrupted while waiting for the writer"
-                ).initCause(e);
-          }
+          reporter.progress();
+          spillDone.await();
+        }
+        if (sortSpillException != null) {
+          throw (IOException)new IOException("Spill failed"
+              ).initCause(sortSpillException);
+        }
+        if (kvend != kvindex) {
+          kvend = kvindex;
+          bufend = bufmark;
+          sortAndSpill();
         }
+      } catch (InterruptedException e) {
+        throw (IOException)new IOException(
+            "Buffer interrupted while waiting for the writer"
+            ).initCause(e);
+      } finally {
+        spillLock.unlock();
       }
-      if (sortSpillException != null) {
+      assert !spillLock.isHeldByCurrentThread();
+      // shut down spill thread and wait for it to exit. Since the preceding
+      // ensures that it is finished with its work (and sortAndSpill did not
+      // throw), we elect to use an interrupt instead of setting a flag.
+      // Spilling simultaneously from this thread while the spill thread
+      // finishes its work might be both a useful way to extend this and also
+      // sufficient motivation for the latter approach.
+      try {
+        spillThread.interrupt();
+        spillThread.join();
+      } catch (InterruptedException e) {
         throw (IOException)new IOException("Spill failed"
-            ).initCause(sortSpillException);
-      }
-      if (kvend != kvindex) {
-        LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
-                 "; bufvoid = " + bufvoid);
-        LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
-                 "; length = " + kvoffsets.length);
-        kvend = kvindex;
-        bufend = bufmark;
-        sortAndSpill();
+            ).initCause(e);
       }
       // release sort buffer before the merge
       kvbuffer = null;
@@ -706,23 +752,47 @@
 
       @Override
       public void run() {
+        spillLock.lock();
+        spillThreadRunning = true;
         try {
-          sortAndSpill();
-        } catch (Throwable e) {
-          sortSpillException = e;
-        } finally {
-          synchronized(spillLock) {
-            if (bufend < bufindex && bufindex < bufstart) {
-              bufvoid = kvbuffer.length;
+          while (true) {
+            spillDone.signal();
+            while (kvstart == kvend) {
+              spillReady.await();
+            }
+            try {
+              spillLock.unlock();
+              sortAndSpill();
+            } catch (Throwable e) {
+              sortSpillException = e;
+            } finally {
+              spillLock.lock();
+              if (bufend < bufindex && bufindex < bufstart) {
+                bufvoid = kvbuffer.length;
+              }
+              kvstart = kvend;
+              bufstart = bufend;
             }
-            kvstart = kvend;
-            bufstart = bufend;
-            spillLock.notify();
           }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } finally {
+          spillLock.unlock();
+          spillThreadRunning = false;
         }
       }
     }
 
+    private synchronized void startSpill() {
+      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+               "; bufvoid = " + bufvoid);
+      LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
+               "; length = " + kvoffsets.length);
+      kvend = kvindex;
+      bufend = bufmark;
+      spillReady.signal();
+    }
+
     private void sortAndSpill() throws IOException {
       //approximate the length of the output file to be the length of the
       //buffer + header lengths for the partitions



Mime
View raw message