hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yjzhan...@apache.org
Subject hadoop git commit: HDFS-9612. DistCp worker threads are not terminated after jobs are done. (Wei-Chiu Chuang via Yongjun Zhang)
Date Fri, 15 Jan 2016 19:01:30 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 9baeae936 -> 4c8131b1b


HDFS-9612. DistCp worker threads are not terminated after jobs are done. (Wei-Chiu Chuang
via Yongjun Zhang)

(cherry picked from commit a9c69ebeb707801071db3cc22bfcd14f87be443a)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4c8131b1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4c8131b1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4c8131b1

Branch: refs/heads/branch-2.8
Commit: 4c8131b1bf8ead0722b7ec005bceb3ea16ef4f6f
Parents: 9baeae9
Author: Yongjun Zhang <yzhang@cloudera.com>
Authored: Fri Jan 15 10:03:09 2016 -0800
Committer: Yongjun Zhang <yzhang@cloudera.com>
Committed: Fri Jan 15 10:12:15 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../hadoop/tools/util/ProducerConsumer.java     | 55 +++++++++----
 .../hadoop/tools/util/WorkRequestProcessor.java |  2 +
 .../hadoop/tools/util/TestProducerConsumer.java | 84 ++++++++++++++++++++
 4 files changed, 130 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c8131b1/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b9762d9..564b792 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1657,6 +1657,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9493. Test o.a.h.hdfs.server.namenode.TestMetaSave fails in trunk.
     (Tony Wu via lei)
 
+    HDFS-9612. DistCp worker threads are not terminated after jobs are done.
+    (Wei-Chiu Chuang via Yongjun Zhang)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c8131b1/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
index 16bf254..906e1ea 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
@@ -70,7 +70,10 @@ public class ProducerConsumer<T, R> {
    *  completion of any pending work.
    */
   public void shutdown() {
-    executor.shutdown();
+    if (hasWork()) {
+      LOG.warn("Shutdown() is called but there are still unprocessed work!");
+    }
+    executor.shutdownNow();
   }
 
   /**
@@ -117,6 +120,8 @@ public class ProducerConsumer<T, R> {
   /**
    *  Blocking take from ProducerConsumer output queue that can be interrupted.
    *
+   *  @throws InterruptedException if interrupted before an element becomes
+   *  available.
    *  @return  item returned by processor's processItem().
    */
   public WorkReport<R> take() throws InterruptedException {
@@ -143,30 +148,52 @@ public class ProducerConsumer<T, R> {
     }
   }
 
+  /**
+   * Worker thread implementation.
+   *
+   */
   private class Worker implements Runnable {
     private WorkRequestProcessor<T, R> processor;
 
+    /**
+     * Constructor.
+     * @param processor is used to process an item from input queue.
+     */
     public Worker(WorkRequestProcessor<T, R> processor) {
       this.processor = processor;
     }
 
+    /**
+     * The worker continuously gets an item from input queue, process it and
+     * then put the processed result into output queue. It waits to get an item
+     * from input queue if there's none.
+     */
     public void run() {
       while (true) {
+        WorkRequest<T> work;
+
         try {
-          WorkRequest<T> work = inputQueue.take();
-          WorkReport<R> result = processor.processItem(work);
-
-          boolean isDone = false;
-          while (!isDone) {
-            try {
-              outputQueue.put(result);
-              isDone = true;
-            } catch (InterruptedException ie) {
-              LOG.debug("Could not put report into outputQueue. Retrying...");
-            }
+          work = inputQueue.take();
+        } catch (InterruptedException e) {
+          // It is assumed that if an interrupt occurs while taking a work
+          // out from input queue, the interrupt is likely triggered by
+          // ProducerConsumer.shutdown(). Therefore, exit the thread.
+          LOG.debug("Interrupted while waiting for requests from inputQueue.");
+          return;
+        }
+
+        boolean isDone = false;
+        while (!isDone) {
+          try {
+            // if the interrupt happens while the work is being processed,
+            // go back to process the same work again.
+            WorkReport<R> result = processor.processItem(work);
+            outputQueue.put(result);
+            isDone = true;
+          } catch (InterruptedException ie) {
+            LOG.debug("Worker thread was interrupted while processing an item,"
+                + " or putting into outputQueue. Retrying...");
           }
-        } catch (InterruptedException ie) {
-          LOG.debug("Interrupted while waiting for request from inputQueue.");
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c8131b1/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
index 91f738e..6a4c797 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
@@ -26,6 +26,8 @@ public interface WorkRequestProcessor<T, R> {
 
   /**
    * Work processor.
+   * The processor should be stateless: that is, it can be repeated after
+   * being interrupted.
    *
    * @param   workRequest  Input work item.
    * @return  Outputs WorkReport after processing workRequest item.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4c8131b1/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
index de0fcfd..ea52f69 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.tools.util;
 
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.tools.util.ProducerConsumer;
 import org.apache.hadoop.tools.util.WorkReport;
 import org.apache.hadoop.tools.util.WorkRequest;
@@ -27,6 +28,7 @@ import org.junit.Test;
 
 import java.lang.Exception;
 import java.lang.Integer;
+import java.util.concurrent.TimeoutException;
 
 public class TestProducerConsumer {
   public class CopyProcessor implements WorkRequestProcessor<Integer, Integer> {
@@ -64,6 +66,7 @@ public class TestProducerConsumer {
     } catch (InterruptedException ie) {
       Assert.assertTrue(false);
     }
+    worker.shutdown();
   }
 
   @Test
@@ -89,6 +92,7 @@ public class TestProducerConsumer {
     }
     Assert.assertEquals(0, sum);
     Assert.assertEquals(numRequests, numReports);
+    workers.shutdown();
   }
 
   @Test
@@ -105,5 +109,85 @@ public class TestProducerConsumer {
     } catch (InterruptedException ie) {
       Assert.assertTrue(false);
     }
+    worker.shutdown();
+  }
+
+  @Test
+  public void testSimpleProducerConsumerShutdown() throws InterruptedException,
+      TimeoutException {
+    // create a producer-consumer thread pool with one thread.
+    ProducerConsumer<Integer, Integer> worker =
+        new ProducerConsumer<Integer, Integer>(1);
+    worker.addWorker(new CopyProcessor());
+    // interrupt worker threads
+    worker.shutdown();
+    // Regression test for HDFS-9612
+    // Periodically check, and make sure that worker threads are ultimately
+    // terminated after interrupts
+    GenericTestUtils.waitForThreadTermination("pool-.*-thread.*",100,10000);
+  }
+
+  @Test(timeout=10000)
+  public void testMultipleProducerConsumerShutdown()
+      throws InterruptedException, TimeoutException {
+    int numWorkers = 10;
+    // create a producer consumer thread pool with 10 threads.
+    final ProducerConsumer<Integer, Integer> worker =
+        new ProducerConsumer<Integer, Integer>(numWorkers);
+    for (int i=0; i< numWorkers; i++) {
+      worker.addWorker(new CopyProcessor());
+    }
+
+    // starts two threads: a source thread which puts in work, and a sink thread
+    // which takes a piece of work from ProducerConsumer
+    class SourceThread extends Thread {
+      public void run() {
+        while (true) {
+          try {
+            worker.put(new WorkRequest<Integer>(42));
+            Thread.sleep(1);
+          } catch (InterruptedException ie) {
+            return;
+          }
+        }
+      }
+    };
+    // The source thread put requests into producer-consumer.
+    SourceThread source = new SourceThread();
+    source.start();
+    class SinkThread extends Thread {
+      public void run() {
+        try {
+          while (true) {
+            WorkReport<Integer> report = worker.take();
+            Assert.assertEquals(42, report.getItem().intValue());
+          }
+        } catch (InterruptedException ie) {
+          return;
+        }
+      }
+    };
+    // The sink thread gets processed items from producer-consumer
+    SinkThread sink = new SinkThread();
+    sink.start();
+    // sleep 1 second and then shut down source.
+    // This makes sure producer consumer gets some work to do
+    Thread.sleep(1000);
+    // after 1 second, stop source thread to stop pushing items.
+    source.interrupt();
+    // wait until all work is consumed by sink
+    while (worker.hasWork()) {
+      Thread.sleep(1);
+    }
+    worker.shutdown();
+    // Regression test for HDFS-9612
+    // make sure worker threads are terminated after workers are asked to
+    // shutdown.
+    GenericTestUtils.waitForThreadTermination("pool-.*-thread.*",100,10000);
+
+    sink.interrupt();
+
+    source.join();
+    sink.join();
   }
 }


Mime
View raw message